Category Archives: akka

Canceling Apache Flink job from the code

I want to stop/cancel a Flink job from code. This is in an integration test where I submit a task to my Flink job and check the result. Because the job runs asynchronously, it doesn't stop when the test passes or fails. I want the job to stop once the test is over.

Here is what I tried:

  1. Get the jobmanager actor
  2. Get running jobs
  3. For each running job, send a cancel request to the jobmanager

This, of course, is not working, but I am not sure whether the JobManager ActorRef is wrong or something else is missing.

The error I get is: [] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

which means either the job manager actor ref is wrong or the message sent to it is incorrect.

The code looks like the following:

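import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.pattern.Patterns
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) // I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") // also got this Akka path by debugging and reading the JobManager's Akka URL
// Ask the JobManager for the list of running jobs
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new Timeout(new FiniteDuration(10000, TimeUnit.MILLISECONDS)))
try {
  val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
  val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
  val itr = runningJobs.iterator()
  val jobId = ??? // the right-hand side is missing in the original post; presumably it is read from itr
  // Send the cancel request and wait for the JobManager's reply
  val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
  try {
    Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
  } catch {
    case e: Exception => println("Canceling the job with ID " + jobId + " failed. " + e)
  }
} catch {
  case e: Exception => println("Could not retrieve running jobs from the JobManager. " + e)
}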


Can someone check whether this is the correct approach?
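One way to tell the two possibilities apart (a wrong actor reference versus a wrong message) is to resolve the selection before asking it anything. The sketch below is only an illustration, not a confirmed fix. It uses `ActorSelection.resolveOne` from Akka's classic actor API; the names `SelectionCheck` and `resolves` are made up for this example, and it assumes Akka 2.4 or later for `terminate()` (on Akka 2.3 the corresponding call is `shutdown()`).

```scala
import akka.actor.{ActorNotFound, ActorSystem}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Flink or Akka.
object SelectionCheck {
  // Returns true if some actor currently lives at `path` inside `system`.
  def resolves(system: ActorSystem, path: String): Boolean = {
    val lookup = system.actorSelection(path).resolveOne(2.seconds)
    Try(Await.result(lookup, 3.seconds)) match {
      case Success(_)                => true
      case Failure(_: ActorNotFound) => false
      case Failure(other)            => throw other
    }
  }

  def main(args: Array[String]): Unit = {
    // A brand-new actor system with the same name as in the test code
    // (default configuration here, which is an assumption).
    val system = ActorSystem("flink")
    try {
      println(resolves(system, "/user/jobmanager_1"))
    } finally {
      system.terminate() // Akka 2.4+; use shutdown() on 2.3
    }
  }
}
```

If the check reports that nothing lives at the path, the selection points at no actor in that system, which would be consistent with the dead-letter log. In that case the message type is probably not the problem, and the actor system the test talks to is the thing to look at.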

Spark Workers dropping off after a couple of days

I have a Spark standalone cluster with 6 workers. I left the cluster idle for 3 days, and after that I saw only 4 workers on the Spark master UI. The other 2 workers had both failed with the same exception.

The strange part is that the cluster had been running fine for over 2 days, and this happened suddenly while it was sitting idle (no jobs had been running for days).

2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
2016-02-14 01:13:10 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in thread Thread[,5,main]
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from ja[email protected][Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 3]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(
    at java.util.concurrent.ThreadPoolExecutor.reject(
    at java.util.concurrent.ThreadPoolExecutor.execute(
    at java.util.concurrent.AbstractExecutorService.submit(
    at org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$reregisterWithMaster$1.apply$mcV$sp(Worker.scala:269)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
    at$apache$spark$deploy$worker$Worker$$reregisterWithMaster(Worker.scala:234)
    at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:521)
    at$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
    at$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at$class.aroundReceive(Actor.scala:467)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
    at
    at
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
    at
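The executor state printed in the exception (Running, pool size = 1, active threads = 1, queued tasks = 0) shows that the pool had not been shut down when it rejected the task. A pool can reject work in that state when it is already at its maximum size and its queue cannot hold the new task. The sketch below reproduces that situation with the JDK alone. The pool parameters (maximum size 1, a `SynchronousQueue`) and the names `RejectionDemo` and `secondSubmitRejection` are assumptions chosen for illustration; the trace does not confirm how the Spark worker configures its pool.

```scala
import java.util.concurrent.{CountDownLatch, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Hypothetical demo object, not Spark code.
object RejectionDemo {
  // Submits two tasks to a pool that can run only one at a time and has no
  // queue capacity. Returns the rejection message if the second is refused.
  def secondSubmitRejection(): Option[String] = {
    val pool = new ThreadPoolExecutor(
      0, 1,                              // core size 0, maximum size 1 (assumed values)
      60L, TimeUnit.SECONDS,             // idle threads time out after 60 seconds
      new SynchronousQueue[Runnable]())  // hands tasks off directly, stores none
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    try {
      // The first task occupies the only thread until `release` is counted down.
      pool.submit(new Runnable {
        def run(): Unit = { started.countDown(); release.await() }
      })
      started.await()
      try {
        // No free thread and no queue slot, so the default AbortPolicy throws.
        pool.submit(new Runnable { def run(): Unit = () })
        None
      } catch {
        case e: RejectedExecutionException => Some(e.getMessage)
      }
    } finally {
      release.countDown()
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(secondSubmitRejection())
}
```

The message returned has the same shape as the one in the worker log: a pool in the Running state with one busy thread and nothing queued. Whether this is what happened in the worker depends on how Spark sets up that pool in the version in use.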