Category Archives: apache-storm

How to integrate connectors with Apache Kafka Connect without Confluent

There is a requirement where we get a stream of data from Kafka, and our objective is to push this data to Solr.

We did some reading and found that there are a lot of Apache solutions available, but the problem is we do not know which one is the best or how to implement it.

The options are:

1. Use a Solr connector to connect with Kafka.
2. Use Apache Storm, as it directly provides support for integrating with Solr.

There is not much documentation or in-depth information available for the above-mentioned options.

Would anyone be kind enough to let me know how we can use a Solr connector and integrate it with a Kafka stream without using Confluent?

Also, any pointers to documentation about using Apache Storm with Solr would be appreciated.
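
To make option 2 concrete, below is the kind of topology we have in mind, pieced together from the storm-kafka-client and storm-solr module READMEs (the class names, broker and ZooKeeper addresses, topic, and collection name are our assumptions and untested):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.solr.bolt.SolrUpdateBolt;
import org.apache.storm.solr.config.CountBasedCommit;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrJsonMapper;
import org.apache.storm.solr.mapper.SolrMapper;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaToSolrTopology {
    public static void main(String[] args) throws Exception {
        // Spout reads from the Kafka topic (broker address and topic are placeholders).
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("kafka1:9092", "docs").build();

        // Bolt sends each tuple's "value" field (assumed to hold a JSON document)
        // to a SolrCloud collection, committing every 100 documents.
        SolrConfig solrConfig = new SolrConfig("zkhost:2181"); // SolrCloud ZooKeeper
        SolrMapper mapper = new SolrJsonMapper.Builder("myCollection", "value").build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        builder.setBolt("solr-bolt",
                new SolrUpdateBolt(solrConfig, mapper, new CountBasedCommit(100)), 1)
               .shuffleGrouping("kafka-spout");

        StormSubmitter.submitTopology("kafka-to-solr", new Config(), builder.createTopology());
    }
}

For option 1, as far as we can tell, a Solr sink connector JAR can be run against plain Apache Kafka with bin/connect-standalone.sh and a worker properties file; the Confluent platform itself should not be required.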

Change Log4J Configuration for Apache Storm Topology

I'm currently submitting Storm topologies programmatically from my Java application using the following code:

Nimbus.Client client = NimbusClient.getConfiguredClient(stormConfigProvider.getStormConfig()).getClient();
client.submitTopology(
        this.topologyID.toString(),
        stormJarManager.getRemoteJarLocation(),
        JSONValue.toJSONString(stormConfigProvider.getStormConfig()),
        topology
);

In my scenario, I have two kinds of topologies: testing topologies and production topologies. Each kind requires a different type of logging: while the testing topologies run at TRACE level, the production topologies run at INFO level. In addition, I require that the production topologies have a Splunk Log4j2 appender configured, to centralize the logging of my production application.

For that, I included a log4j.xml file in my topology JAR that configures the Splunk appender. However, the log4j.xml file is not honored by the server; instead, Storm seems to use its own configuration.

How can I change the Log4j configuration per topology? (I don't want to modify the log4j.xml on each worker.)
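
One workaround I am considering (an untested sketch; the file path is made up) is to point each topology's workers at a dedicated Log4j2 configuration file through the topology's worker child opts, which are appended to the worker JVM command line:

import org.apache.storm.Config;

Config conf = new Config();
// Production topologies point at the Splunk appender configuration; testing
// topologies would point at a TRACE-level file instead. The file must exist
// on every worker host.
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
        "-Dlog4j.configurationFile=/opt/storm/conf/production-log4j2.xml");

Whether this -Dlog4j.configurationFile actually wins over the one the supervisor sets is something I still need to verify. Storm's dynamic log level feature (the storm set_log_level command) would cover the TRACE/INFO split, but not the extra appender.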

Best regards, André

Kafka and Storm commitSync timing

I'm trying to design a system that has 2 major workflows:

Case 1: producers query sources for documents. I need to process each document and then save the result to MongoDB. Here I figured a few Kafka producers would fetch from the sources and add the documents to a topic, and Storm would read from the topic, with bolts doing the processing and the save. From what I've read, this should give high throughput.

Case 2: my webapp allows users to upload documents, so I wanted to add them to a separate Kafka topic (with my webapp acting as a producer). However, I don't want to ack the item in the consumer until it has been saved to Mongo, so that when I get the callback from producer.send I can query the DB and return the processed document to the user. (Basically, I want the producer to know when the data is in the DB, and at that point query it and send the response to the user.)

Question 1: Case 1 is completely asynchronous, so from what I've read Storm seems to be the way to go, but I'm not sure I completely understand its benefit over spinning up a bunch of my own consumer threads.

Question 2: I'm confused by how commitSync works in the consumer. If I have a bunch of consumers subscribed to the same topic, and they wait until they're done processing before calling commitSync, what happens if another consumer polls Kafka while one is still processing? Will it get the same offset the previous one did, or does Kafka know that offset is being handled and hand out the next one? And if a consumer has already received the next offset and the previous consumer fails, what happens to that entry? Does Kafka know to give it to the next consumer that polls? (The manual-commit loop I have in mind is sketched below.)
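
Here is that loop for Case 2 (a sketch; the broker, topic, and saveToMongo helper are placeholders, and poll(Duration) needs kafka-clients 2.0+, older clients use poll(long)). My understanding so far is that within one consumer group each partition is assigned to exactly one consumer, so two consumers should never be handed the same offsets, but I'd like that confirmed:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UploadConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092");   // placeholder broker
        props.put("group.id", "upload-processors");
        props.put("enable.auto.commit", "false");        // commit only after the Mongo save
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("uploaded-docs"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    saveToMongo(record.value());          // placeholder for the Mongo write
                }
                consumer.commitSync();                    // ack everything returned by this poll
            }
        }
    }

    private static void saveToMongo(String doc) {
        // placeholder: write the processed document to MongoDB here
    }
}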

Question 3: for Storm, it looks like nextTuple calls commitSync. I can't tell whether that happens after the last bolt in the tree has called ack, or when the first bolt in the tree accepts the message. For Case 1, which is asynchronous, this doesn't really matter, but for Case 2 it does. Is there a way to use Storm for both cases, or should I write my own standalone consumers to make Case 2 work? I'd rather have one set of consumers that handles both: check the synchronous topic first, and if nothing is queued there, check the asynchronous one.

I just started looking into these technologies for this project and haven't been able to find clear answers. I'm in the exploratory phase, so I want to make sure the technology choices for the architecture make sense.

Also, if I'm off base and anyone has suggestions for better technologies, that would be greatly appreciated.

Thanks in advance!

TupleWindow Start/End Time in Apache Storm

I have been developing a profiling application in Apache Storm that works on CDR (Call Detail Record) data. The application's main purpose is to extract each caller's TotalCallCount and TotalCallDuration during a specified time block (i.e., in every window). For profiling I want to use the sliding window technique.

To understand the idea, you can look at the following illustration: [SlidingWindow image]

For profiling I need to know when a TupleWindow starts and ends, i.e., the start and end timestamps of the TupleWindow (or of the sliding window).

Even though I looked through the Storm implementation, I couldn't find anything about this. Could you help me figure out how to get it? (The workaround I'm experimenting with is sketched below.)
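
That workaround (a sketch; it assumes every CDR tuple carries an event-time field, here called "callTime") derives the window boundaries from the tuples themselves:

import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

public class CdrProfilingBolt extends BaseWindowedBolt {
    @Override
    public void execute(TupleWindow window) {
        // Derive the window's start and end from the event timestamps carried
        // in the tuples, since TupleWindow does not expose them directly here.
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        for (Tuple t : window.get()) {
            long ts = t.getLongByField("callTime"); // assumed event-time field
            start = Math.min(start, ts);
            end = Math.max(end, ts);
        }
        // ... aggregate TotalCallCount and TotalCallDuration per caller here ...
    }
}

If I read the newer source correctly, more recent Storm releases add getStartTimestamp()/getEndTimestamp() to the window interface, which would make this workaround unnecessary; confirmation would be appreciated.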

Thank you very much

Storm failed to sync supervisor

I am trying to set up a Storm cluster on 4 nodes, with one node running Nimbus, the UI, and ZooKeeper, and the other 3 running supervisors. My /etc/hosts is the following:

127.0.0.1      localhost
172.31.20.172  nimnbus
172.31.20.172  zkserver
172.31.20.173  supervisor1
172.31.20.174  supervisor2
172.31.20.175  supervisor3

When I started the cluster, I could see on the UI page that all three supervisors showed up correctly. But the Nimbus summary said the following (not sure if this is the problem, though):

host     port    status
nimbus   6627    offline
zkserver 6627    Leader

Then I went on to run the Storm starter example on the Nimbus node using the following command (note I am running Storm 1.0.3):

./apache-storm-1.0.3/bin/storm jar apache-storm-1.0.3/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.ExclamationTopology

Then the following exception showed up:

17036 [Thread-14] ERROR o.a.s.d.s.ReadClusterState - Failed to Sync Supervisor
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.storm.utils.Utils.wrapInRuntime(Utils.java:1507) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.zookeeper.Zookeeper.existsNode(Zookeeper.java:162) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.zookeeper.Zookeeper.getVersion(Zookeeper.java:241) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.cluster.ZKStateStorage.get_version(ZKStateStorage.java:163) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.cluster.StormClusterStateImpl.assignmentVersion(StormClusterStateImpl.java:185) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.supervisor.ReadClusterState.getAssignmentsSnapshot(ReadClusterState.java:191) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.supervisor.ReadClusterState.run(ReadClusterState.java:128) [storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.event.EventManagerImp$1.run(EventManagerImp.java:54) [storm-core-1.0.3.jar:1.0.3]
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method) ~[?:1.8.0_121]
    at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_121]
    at org.apache.storm.shade.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl$3.call(ExistsBuilderImpl.java:222) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl$3.call(ExistsBuilderImpl.java:215) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:108) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForegroundStandard(ExistsBuilderImpl.java:212) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:205) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:168) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:39) ~[storm-core-1.0.3.jar:1.0.3]
    at org.apache.storm.zookeeper.Zookeeper.existsNode(Zookeeper.java:157) ~[storm-core-1.0.3.jar:1.0.3]
    ... 6 more

This indicates that my Nimbus node cannot talk to the supervisor nodes. I also checked the logs on the supervisors; it seemed that all they recorded was that they had connected to ZooKeeper. Can someone tell me what the problem is? All these machines can ping each other. (My storm.yaml is sketched below for reference.)
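
For reference, this is roughly what my storm.yaml looks like on every node (reproduced from memory, so treat the values as approximate):

storm.zookeeper.servers:
    - "zkserver"
storm.zookeeper.port: 2181
nimbus.seeds: ["nimbus"]
storm.local.dir: "/var/storm"
supervisor.slots.ports:
    - 6700
    - 6701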

2 spouts reading 4 files (2 files each). How?

I have 2 spouts and 4 files, and I want each spout to read 2 files. For example, for the first spout:

builder.setSpout("Spout1",new ReadLineSpout(FileName1,1);

and

builder.setSpout("Spout1",new ReadLineSpout(FileName2,1);

for the second spout:

builder.setSpout("Spout2",new ReadLineSpout(FileName3,1);

and

builder.setSpout("Spout2",new ReadLineSpout(FileName4,1);

Is this solution correct? Is there any other solution that you know of?
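
I suspect that registering the same spout ID twice will be rejected by TopologyBuilder, so the alternative I thought of is to let the spout accept several files and register each ID once. ReadLineSpout is my own class, and the multi-file constructor below is hypothetical:

import java.util.Arrays;

import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// Each spout instance is registered once and handed its two files.
builder.setSpout("Spout1", new ReadLineSpout(Arrays.asList(FileName1, FileName2)), 1);
builder.setSpout("Spout2", new ReadLineSpout(Arrays.asList(FileName3, FileName4)), 1);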

Apache Storm Extending Event Logging Functionality

According to the Apache Storm documentation (http://storm.apache.org/releases/1.0.3/Eventlogging.html, "Extending eventlogging"), it is possible to plug in a custom implementation for event logging. The problem is that I could not find anywhere how to actually do it. The only thing I could find is the addEventLogger method in the StormCommon class (https://github.com/apache/storm/blob/47dd09c1f258e33f752b84db7ad5ae0e74cdb1f5/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java#L368), but I can't see a way to access this method.
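
For reference, the interface that the built-in FileBasedEventLogger implements looks like this in the 1.x source, so a custom logger would presumably be a class along these lines (a sketch; how to register it with a topology in 1.0.3 is exactly what I cannot find):

import java.util.Map;

import org.apache.storm.metric.IEventLogger;
import org.apache.storm.task.TopologyContext;

public class MyEventLogger implements IEventLogger {
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // open files or connections here
    }

    @Override
    public void log(EventInfo event) {
        // EventInfo carries the timestamp, component, task and values of the
        // logged tuple; forward it wherever needed.
        System.out.println(event.toString());
    }

    @Override
    public void close() {
        // release resources
    }
}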

Is there any detailed explanation of the Storm Topology Metrics?

I am collecting the stats from the Storm UI REST API, but I am confused about what they mean even after reading the Storm UI REST API page (http://storm.apache.org/releases/0.9.6/STORM-UI-REST-API.html).
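
For context, each sample is just an HTTP GET against the UI (a sketch; host, port, and topology id are placeholders, and the endpoint path is taken from the linked docs):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class FetchTopologyStats {
    public static void main(String[] args) throws Exception {
        // Returns JSON containing completeLatency, executeLatency, processLatency, ...
        URL url = new URL("http://ui-host:8080/api/v1/topology/WordCount-1-1425844354");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}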

What exactly is:

  1. the "completeLatency" of a spout?

    • According to the API page, it is the "Total latency for processing the message". Does "the message" mean the last processed message, or is this the average total latency across all processed messages?
    • Since its value is always the largest, does it include the other latencies (i.e., the executeLatency and processLatency of the bolts)?
  2. a "tuple"?

    • Is it the same as "message", i.e., can the two terms be used interchangeably?
  3. the "executeLatency" of a bolt?

    • How is it measured?
    • Is this per message/tuple?
  4. the "processLatency" of a bolt?

    • How is it measured?
    • Is this per message/tuple?
  5. any batch-related latency for the Trident spout?

Below is a screenshot of the Storm UI showing these stats.

[Screenshot: Storm UI topology stats]