Category Archives: aggregate

Aggregation and filtering through the consumer template

This is more of a general "what's the best practice" question.

I have a few processes where the consumer template is used to read whatever is available in a directory (or an MQ queue) and then stop itself. The entire route set it calls is created programmatically, based on a few parameters.

So, using the consumer template method below, is there a way to assign:

  • A filter operation programmatically? Filtering out certain files from the loop below would be easy in a standard route (through .filter), but at the moment I have no predefined beans, so adding #filter=filter to the EIP is not really an option.

  • An aggregation function from inside my while loop (while still using the template)?

Thanks!

        @Override
        public void process(Exchange exchange) throws Exception {

            getConsumer().start();                      
            int exchangeCount = 0;
            while (true) {
                String consumerEp = "file:d://directory?delete=true&sendEmptyMessageWhenIdle=true&idempotent=false";
                Exchange fileExchange = getConsumer().receive(consumerEp);
                if (fileExchange == null || fileExchange.getIn()==null || fileExchange.getIn().getHeader(CAMEL_FILE_NAME)==null) {
                    break;
                }
                exchangeCount++;
                Boolean batchStatus = (Boolean) fileExchange.getProperty(PROP_CAMEL_BATCH_COMPLETE);
                LOG.info("---PROCESSING : " + fileExchange.getIn().getHeader(CAMEL_FILE_NAME));
                getProducer().send("direct:some-other-process", fileExchange);

                // Check the CamelBatchComplete property to detect the end of the batch and avoid cycling through twice.
                if (Boolean.TRUE.equals(batchStatus)) {
                    break;
                }
            }       
            // Stop the consumer service
            getConsumer().stop();
            LOG.info("End Group Operation : Total Exchanges=" + exchangeCount);
        }
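
As a rough sketch of what the two bullet points are asking for, the filter and the aggregation can both be plain objects that the loop calls directly, with no route or registry bean involved. The example below uses only the JDK so it runs on its own: `java.util.function.Predicate` and `BinaryOperator` are stand-ins (an assumption for illustration) for Camel's `Predicate` and `AggregationStrategy`, and the class name, method name and file names are hypothetical. In the real loop the same calls would be made on `fileExchange` before it is sent to the producer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

// Plain-Java stand-in for the polling loop above; no Camel types are used here.
class LoopFilterAggregateSketch {

    // Runs a receive/filter/aggregate cycle over a fixed list of file names.
    // 'filter' plays the role of a Camel Predicate,
    // 'strategy' plays the role of an AggregationStrategy.
    static String drain(List<String> received,
                        Predicate<String> filter,
                        BinaryOperator<String> strategy) {
        String aggregated = null; // like oldExchange: null until the first match
        for (String name : received) {
            if (!filter.test(name)) {
                continue; // filtered out: skip it and do not forward it
            }
            aggregated = (aggregated == null) ? name : strategy.apply(aggregated, name);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.csv", "b.tmp", "c.csv");
        String result = drain(files,
                name -> name.endsWith(".csv"),            // keep only .csv files
                (oldValue, newValue) -> oldValue + "," + newValue);
        System.out.println(result); // prints a.csv,c.csv
    }
}
```

Because both pieces are passed in as parameters, they can be built from the same parameters that drive the programmatic route creation.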

Apache Camel – Empty body inside AggregationStrategy

Here is my route:

    def.convertBodyTo(String.class)
        .split()
            .method(splittingProcessor, "split")
            .aggregationStrategy(myAggregationStrategy)
            .bean(myProcessor, "aMethod")
        .end();

I am trying to send one exchange to more than two different HTTP endpoints.

Here is my aggregation strategy:

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Message inMsg = newExchange.getIn();
        String body = inMsg.getBody(String.class);

        String oldBody = "";
        if (oldExchange == null) {
            return newExchange;
        }
        else {
            oldBody = oldExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(oldBody + " " + body);
            return oldExchange;
        }
    }

But body is always equal to "" and inMsg is "[Body is instance of java.io.InputStream]".

convertBodyTo(String.class) does not work either (at least the way I am using it).

What am I doing wrong?

PS. Neither streamCaching() on the route nor setStreamCache(true) on the context helps.
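
One possible explanation for the symptom, offered as an assumption because the rest of the route is not shown: a `java.io.InputStream` body can normally be read only once, so if anything has already consumed it before the strategy runs, a later conversion to `String` finds nothing left. The JDK-only sketch below illustrates just that read-once behaviour; the class and method names are hypothetical and no Camel API is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamBodySketch {

    // Reads whatever is left in the stream and decodes it as UTF-8.
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int count;
            while ((count = in.read(buffer)) != -1) {
                out.write(buffer, 0, count);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream(
                "response".getBytes(StandardCharsets.UTF_8));
        String first = readAll(body);  // consumes the stream
        String second = readAll(body); // nothing left to read
        System.out.println("[" + first + "] [" + second + "]"); // prints [response] []
    }
}
```

If this is what is happening, the body would need to be turned into a `String` at the point where the stream is first produced (for example, right after the HTTP call) rather than inside the aggregation strategy.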