Category Archives: apache-flink

Connecting to remote task manager has failed. This might indicate that the remote task manager has been lost

I have created a Flink standalone cluster with one job manager and two task managers. When a batch job is submitted, one of the task managers throws the error below. The Flink dashboard shows both task managers as active, and the sample WordCount program works. Any help would be appreciated.

java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'hostname/127.0.0.1:46537' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'hostname/127.0.0.1:46537' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: java.net.ConnectException: Connection refused: ekablr-ca-s010/127.0.0.1:46537
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
    ... 6 more
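
The final "Caused by" is the real clue: the connection to ekablr-ca-s010 is refused because that hostname resolves to 127.0.0.1, so the consuming task manager ends up trying to connect to itself on port 46537. A common fix is to make every node resolve its peers to routable addresses and, if necessary, to pin the task manager's address and data port explicitly. A minimal sketch with a hypothetical IP (the exact key is taskmanager.hostname on older Flink releases and taskmanager.host on newer ones):

# /etc/hosts on every node (10.0.0.12 is a hypothetical routable IP)
10.0.0.12   ekablr-ca-s010

# flink-conf.yaml on that task manager
taskmanager.hostname: 10.0.0.12   # taskmanager.host on newer Flink versions
taskmanager.data.port: 46537      # optional: pin the data port so firewalls can allow it

After changing the configuration, restart the cluster and check in the dashboard that the task managers now advertise routable addresses instead of 127.0.0.1.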

Reading multiple files from S3 in Apache Flink using a regular expression

I am trying to read a list of sequence files from an S3 bucket into an Apache Flink DataSet using the code below.

val sequenceInputFormat = HadoopInputs.readSequenceFile[LongWritable, Text](classOf[LongWritable], classOf[Text], filePath)

However, I am not able to provide the file path as a comma-separated string with an asterisk (*) as a wildcard; Apache Flink accepts only one absolute path at a time. Can someone please help me with this use case? I want to provide the file path as shown below:

filePath="/path/to/file1*,/path/to/file2*,/path/to/file3*"

What are the basic differences between Apache Apex and Apache Flink?

From what I could read on the internet, they are pretty similar, as they both aim to perform stream and batch processing with similar models.

As mentioned in this article: https://thenewstack.io/apache-gets-another-real-time-stream-processing-framework-apex/

DataTorrent's vice president stated the following:

"Apex and Flink are conceptually similar, according to Thomas Weise, DataTorrent’s vice president of Apex (not to be confused about the serverless computing framework of the same name). Both Apex and Flink can do batch processing, but are more focused on streaming. And though he concedes it might make sense to merge the two projects, he doesn’t see that happening, primarily because of their different roots."

Also, both aim to follow the Google Dataflow model, and they check most of the same boxes on this capability matrix: https://beam.apache.org/documentation/runners/capability-matrix/

If they are so similar, why are they both Apache top-level projects? And if they are different, what makes them concretely different from one another?

InvalidTypesException for Generic Pattern Transformation in Apache Flink

I have a problem regarding Apache Flink. I want to have an abstract class that consumes a stream, but the pattern applied to this stream should be interchangeable.

public abstract class AbstractVisitConsumer<TEventType>

TEventType marks the type of the event that is generated from the pattern. Every pattern must implement an interface called IEventPattern:

public interface IEventPattern<TStreamInput, TMatchedEvent> extends Serializable {

    TMatchedEvent create(Map<String, List<TStreamInput>> pattern);

    Pattern<TStreamInput, ?> getEventPattern();
}

The abstract class has a method called applyPatternSelectToStream()

DataStream<TEventType> applyPatternSelectToStream(DataStream<VisitEvent> stream, IEventPattern<VisitEvent, TEventType> pattern) {
    DataStream<TEventType> patternStream = CEP.pattern(stream, pattern.getEventPattern())
            .select(new PatternSelectFunction<VisitEvent, TEventType>() {
                @Override
                public TEventType select(Map<String, List<VisitEvent>> map) throws Exception {
                    return pattern.create(map);
                }
            })
            .returns(this.typeInformation);
    return patternStream;
}

Flink always gives me the following error:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'TEventType' in 'class com.felix.AbstractVisitConsumer' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s)

My class WorkPlaceConsumer extends the aforementioned abstract class to specify the desired event that is generated from the stream.

public class WorkPlaceConsumer extends AbstractVisitConsumer<WorkPlaceEvent> {


public WorkPlaceConsumer() {
    super(TypeInformation.of(WorkPlaceEvent.class));
}

public static void main(String[] args) {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    WorkPlaceConsumer consumer = new WorkPlaceConsumer();
    DataStream<VisitEvent> visitStream = consumer.getVisitStream(env);
    DataStream<WorkPlaceEvent> workPlaceStream = consumer.applyPatternSelectToStream(visitStream, new WorkPlacePattern());

    visitStream.print();
    workPlaceStream
            .keyBy((KeySelector<WorkPlaceEvent, Integer>) event -> event.getUserId())
            .filter(new NewWorkPlaceEventFilter())
            .print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

I have already tried implementing the ResultTypeQueryable interface and storing the type information in the abstract class by passing it up from the child class at runtime. I also used .returns on the method to provide manual type information. Maybe I am just doing it wrong. Has anybody had similar issues with generic transformations on streams?

Thanks in advance.
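
For anyone hitting the same wall: the extractor fails because TEventType only exists on the enclosing class, not on the anonymous PatternSelectFunction it is asked to analyze. One approach that should sidestep the reflection entirely is to make the select function a named class that implements ResultTypeQueryable, which Flink's type extraction consults before attempting to reify generics. A sketch against the question's own types (TypedPatternSelect is a hypothetical name; VisitEvent and IEventPattern are from the code above):

import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cep.PatternSelectFunction;

// Named select function that carries its output type with it, so Flink's
// type extractor can query it instead of trying (and failing) to reify
// the erased TEventType of the enclosing abstract class.
public class TypedPatternSelect<TEventType>
        implements PatternSelectFunction<VisitEvent, TEventType>,
                   ResultTypeQueryable<TEventType> {

    private final IEventPattern<VisitEvent, TEventType> pattern;
    private final TypeInformation<TEventType> typeInformation;

    public TypedPatternSelect(IEventPattern<VisitEvent, TEventType> pattern,
                              TypeInformation<TEventType> typeInformation) {
        this.pattern = pattern;
        this.typeInformation = typeInformation;
    }

    @Override
    public TEventType select(Map<String, List<VisitEvent>> match) throws Exception {
        return pattern.create(match);
    }

    @Override
    public TypeInformation<TEventType> getProducedType() {
        return typeInformation;
    }
}

applyPatternSelectToStream() would then reduce to returning CEP.pattern(stream, pattern.getEventPattern()).select(new TypedPatternSelect<>(pattern, this.typeInformation)), and the .returns call should no longer be needed.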

Canceling Apache Flink job from the code

I am in a situation where I want to stop/cancel a Flink job from code. This is in an integration test where I submit a task to my Flink job and check the result. Since the job runs asynchronously, it doesn't stop even after the test passes or fails. I want the job to stop after the test is over.

I tried a few things, which I list below:

  1. Get the jobmanager actor
  2. Get running jobs
  3. For each running job, send a cancel request to the jobmanager

This, of course, is not working, but I am not sure whether the JobManager ActorRef is wrong or something else is missing.

The error I get is: [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

which means either the job manager actor ref is wrong or the message sent to it is incorrect.

The code looks like the following:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
 val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if(result.isInstanceOf[RunningJobsStatus]){
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while(itr.hasNext){
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
          }

        }
      }
    }
    catch{
      case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

  }

Can someone check whether this is the correct approach?
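
The dead letters are actually expected with this setup: ActorSystem("flink", ...) creates a brand-new actor system inside the test JVM, and the selection /user/jobmanager_1 points into that empty system, not into the cluster, so no actor is there to receive the message. One fix is to resolve the JobManager actor on the running cluster via its full remote URL. A minimal Java sketch (the host is hypothetical; 6123 is the default JobManager RPC port and "jobmanager" the default actor name on a standalone cluster, and the client's Akka config must have remoting enabled):

import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.ConfigFactory;
import org.apache.flink.runtime.messages.JobManagerMessages;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class CancelRunningJobs {
    public static void main(String[] args) throws Exception {
        // A separate client-side actor system; its name does not need to be "flink".
        ActorSystem system = ActorSystem.create("test-client",
                ConfigFactory.load().getConfig("akka"));
        FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

        // Resolve the JobManager actor on the cluster, not in the local system.
        ActorRef jobManager = Await.result(
                system.actorSelection("akka.tcp://flink@jobmanager-host:6123/user/jobmanager")
                        .resolveOne(timeout),
                timeout);

        // Same ask as in the question, but now aimed at a real actor.
        Future<Object> response = Patterns.ask(jobManager,
                JobManagerMessages.getRequestRunningJobsStatus(),
                new Timeout(timeout));
        Object result = Await.result(response, timeout);
        // ... cast to RunningJobsStatus and send CancelJob(jobId) as before ...
    }
}

If the cluster runs embedded in the test JVM instead, it is simpler to take the JobManager reference from the mini cluster object itself rather than going through Akka remoting.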