Category Archives: apache-kafka

Spark streaming with kafka avro serialization

I am storing data in Kafka using the schema registry and Avro serialization. Now I have to consume these messages from Kafka and store them in two places: one is a Parquet file and the other is a Cassandra database. After a lot of searching I found that to perform the above two tasks we need to convert the streaming RDDs to a DataFrame. However, I am not able to find a Java implementation for this. Can you please suggest something?

Sample code:

JavaPairReceiverInputDStream<String, GenericRecord> kafkaStream = KafkaUtils.createStream(
    jssc, String.class, GenericRecord.class,
    KafkaAvroDecoder.class, KafkaAvroDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

// Keep only the Avro record (the value) from each (key, value) pair.
JavaDStream<GenericRecord> msg = kafkaStream.map(
    new Function<Tuple2<String, GenericRecord>, GenericRecord>() {
        public GenericRecord call(Tuple2<String, GenericRecord> tuple2) {
            return tuple2._2();
        }
    });

msg.print(); // output looks like :: Id=1 name= man

I want to store the above data in a Cassandra table.
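
One possible approach, as a sketch only and not a verified implementation: inside foreachRDD, copy the fields of each GenericRecord into a plain serializable bean, build a DataFrame from that RDD, then write it once as Parquet and once to Cassandra through the spark-cassandra-connector. Everything below that is not in the question is an assumption: the Person bean, the output path, and the my_keyspace/my_table names are made up, and Spark 1.6-era APIs (SQLContext, DataFrame) plus Java 8 are assumed to match the receiver-based code above.

// Hypothetical bean matching the two fields shown in the output above.
public static class Person implements java.io.Serializable {
    private int id;
    private String name;
    public Person() { }
    public Person(int id, String name) { this.id = id; this.name = name; }
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

msg.foreachRDD(new VoidFunction<JavaRDD<GenericRecord>>() {
    public void call(JavaRDD<GenericRecord> rdd) {
        if (rdd.isEmpty()) {
            return;
        }
        SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

        // GenericRecord is not a Java bean, so copy its fields into one first.
        JavaRDD<Person> people = rdd.map(record ->
            new Person(Integer.parseInt(record.get("Id").toString()),
                       record.get("name").toString()));
        DataFrame df = sqlContext.createDataFrame(people, Person.class);

        // 1) Append the micro-batch to a Parquet directory.
        df.write().mode(SaveMode.Append).parquet("/data/output/parquet");

        // 2) Append the same rows to a Cassandra table via the spark-cassandra-connector.
        df.write()
          .format("org.apache.spark.sql.cassandra")
          .option("keyspace", "my_keyspace")
          .option("table", "my_table")
          .mode(SaveMode.Append)
          .save();
    }
});

The same DataFrame is reused for both sinks, so the Avro decoding and the field copying only happen once per micro-batch.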

Installing failed with error code 1 in /tmp/pip_build_root/confluent-kafka

When I try to install confluent_kafka using sudo pip3 install confluent_kafka on Ubuntu 14.04, I get the error shown below. I have the newest version of librdkafka installed via sudo apt-get install librdkafka-dev python-dev, which is listed as a prerequisite on the confluent-kafka-python GitHub page. Any ideas why it doesn't work?

Command /usr/bin/python3 -c "import setuptools, tokenize;__file__='/tmp/pip_build_root/confluent-kafka/setup.py';exec(compile(getattr(tokenize, 'open', open)(__file__).read().replace('\r\n', '\n'), __file__, 'exec'))" install --record /tmp/pip-no3e8l_m-record/install-record.txt --single-version-externally-managed --compile failed with error code 1 in /tmp/pip_build_root/confluent-kafka
Storing debug log for failure in /home/elastic/.pip/pip.log

Ignite hang on a complex SQL join occurring on 2 server nodes

The scenario is

  1. Data is loaded from Cassandra into a lookup cache (a replicated cache).
  2. Data is fetched from the Kafka input topic into a map.
  3. The map is inserted, using the putAll method, into another cache (a partitioned cache).
  4. A complex join is done between both caches.
  5. The result set is processed and inserted into a layer-2 cache, then processed again into a layer-3 cache, and finally written to the Kafka output topic.

The jar is compiled and run on 2 different nodes.
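
For reference, here is a rough sketch of what steps 3 and 4 could look like in code. This is not the poster's code: the cache names (txCache, lookupCache), the value classes (Transaction, Lookup) and their fields are assumptions, and both caches are assumed to have been created with matching query entities / indexed types so they are visible to SQL.

import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;

public class JoinStep {
    static void insertAndJoin(Ignite ignite, Map<Long, Transaction> batchFromKafka) {
        // Step 3: bulk-insert the batch read from the Kafka input topic into the partitioned cache.
        IgniteCache<Long, Transaction> txCache = ignite.getOrCreateCache("txCache");
        txCache.putAll(batchFromKafka);

        // Step 4: join the partitioned cache against the replicated lookup cache.
        // A table that lives in another cache has to be schema-qualified with that cache's name.
        SqlFieldsQuery join = new SqlFieldsQuery(
            "SELECT t.id, l.name, t.amount "
          + "FROM Transaction t JOIN \"lookupCache\".Lookup l ON t.lookupId = l.id");

        try (QueryCursor<List<?>> cursor = txCache.query(join)) {
            for (List<?> row : cursor) {
                // process each row, write it into the layer-2 cache, and so on
            }
        }
    }
}

A PARTITIONED-to-REPLICATED join like this does not require collocation, but a join between two PARTITIONED caches generally needs either collocated data or setDistributedJoins(true) on the query, which is one of the first things worth checking when the results of the first batch look inaccurate.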

Some records are fetched and processing runs on both nodes (the results of the first batch are not accurate), then it hangs and keeps logging this message:

10/05/2018 16:39:48  INFO [tcp-disco-stats-printer-#4%cluster-VEON%] 
TcpDiscoverySpi: Discovery SPI statistics [statistics=TcpDiscoveryStatistics 
[joinStartedTs=1525962688220, joinFinishedTs=1525962706508, 
crdSinceTs=1525962706498, joinedNodesCnt=1, failedNodesCnt=0, leftNodesCnt=0, 
ackTimeoutsCnt=0, sockTimeoutsCnt=0, rcvdMsgs= 
{TcpDiscoveryMetricsUpdateMessage=474, TcpDiscoveryNodeAddedMessage=1, 
TcpDiscoveryNodeAddFinishedMessage=1, TcpDiscoveryJoinRequestMessage=2, 
TcpDiscoveryDiscardMessage=35, TcpDiscoveryConnectionCheckMessage=706, 
TcpDiscoveryCustomEventMessage=39}, procMsgs= 
{TcpDiscoveryMetricsUpdateMessage=715, TcpDiscoveryNodeAddedMessage=1, 
TcpDiscoveryNodeAddFinishedMessage=1, TcpDiscoveryJoinRequestMessage=2, 
TcpDiscoveryCustomEventMessage=53, TcpDiscoveryDiscardMessage=76}, 
avgMsgsSndTimes={TcpDiscoveryMetricsUpdateMessage=0, 
TcpDiscoveryNodeAddedMessage=10, TcpDiscoveryNodeAddFinishedMessage=0, 
TcpDiscoveryJoinRequestMessage=10, TcpDiscoveryConnectionCheckMessage=0, 
TcpDiscoveryDiscardMessage=0, TcpDiscoveryCustomEventMessage=0}, 
maxMsgsSndTimes={TcpDiscoveryMetricsUpdateMessage=10, 
TcpDiscoveryNodeAddedMessage=10, TcpDiscoveryNodeAddFinishedMessage=0, 
TcpDiscoveryJoinRequestMessage=10, TcpDiscoveryConnectionCheckMessage=42, 
TcpDiscoveryDiscardMessage=0, TcpDiscoveryCustomEventMessage=0}, sentMsgs= 
{TcpDiscoveryMetricsUpdateMessage=474, TcpDiscoveryNodeAddedMessage=1, 
TcpDiscoveryNodeAddFinishedMessage=1, TcpDiscoveryJoinRequestMessage=1, 
TcpDiscoveryConnectionCheckMessage=706, TcpDiscoveryDiscardMessage=35, 
TcpDiscoveryCustomEventMessage=33}, avgMsgsAckTimes= 
{TcpDiscoveryMetricsUpdateMessage=0, TcpDiscoveryNodeAddedMessage=10, 
TcpDiscoveryNodeAddFinishedMessage=0, TcpDiscoveryJoinRequestMessage=10, 
TcpDiscoveryConnectionCheckMessage=0, TcpDiscoveryDiscardMessage=0, 
TcpDiscoveryCustomEventMessage=0}, maxMsgsAckTimes= 
{TcpDiscoveryMetricsUpdateMessage=10, TcpDiscoveryNodeAddedMessage=10, 
TcpDiscoveryNodeAddFinishedMessage=0, TcpDiscoveryJoinRequestMessage=10, 
TcpDiscoveryConnectionCheckMessage=42, TcpDiscoveryDiscardMessage=0, 
TcpDiscoveryCustomEventMessage=0}, avgMsgQueueTime=0, maxMsgQueueTime=11, 
ringMsgsSent=35, avgRingMsgTime=0, maxRingMsgTime=121, 
maxRingTimeMsgCls=TcpDiscoveryNodeAddedMessage, avgMsgProcTime=0, 
maxMsgProcTime=91, maxProcTimeMsgCls=TcpDiscoveryJoinRequestMessage, 
sockReadersCreated=3, sockReadersRmv=2, avgSrvSockInitTime=3, 
maxSrvSockInitTime=10, clientSockCreatedCnt=2, avgClientSockInitTime=15, 
maxClientSockInitTime=21, pendingMsgsRegistered=35, pendingMsgsDiscarded=0], 
spiState=CONNECTED, coord=TcpDiscoveryNode [id=8a03f4a3-a631-42e7-bdf2- 
b062e1e977b6, addrs=[0:0:0:0:0:0:0:1, 10.19.1.14, 127.0.0.1, 172.16.44.139, 
172.23.48.1], sockAddrs=[WEGMM250132-O5F.TD.TERADATA.COM/10.19.1.14:47501, 
/172.16.44.139:47501, /172.23.48.1:47501, /0:0:0:0:0:0:0:1:47501, 
/127.0.0.1:47501], discPort=47501, order=1, intOrder=1, 
lastExchangeTime=1525962688210, loc=true, ver=2.0.0#20170430-sha1:d4eef3c6, 
isClient=false], next=TcpDiscoveryNode [id=ee98d2ff-40fe-4981-8a59- 
1b12b478bc11, addrs=[0:0:0:0:0:0:0:1, 10.19.1.14, 127.0.0.1, 172.16.44.139, 
172.23.48.1], sockAddrs=[WEGMM250132-O5F.TD.TERADATA.COM/10.19.1.14:47500, 
/172.23.48.1:47500, /0:0:0:0:0:0:0:1:47500, /127.0.0.1:47500, 
/172.16.44.139:47500], discPort=47500, order=2, intOrder=2, 
lastExchangeTime=1525962714266, loc=false, ver=2.0.0#20170430-sha1:d4eef3c6, 
isClient=false], intOrder=1, topSize=2, leavingNodesSize=0, 
failedNodesSize=0, 
joiningNodesSize=0, pendingCustomMsgs=0, msgWorker.queue.size=0, clients=0, 
clientWorkers=0, lastUpdate=05/10/2018 16:39:47, heapFree=5727M, 
heapTotal=5888M] 
  • Sometimes I also get this message:

    [GridCachePartitionExchangeManager] Failed to wait for initial partition map exchange. Possible reasons are:

  • ^-- Transactions in deadlock.
  • ^-- Long running transactions (ignore if this is the case).
  • ^-- Unreleased explicit locks.

How can I solve this, in your opinion?

Kafka avro serialization with schema evolution

I am trying to build a Kafka pipeline which will read JSON input data into a Kafka topic. I am using Avro serialization with the schema registry, as my schema changes on a regular basis. As of now, GenericRecord is used to parse the schema. But recently I learned that avro-tools can read a schema and generate Java classes, which can then be used to write the producer code. I am confused about choosing between these two options. Can you please suggest which one is better, given that my schema changes frequently?
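
For reference, here is a minimal sketch of the GenericRecord approach currently in use. The details are assumptions rather than the poster's setup: Confluent's KafkaAvroSerializer, a Schema Registry at http://localhost:8081, and a made-up two-field User schema and users topic.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class GenericRecordProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // The schema would normally be loaded from an .avsc file; it is inlined here for brevity.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericRecord user = new GenericData.Record(schema);
        user.put("id", 1);
        user.put("name", "man");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("users", "1", user));
        }
    }
}

With avro-tools-generated classes the main change on the producer side is that the value type becomes the generated SpecificRecord class and the fields are set through its setters or builder; that gives compile-time safety but means regenerating and recompiling the classes on every schema change, whereas GenericRecord code keeps working as long as the changes stay backward compatible with the registered schema.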

How to integrate connectors with Apache Kafka connect without confluent

There is a requirement where we get a stream of data from a Kafka stream, and our objective is to push this data to SOLR.

We did some reading and found that there are a lot of Apache solutions available in the market, but the problem is we do not know which is the best solution and how to achieve it.

The options are:

  1. Use a Solr connector to connect with Kafka.
  2. Use Apache Storm, as it directly provides support for integrating with SOLR.

There is not much documentation or in-depth information provided for the above-mentioned options.

Would anyone be kind enough to let me know how we can use a Solr connector and integrate it with a Kafka stream without using Confluent?

Also, any documentation about using Apache Storm with SOLR would be appreciated.

Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=2.11.12'

I am not able to run an Apache Kafka service due to a failure while trying to start a ZooKeeper instance. I have downloaded and tried all 3 available downloads from the official site (binaries and source). When I try to start ZooKeeper with

./bin/zookeeper-server-start.sh config/zookeeper.properties

I always get the same error message:

Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=2.11.12'

The same goes for the following command (after starting a separate ZooKeeper instance, not the one built into Kafka):

./bin/kafka-server-start.sh config/server.properties

I have tried it under Ubuntu 17.04 and 18.04. When I try this on a virtual machine running Ubuntu 16.04, it works.

Unfortunately, all I found regarding this problem was for Windows. Thank you for any help.