Category Archives: accumulator

TimerTask and Thread.sleep do not work inside a Spark Accumulator object

I created my own accumulator data type in Spark by extending the AccumulatorV2 class. I would like to reset its value at a specific time (e.g. at midnight). I put a nested class extending java.util.TimerTask inside my custom accumulator and use a Timer to schedule the task, but the task runs every 5 s regardless of the interval I set.

Here is my code:

import java.util.concurrent.ConcurrentHashMap
import java.util.{Timer, TimerTask}

import scala.collection.JavaConverters._
import scala.collection.concurrent

import org.apache.spark.TaskContext
import org.apache.spark.util.AccumulatorV2
import org.joda.time.DateTime

class ConcurrentHashMapAccumulator(private val map: concurrent.Map[String, Int])
  extends AccumulatorV2[(String, Int), concurrent.Map[String, Int]] {

  // Auxiliary no-arg constructor: wraps a fresh ConcurrentHashMap
  // and schedules the reset task
  def this() {
    this(new ConcurrentHashMap[String, Int]().asScala)
    val runner = new RunCheckReset()
    val timer = new Timer()
    timer.scheduleAtFixedRate(runner, 0L, 600000L) // does not work
    // timer.schedule(runner, 0L, 600000L)         // does not work either
  }

  // ........
  // overridden AccumulatorV2 methods
  // ........

  class RunCheckReset extends TimerTask {
    override def run(): Unit = {
      val now = new DateTime()
      println("Now is " + now + " in " + TaskContext.getPartitionId())
      try {
        Thread.sleep(25000L) // does not work!
      } catch {
        case ie: InterruptedException => println(ie)
      }
    }
  }
}

I always get a result like this:

Now is 2017-03-24T09:52:40.050+07:00 in 0
Now is 2017-03-24T09:52:40.311+07:00 in 0
Now is 2017-03-24T09:52:45.059+07:00 in 0
Now is 2017-03-24T09:52:46.490+07:00 in 0
Now is 2017-03-24T09:52:50.015+07:00 in 0
Now is 2017-03-24T09:52:50.045+07:00 in 0
Now is 2017-03-24T09:52:55.035+07:00 in 0
Now is 2017-03-24T09:52:55.228+07:00 in 0
Now is 2017-03-24T09:53:00.015+07:00 in 0
Now is 2017-03-24T09:53:00.061+07:00 in 0
Now is 2017-03-24T09:53:05.024+07:00 in 0
Now is 2017-03-24T09:53:05.207+07:00 in 0
Now is 2017-03-24T09:53:10.017+07:00 in 0
Now is 2017-03-24T09:53:10.049+07:00 in 0

As printed above, the run() method runs twice every 5 s. It'd be greatly appreciated if anyone could help. Thanks.
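For reference, the same scheduling call honors a 10-minute period when run outside Spark, so it can help to rule out the Timer API itself first. A minimal standalone sketch (the object name TimerSanityCheck is made up for illustration):

    import java.util.{Timer, TimerTask}

    object TimerSanityCheck {
      def main(args: Array[String]): Unit = {
        val timer = new Timer("reset-timer", true) // daemon timer thread
        val task = new TimerTask {
          override def run(): Unit =
            println("Fired at " + System.currentTimeMillis())
        }
        // Same call as in the accumulator: run once immediately, then every 10 minutes.
        timer.scheduleAtFixedRate(task, 0L, 600000L)
        Thread.sleep(1800000L) // keep the JVM alive long enough to observe a few firings
      }
    }

If this fires on schedule, the irregular 5 s output above likely points at the surrounding Spark machinery rather than at TimerTask itself, for example each copy of the accumulator that Spark creates per task (via copy/copyAndReset) running the no-arg constructor and starting its own Timer.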

Spark Task not Serializable with simple accumulator?

I am running this simple code:

val accum = sc.accumulator(0, "Progress")
listFilesPar.foreach { filepath =>
  accum += 1
}

listFilesPar is an RDD[String].

This throws the following error:

org.apache.spark.SparkException: Task not serializable

Right now I don't understand what's happening. I use braces rather than parentheses because I need to write a lengthy function. I am just doing unit testing.
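For context: everything referenced inside the foreach closure must be serializable, and if accum is a field of an enclosing class (for example a test class that also holds the SparkContext), Spark pulls the whole outer instance into the closure. A sketch of the common workaround, copying the field into a local val first (the class and method names are illustrative; the pre-2.0 accumulator API from the question is kept):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical test class; only `sc` and `listFilesPar` come from the question.
    class ProgressTest(sc: SparkContext) {
      val accum = sc.accumulator(0, "Progress") // field of a non-serializable class

      def count(listFilesPar: RDD[String]): Int = {
        val localAccum = accum // copy the field into a local val so the closure
                               // captures the accumulator, not `this`
        listFilesPar.foreach { _ => localAccum += 1 }
        localAccum.value
      }
    }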

Are Accumulators in Spark thread safe?

I am working with Accumulators. I just wanted to know whether these objects are thread safe.

accumInt is an Accumulator<Integer> created with an AccumulatorParam:

    // Current value of accumInt -> 6
    AccumulatorThread t1 = new AccumulatorThread();
    t1.setAccum(accumInt);
    t1.setValueToAdd(5);

    AccumulatorThread t2 = new AccumulatorThread();
    t2.setAccum(accumInt);
    t2.setValueToAdd(7);

    new Thread(t1).start();
    new Thread(t2).start();

    System.out.println(accumInt.value()); // prints 11, 13, or 18

The AccumulatorThread class:

    class AccumulatorThread implements Runnable {
        Accumulator<Integer> accum;
        Integer valueToAdd;

        public Integer getValueToAdd() {
            return valueToAdd;
        }

        public void setValueToAdd(Integer valueToAdd) {
            this.valueToAdd = valueToAdd;
        }

        public Accumulator<Integer> getAccum() {
            return accum;
        }

        public void setAccum(Accumulator<Integer> accum) {
            this.accum = accum;
        }

        @Override
        public void run() {
            System.out.println("Value to add in thread: " + valueToAdd);
            accum.add(valueToAdd);
        }
    }

The behavior suggests that it is not thread safe. Am I missing something?
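One caveat before drawing conclusions: the main thread reads accumInt.value() without waiting for the two worker threads to finish, so the printed value races with the add() calls regardless of whether add() itself is atomic. A minimal Scala sketch of the same experiment with the missing join() calls added (assuming an existing Accumulator[Int] named accumInt with value 6, as above):

    // Assumes accumInt: Accumulator[Int] with current value 6, as in the question.
    val t1 = new Thread(new Runnable {
      override def run(): Unit = accumInt.add(5)
    })
    val t2 = new Thread(new Runnable {
      override def run(): Unit = accumInt.add(7)
    })
    t1.start(); t2.start()
    t1.join(); t2.join()    // wait for both adds before reading the value
    println(accumInt.value) // without the joins, 6, 11, 13, or 18 can all appear

Whether two concurrent add() calls can still lose an update once the joins are in place is the actual thread-safety question; the original output alone does not answer it.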