Category Archives: apache-spark

Understanding the combineByKey process in a Spark RDD

Here is my dataset:

val combineByKeyRdd = sc.parallelize(List(
                     ("key1", 1),
                     ("key2", 2),
                     ("key3", 39),
                     ("key1", 1),
                     ("key2", 2),
                     ("key4", 22)))  

and below is my combineByKey code:

combineByKeyRdd.combineByKey(
    (x:Int) => (x, 1),
    (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
    (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

Below is my understanding:

The first parameter, i.e. (x:Int) => (x, 1): this function is invoked only once for every key. It creates the initial value of the accumulator.

The second parameter, i.e. (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1), is for merging values. What I understood is that it takes every new value for a key and adds it to that key's existing accumulator, repeating until all values for the same key are covered. So basically, below is how I think it should work:

The first combiner for key1 is initialized at 1, then the part below runs:
mergeValue(accum[key1], 1) ==> accum[key1] = (1 + 1 (values), 1 + 1 (occurrences/partitions))

The third and final part combines the combiners, which adds up the values for that particular key:
For key1:

mergeCombiner((1, 1), (1, 1)) ==> (1 + 1, 1 + 1)  

The final result would be in Array form:

Array[(String, (Int, Int))] = Array((key3,(39,1)), (key4,(22,1)), (key1,(2,2)), (key2,(4,2)))

So, am I correct conceptually? I hope I have been clear enough to express what I understood; if not, please explain to me in layman's terms how it should work. Thanks in advance.
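
For reference, one detail worth noting: createCombiner runs for the first value of a key within each partition, so it can fire more than once per key across partitions, which is exactly why mergeCombiners is needed. Below is a minimal PySpark sketch of the same pipeline (it mirrors the Scala code above; the local master and app name are illustrative), which also makes the full expected (sum, count) output easy to check:

    # Minimal PySpark mirror of the Scala combineByKey example above.
    # Assumes no SparkContext is already running in this process.
    from pyspark import SparkContext

    sc = SparkContext("local", "combineByKeyDemo")

    rdd = sc.parallelize([("key1", 1), ("key2", 2), ("key3", 39),
                          ("key1", 1), ("key2", 2), ("key4", 22)])

    sumCount = rdd.combineByKey(
        lambda v: (v, 1),                         # createCombiner: first value of a key in a partition
        lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue: fold further values within the partition
        lambda a, b: (a[0] + b[0], a[1] + b[1]))  # mergeCombiners: merge per-partition accumulators

    print(sumCount.collect())
    # e.g. [('key1', (2, 2)), ('key2', (4, 2)), ('key3', (39, 1)), ('key4', (22, 1))]

    # The (sum, count) pairs are typically turned into per-key averages:
    print(sumCount.mapValues(lambda p: p[0] / p[1]).collect())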

How to keep a program running using Spark Streaming

I am using Apache Spark in my Java project. Currently the result is produced only when I run the program once; now I want the program to be always up and running, using Spark Streaming.

My project is structured as follows:

  • Package.launch :

    public class App {
        public App() {
            new Launch();
        }

        public static void main(String[] args) {
            new App();
        }
    }

    public class Launch {
        Read read = new Read();
        Transform transform = new Transform();
        Write write = new Write();

        public Launch() {
            write.getWriter(
                transform.getTransformer(
                    read.getReader()));
        }
    }

  • Package.read :

    public class Read {
        public Dataset getReader() {
            // Read from csv file then return dataset
            return ds;
        }
    }

  • Package.transform :

    public class Transform {
        public Dataset getTransformer(Dataset ds) {
            // do transformation on Dataset ds, then return the final Dataset
            return ds;
        }
    }

  • Package.write :

    public class Write {
        public void getWriter(Dataset ds) {
            // write the result on csv file
        }
    }
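
As written, Launch runs the read/transform/write pipeline once and the program then exits. One common way to keep such a job permanently up is to turn it into a streaming query (sketched here with Structured Streaming rather than the older DStream API) and block on awaitTermination(). Below is a minimal sketch of that pattern in PySpark, with placeholder paths, schema and column names; the Java Dataset/DataStreamWriter API has the same shape, so the Read, Transform and Write classes above could return and consume streaming datasets in the same way.

    # Minimal Structured Streaming sketch: watch an input directory for new CSV files,
    # apply a transformation, and continuously write results out.
    # Paths, schema and column names are illustrative placeholders.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    spark = SparkSession.builder.appName("always-on-csv-pipeline").getOrCreate()

    schema = StructType([StructField("id", IntegerType()),
                         StructField("value", StringType())])

    # Read: a streaming source instead of a one-shot spark.read
    ds = spark.readStream.schema(schema).csv("/path/to/input-dir")

    # Transform: ordinary DataFrame operations, as before
    transformed = ds.filter(ds["id"].isNotNull())

    # Write: a streaming sink; the query keeps running until stopped
    query = (transformed.writeStream
             .format("csv")
             .option("path", "/path/to/output-dir")
             .option("checkpointLocation", "/path/to/checkpoint")
             .outputMode("append")
             .start())

    query.awaitTermination()  # blocks here, keeping the application up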

Append to existing dataframe

I am using Java to create a JavaRDD and persist it to memory and disk. I am trying to find out whether the dataframe already exists before appending new data to it, since I will have incremental data to add to the dataframe on a daily basis. Any suggestions? I am looking to satisfy the if condition below:

if (DATA_FRAME_EXISTS) {
    //get existing dataframe and append new JavaRDD to it
} else {
    JavaRDD<URI> uriRDD = spark.read().option("quote", "\"")
            .textFile("C:\\tmp\\1.csv").javaRDD().map(line -> {
                String[] parts = line.split(",");
                URI uri = new URI();
                uri.url("test");
                return uri;
            });

    Dataset<Row> uriDF = spark.createDataFrame(uriRDD, URI.class);
    uriDF.persist(StorageLevel.MEMORY_AND_DISK());
    uriDF.createOrReplaceTempView("uri");
}
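
One way to express the DATA_FRAME_EXISTS check is to look the temp view up in the Spark catalog and union the new batch onto it when it is already registered. Below is a minimal sketch of that idea in PySpark (the Java SparkSession/Catalog API is analogous); the view name "uri" follows the snippet above, while the input path and options are illustrative placeholders.

    # Sketch: append to a previously registered temp view if it exists, otherwise create it.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("incremental-append").getOrCreate()

    # Today's increment (illustrative read; the real code builds it from a JavaRDD)
    new_df = spark.read.option("quote", '"').csv("C:/tmp/1.csv")

    existing_views = [t.name for t in spark.catalog.listTables()]

    if "uri" in existing_views:
        combined = spark.table("uri").union(new_df)  # schemas must match column-for-column
    else:
        combined = new_df

    combined.persist()
    combined.createOrReplaceTempView("uri")

Note that a temp view only lives for the lifetime of the SparkSession, so for day-over-day increments it is usually more robust to append to persistent storage (for example df.write.mode("append").parquet(path)) and read it back on the next run.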

Analysis of Movie Ratings percentages across Occupation and Movie Genre

I just started learning Spark programming and Python programming. Can you please help me understand the mistake in my code?

I am running code in Jupyter Notebooks, interactive mode.

  1. The test code below, where I tested the concept, works fine:

     rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))])
    
    result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11])))
    print (result.top(3))
    

    Output:

    [('librarian', (2, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0))]
    
  2. Below also works fine:

    #[(movieid, genre_list)]
    
    aggregateRDD = movieRDD.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1])))
    print (aggregateRDD.top(3))
    

    Output:

    [(1682, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1681, [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1680, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])]
    
  3. But when I use a similar concept in my program, it fails. What am I doing wrong? (See the sketch after the error output below.)

    ##############################################################################
    ### Analysis of Movie Ratings percentages across Occupation and Movie Genre
    ##############################################################################
    
    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("popularMovie")
    sc = SparkContext(conf=conf)
    
    ###import movie ratings into RDD
    ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
    ###import user details into RDD
    userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
    ###import movie data into RDD
    movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
    ###import genre data into RDD
    genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")
    
    ###split on delimiter functions
    def splitRatingTab(line):
        line = line.split('\t')
        return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)
    def splitUserPipe(line):
        line = line.split('|')
        return (int(line[0]), line[3]) #(user, occupation)
    def splitMoviePipe(line):
        line = line.split('|')
        return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])
    
    
    def listToIntElements(lst):
        """conver the boolean text ('0', '1') genre value to integers (0, 1)"""
        for cnt, _ in enumerate(lst):
            lst[cnt] = int(_)
        return lst
    
    ###create dictionary object for genreid and genre
    def loadMovieGenre():
        """
        create dictionary object for genreid and genre
        """
        genre = {}
        with open('C:/SparkCourse/ml-100k/u.genre') as file:
            for line in file:
                #each line is of type [genre, genreid]
                line = line.split('|')
                #convert genreid to int, to remove new line '\n' at the end of string
                genre[int(line[1])] = line[0]
            return genre
    
    
    ### Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers
    ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))
    
    ### Transform to RDD as [(user, occupation)]
    occupationRDD = userLines.map(splitUserPipe)
    
    ### Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres
    movieRDD = movieLines.map(splitMoviePipe)
    
    ###join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre)] to get all genres; 
    ###then Transform to [(movieid,((userid, rating), genre) )]
    joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)
    
    
    ###Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]
    ###to Transform to [(occupation, ((1, genre)))]
    transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
    joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))
    print (joinRatingGenresOccup.take(2))
    
    
    ###Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]
    totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))
    print (totalRatingGenreCntByOccupation.take(2))
    

    Error:

    [('librarian', (1, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))]
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-137-a156d8bbfde9> in <module>()
    ----> 1 get_ipython().run_cell_magic('time', '', '\n##############################################################################\n### Analysis of Movie Ratings percentages across Occupation and Movie Genre\n##############################################################################\n\n#import movie ratings into RDD\nratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")\n#import user details into RDD\nuserLines = sc.textFile("///SparkCourse/ml-100k/u.user")\n#import movie data into RDD\nmovieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")\n#import genre data into RDD\ngenreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")\n\n#split on delimiter functions\ndef splitRatingTab(line):\n    line = line.split(\'\\t\')\n    return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)\ndef splitUserPipe(line):\n    line = line.split(\'|\')\n    return (int(line[0]), line[3]) #(user, occupation)\ndef splitMoviePipe(line):\n    line = line.split(\'|\')\n    return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])\n\n\ndef listToIntElements(lst):\n    """conver the boolean text (\'0\', \'1\') genre value to integers (0, 1)"""\n    for cnt, _ in enumerate(lst):\n        lst[cnt] = int(_)\n    return lst\n\n#create dictionary object for genreid and genre\ndef loadMovieGenre():\n    """\n    create dictionary object for genreid and genre\n    """\n    genre = {}\n    with open(\'C:/SparkCourse/ml-100k/u.genre\') as file:\n        for line in file:\n            #each line is of type [genere, genreid]\n            line = line.split(\'|\')\n            #convert genreid to int, to remove new line \'\\n\' at the end of string\n            genre[int(line[1])] = line[0]\n        return genre\n\n    \n# Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers\nratingRDD = ratingLines.map(lambda line: splitRatingTab(line))\n#print (\'ratingRDD:\\n\',ratingRDD.top(5))\n\n# Transform to RDD as [(user, occupation)]\noccupationRDD = userLines.map(splitUserPipe)\n#print (\'occupationRDD:\\n\',occupationRDD.top(3))\n\n# Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres\nmovieRDD = movieLines.map(splitMoviePipe)\n#print (\'movieRDD:\\n\',movieRDD.top(3))\n\n#join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; \n#then Transform to [(movieid,((userid, rating), genre) )]\njoinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)\n#print (joinRatingMovieGenres.take(2))\n\n#Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]\n#to Transform to [(occupation, ((1, genre)))]\ntransRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))\njoinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))\nprint (joinRatingGenresOccup.take(2))\n\n\n#Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]\ntotalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))\nprint (totalRatingGenreCntByOccupation.take(2))')
    
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
       2113             magic_arg_s = self.var_expand(line, stack_depth)
       2114             with self.builtin_trap:
    -> 2115                 result = fn(magic_arg_s, cell)
       2116             return result
       2117 
    
    <decorator-gen-60> in time(self, line, cell, local_ns)
    
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
        186     # but it's overkill for just that one bit of state.
        187     def magic_deco(arg):
    --> 188         call = lambda f, *a, **k: f(*a, **k)
        189 
        190         if callable(arg):
    
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
       1183         else:
       1184             st = clock2()
    -> 1185             exec(code, glob, local_ns)
       1186             end = clock2()
       1187             out = None
    
    <timed exec> in <module>()
    
    C:\spark\python\pyspark\rdd.py in take(self, num)
       1356 
       1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
    -> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
       1359 
       1360             items += res
    
    C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
        999         # SparkContext#runJob.
       1000         mappedRDD = rdd.mapPartitions(partitionFunc)
    -> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
       1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
       1003 
    
    C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
       1158         answer = self.gateway_client.send_command(command)
       1159         return_value = get_return_value(
    -> 1160             answer, self.gateway_client, self.target_id, self.name)
       1161 
       1162         for temp_arg in temp_args:
    
    C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
         61     def deco(*a, **kw):
         62         try:
    ---> 63             return f(*a, **kw)
         64         except py4j.protocol.Py4JJavaError as e:
         65             s = e.java_exception.toString()
    
    C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
        318                 raise Py4JJavaError(
        319                     "An error occurred while calling {0}{1}{2}.\n".
    --> 320                     format(target_id, ".", name), value)
        321             else:
        322                 raise Py4JError(
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 239.0 failed 1 times, most recent failure: Lost task 1.0 in stage 239.0 (TID 447, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 362, in func
        return f(iterator)
      File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
        merger.mergeValues(iterator)
      File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
        d[k] = comb(d[k], v) if k in d else creator(v)
      File "<timed exec>", line 73, in <lambda>
    TypeError: 'int' object is not subscriptable
    
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
        at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
        at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 362, in func
        return f(iterator)
      File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
        merger.mergeValues(iterator)
      File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
        d[k] = comb(d[k], v) if k in d else creator(v)
      File "<timed exec>", line 73, in <lambda>
    TypeError: 'int' object is not subscriptable
    
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more 
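
For reference, the TypeError above comes from the shape of the reducer's output: the lambda receives values of the form (count, genre_list) but returns a flat tuple of ints, so on the next merge for the same key x[1] is an int and x[1][0] fails with "'int' object is not subscriptable". The test in step 1 never hit this because each key was merged only once (and y[1][12] also looks like a typo for y[1][2]). Below is a minimal sketch of a shape-preserving reducer using element-wise list addition; the two sample occupations are made up to mirror the printed output above.

    # Sketch: keep the value as (rating_count, genre_count_list) so every merge
    # receives and returns the same structure. Sample records are illustrative.
    from pyspark import SparkContext

    sc = SparkContext("local", "genreByOccupation")

    joinRatingGenresOccup = sc.parallelize([
        ('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),
        ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0])),
        ('engineer',  (1, [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))])

    totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(
        lambda x, y: (x[0] + y[0],                           # total ratings for the occupation
                      [a + b for a, b in zip(x[1], y[1])]))  # element-wise genre counts

    print(totalRatingGenreCntByOccupation.take(2))
    # e.g. [('engineer', (1, [1, 0, ...])), ('librarian', (2, [0, 1, 1, 0, 1, 0, ...]))]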
    

pyspark structured streaming window timestamp always 1970

I have literally copied and pasted the code from the official Spark tutorial: https://raw.githubusercontent.com/apache/spark/v2.3.0/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py

and ran the following command to open the port:

nc -lk 9999

and ran spark-submit with a 5-second window and slide duration:

spark-submit --num-executors 2 --executor-cores 4 windowbased.py localhost 9999 5 5

and my window timestamps come out as follows, no matter how late I type words into the console:

[1970-01-18 15:45:00, 1970-01-18 15:45:05]

(sample output screenshot omitted)

(My working environment is a Mac with conda. The same behaviour can be reproduced on AWS EMR release 5.13.)

Thanks!

[EDIT]

This problem only occurs in Spark 2.3.0; it works fine with 2.2.1.