Category Archives: spark-streaming

Spark streaming with kafka avro serialization

I am storing data in Kafka using the Schema Registry and Avro serialization. Now I have to consume these messages from Kafka and store them in two places: one is a Parquet file and the other is a Cassandra database. After a lot of searching I found that, to perform these two tasks, we need to convert the streaming RDDs to a DataFrame. However, I am not able to find a Java implementation for this. Can you please advise?

Sample code:

JavaPairReceiverInputDStream<String, GenericRecord> kafkaStream = KafkaUtils.createStream(
        jssc, String.class, GenericRecord.class,
        KafkaAvroDecoder.class, KafkaAvroDecoder.class,   // Confluent's Avro decoder
        kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER()); // storage level required by this overload

JavaDStream<GenericRecord> msg = kafkaStream.map(
    new Function<Tuple2<String, GenericRecord>, GenericRecord>() {
        public GenericRecord call(Tuple2<String, GenericRecord> tuple2) {
            return tuple2._2();
        }
    });

msg.print(); // output looks like :: Id=1 name= man

I want to store the above data in a Cassandra table.
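
A minimal sketch of the DStream-to-DataFrame approach in Java, continuing from the `msg` stream above. The field names (id, name), paths, keyspace and table are illustrative assumptions, not taken from the question, and the Cassandra write assumes the DataStax spark-cassandra-connector is on the classpath:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Schema matching the illustrative record shown above (Id=1 name=man)
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("name", DataTypes.StringType, true)
});

msg.foreachRDD(rdd -> {
    if (rdd.isEmpty()) {
        return;
    }
    // Reuse the session backed by the streaming context's SparkContext
    SparkSession spark = SparkSession.builder()
        .config(rdd.context().getConf())
        .getOrCreate();

    // Map each Avro GenericRecord to a Spark SQL Row (assumes id is an int field)
    JavaRDD<Row> rows = rdd.map(rec -> RowFactory.create(
        (Integer) rec.get("id"),
        String.valueOf(rec.get("name"))));

    Dataset<Row> df = spark.createDataFrame(rows, schema);

    // 1) Append to a Parquet location (path is illustrative)
    df.write().mode("append").parquet("/data/parquet/users");

    // 2) Append to Cassandra via the connector (keyspace/table are illustrative and must already exist)
    df.write()
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", "ks")
      .option("table", "users")
      .mode("append")
      .save();
});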

How to keep a program running using spark streaming

I am using Apache Spark in my Java project. At the moment the program produces its result only when I run it and then exits; now I want my program to be always up and running, using Spark Streaming.

My project is structured as follows:

  • Package.launch :

    public class App {

        public App() {
            new Launch();
        }

        public static void main(String[] args) {
            new App();
        }
    }

    public class Launch {

        Read read = new Read();
        Transform transform = new Transform();
        Write write = new Write();

        public Launch() {
            write.getWriter(
                transform.getTransformer(
                    read.getReader()));
        }
    }

  • Package.read :

    public class Read {

        public Dataset getReader() {
            // Read from csv file then return dataset
            return ds;
        }
    }

  • Package.transform :

    public class Transform {

        public Dataset getTransformer(Dataset ds) {
            // do transformation on Dataset ds then return the final Dataset
            return ds;
        }
    }

  • Package.write :

    public class Write {

        public void getWriter(Dataset ds) {
            // write the result to a csv file
        }
    }
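
One way to keep such a pipeline up and running is to replace the one-shot read/transform/write with a streaming query. Below is a minimal sketch using Structured Streaming's file source, so the job watches an input directory and writes results continuously; the paths, schema, master and app name are illustrative assumptions, not taken from the project:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

public class StreamingLaunch {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StreamingLaunch")   // illustrative app name
                .master("local[*]")           // adjust for your cluster
                .getOrCreate();

        // File streams need an explicit schema; this one is a placeholder
        StructType schema = new StructType()
                .add("id", "int")
                .add("name", "string");

        // Read: watch a directory for new csv files instead of doing a single batch read
        Dataset<Row> input = spark.readStream()
                .schema(schema)
                .csv("/data/in");             // illustrative input path

        // Transform: the same kind of Dataset-to-Dataset logic as Transform.getTransformer
        Dataset<Row> result = input.filter("id IS NOT NULL");

        // Write: continuously append results as csv
        StreamingQuery query = result.writeStream()
                .format("csv")
                .option("path", "/data/out")                 // illustrative output path
                .option("checkpointLocation", "/data/chk")   // required for file sinks
                .start();

        // Keeps the program up and running until the query is stopped or fails
        query.awaitTermination();
    }
}

The call to awaitTermination() is what prevents main from returning, so the application stays alive until the streaming query is stopped.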

ClassNotFoundException in SparkStreaming Example

I am new to Spark Streaming and trying to run an example from this tutorial; I am following MAKING AND RUNNING OUR OWN NETWORKWORDCOUNT. I have completed the 8th step and built a jar with sbt.

Now I am trying to deploy my jar using the command in the 9th step, like this:

bin/spark-submit --class "NetworkWordCount" --master spark://abc:7077 target/scala-2.11/networkcount_2.11-1.0.jar localhost 9999

but when I run this command I get the following exception:

java.lang.ClassNotFoundException: NetworkWordCount
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:700)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The jar that I have created contains the "NetworkWordCount" class, with the following code taken from the Spark examples:

package src.main.scala

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("MyNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I am unable to identify what I am doing wrong.
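
One likely cause, given the package src.main.scala declaration at the top of the file, is that spark-submit is being given only the simple class name while the class actually lives in that package. A hedged guess at the corrected command (everything else unchanged from the question) would be:

bin/spark-submit --class "src.main.scala.NetworkWordCount" --master spark://abc:7077 target/scala-2.11/networkcount_2.11-1.0.jar localhost 9999

Alternatively, removing the package declaration and rebuilding the jar would let the original --class "NetworkWordCount" resolve.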

Spark Streaming: broadcast variable size limit?

I want to use a broadcast variable in Spark Streaming, and according to a previous question, "Is there any limit on size of a spark broadcast variable?", the answer was that there is a limit of Integer.MAX_VALUE, which means roughly 2 GB.

I need to broadcast a lookup table (let's say a HashMap, for example) to all mappers, and it may weigh up to several GB (at least 2).

Is this possible, or does the size limit prevent me from implementing it that way?
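
For reference, a minimal Java sketch of the usage pattern described here, broadcasting a HashMap lookup table once and reading it inside a streaming transformation; the table contents, socket source and port are placeholders, and this only illustrates the pattern, not the size limit itself:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class BroadcastLookupExample {
    public static void main(String[] args) throws InterruptedException {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "BroadcastLookupExample");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));

        // Placeholder lookup table; in the question this could be several GB in size
        Map<String, String> lookup = new HashMap<>();
        lookup.put("1", "man");

        // Broadcast once on the driver; executors read it via broadcastLookup.value()
        Broadcast<Map<String, String>> broadcastLookup = sc.broadcast(lookup);

        JavaDStream<String> ids = jssc.socketTextStream("localhost", 9999);
        JavaDStream<String> names = ids.map(id -> broadcastLookup.value().getOrDefault(id, "unknown"));
        names.print();

        jssc.start();
        jssc.awaitTermination();
    }
}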

Spark streaming and Kafka integration

I'm new to Apache Spark and I've been doing a project related to sentiment analysis on Twitter data, which involves Spark Streaming and Kafka integration. I have been following the GitHub code (link provided below):

https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka

However, in the last stage, that is, during the integration of Kafka with Apache Spark, the following error was obtained:

py4j.protocol.Py4JError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler. Trace:
py4j.Py4JException: Method createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

Command used: bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.1 twitterStream.py

Apache Spark version: spark-2.1.0-bin-hadoop2.4

Kafka version: kafka_2.11-0.10.1.1

I haven't been able to debug this and any help would be much appreciated.
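
A likely culprit, given the versions listed above, is a mismatch between the Spark build (2.1.0, Scala 2.11) and the Kafka package passed to --packages (spark-streaming-kafka_2.10:1.5.1, built for Spark 1.5 and Scala 2.10). A hedged guess at a matching command, assuming the DStream-based Kafka 0.8 connector that the PySpark KafkaUtils API relies on, would be:

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 twitterStream.py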

Apache Spark – Multiple spark context error

I am getting a multiple Spark context error.

Can anybody help me resolve this?

If I call parsing.take(1) it runs fine, but it gives the multiple Spark context error when I take 2 or more in the last line of my code.

Any help is much appreciated.

from pyspark import SparkConf
from pyspark import SparkContext

sc = SparkContext()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

############ IRIS DataSet ##############
iris= sc.textFile("hdfs:///user/edureka/IRIS.csv")
testset,trainingset = iris.randomSplit([1,2])

import numpy as np
def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [4] # Specify the columns which has the String values
    features = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in features])


def parse_interaction_label(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [4] # Specify the columns which has the String values
    label = [item for i,item in enumerate(line_split) if i in symbolic_indexes]
    return np.array([float(x) for x in label])


features_train = trainingset.map(parse_interaction)

labels_train = trainingset.map(parse_interaction_label)

features_test=testset.map(parse_interaction)

labels_test=testset.map(parse_interaction_label)

def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    #symbolic_indexes = [4] # Specify the columns which has the String values
    features_label = [item for i,item in enumerate(line_split)]
    return (np.array([float(x) for x in features_label]))

features_train_label = trainingset.map(parse_interaction_with_key)
features_test_label= testset.map(parse_interaction_with_key)

product=features_train_label.cartesian(features_test_label)


import math
def distancecal(line):
    training_label=line[0]
    training=training_label[0:4] # hardcoded the Training Column
    train_label = training_label[-1]
    testing_label=line[1]
    test=testing_label[0:4]  # Hardcoded the Testing column Modified the Testing Column
    stringtest=str(line[1])
    points=zip(training,test)
    diffs_squared_distance = [pow(a - b, 2) for (a, b) in points]
    score = math.sqrt(sum(diffs_squared_distance))
    training_label = np.append(training_label,score)
    return (stringtest,training_label)

training_label_test_score = product.map(distancecal)

keyvalue=training_label_test_score.groupByKey().mapValues(list)

def sortingvalue(l):
    from pyspark import SparkConf
    from pyspark import SparkContext
    #conf1 = SparkConf()
    #conf1.setAppName('Sorting Job Stream')
    #conf1.set("spark.ui.port", "36000")
    #conf1.set("spark.driver.allowMultipleContexts", "true")
    sc1 = SparkContext()
    v = sc1.parallelize(l)
    vSorted = v.sortBy(lambda a: a[5])
    return(vSorted.collect())


def parsekeyvalueforsorting(line):
    key=line[0]
    cdata=line[1]
    scdata=sortingvalue(cdata)
    return (key,scdata)


parsing=keyvalue.map(parsekeyvalueforsorting)
print(parsing.take(2))

Create Analytics from http using spark streaming

Hi, my requirement is to create analytics from http://10.3.9.34:9900/messages, that is, pull data from http://10.3.9.34:9900/messages, put this data into the HDFS location /user/cloudera/flume, and from HDFS create an analytics report using Tableau or the Hue UI. I tried the code below at the Scala console of the CDH 5.5 spark-shell, but I am unable to fetch data from the HTTP link:

import org.apache.spark.SparkContext
val dataRDD = sc.textFile("http://10.3.9.34:9900/messages")
dataRDD.collect().foreach(println)
dataRDD.count()
dataRDD.saveAsTextFile("/user/cloudera/flume")

I get the below error at the Scala console:

java.io.IOException: No FileSystem for scheme: http
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2623)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2637)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2680)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2662)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:379)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
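
sc.textFile only understands Hadoop filesystem schemes (hdfs, file, s3, ...), which is what the "No FileSystem for scheme: http" message points at. One workaround, sketched here in Java as an assumption rather than a confirmed solution, is to download the HTTP content yourself and copy it into HDFS first (the target file name is illustrative); the existing sc.textFile call can then point at the HDFS path instead of the URL:

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HttpToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Pull the messages over HTTP and write them into the HDFS landing directory
        try (InputStream in = new URL("http://10.3.9.34:9900/messages").openStream();
             FSDataOutputStream out = fs.create(new Path("/user/cloudera/flume/messages.txt"))) {
            IOUtils.copyBytes(in, out, 4096, false);
        }
    }
}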

Is it possible to perform JOIN operation on two DATAFILES in apache spark?

I am doing some work related to sort-based shuffling in Spark. As I understand it, each map task creates one data file (records in the form of serialized objects) and one index file (to point to records in that data file). I want to perform a JOIN on two different data files (from two different map tasks). Is it possible to do that by changing Spark's internal code? Please help me. Thank you.