Tuesday, September 9, 2014

How to make an asynchronous processor in Apache Camel for generating and streaming files

We had a case where we needed to generate a PDF file in Camel and stream it back to the client. The PDF generation library expects an OutputStream to write to, and since the file can grow large, it was important not to buffer the entire file in memory. Camel already provides many options for concurrency, but they center on invoking a route asynchronously. That was not applicable to our case, or at least not nicely, because we wanted only part of the logic to run in a separate thread. We needed an asynchronous processor.

The AsyncProcessor interface already exists, but it only provides a callback for notifying the caller that the job is done. It offers no mechanism for actually executing logic asynchronously. That part has to be done manually. Luckily, Camel offers an easy way to create thread pools and submit asynchronous tasks to them. To create a thread pool, add a stanza similar to the following to Camel's Spring configuration, inside the camelContext element:

<threadPool id="smallPool" threadName="pdfWriter"
 poolSize="5" maxPoolSize="30" maxQueueSize="50"/>
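For readers more at home in plain Java, the stanza above roughly corresponds to a JDK ThreadPoolExecutor. What follows is a minimal sketch, not what Camel literally builds. The 60-second keep-alive and the CallerRuns rejection policy are assumptions taken from Camel's default thread pool profile. The class name PdfWriterPool is hypothetical. Thread names are simplified, because Camel applies its own naming pattern.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-JDK analogue of:
// <threadPool id="smallPool" threadName="pdfWriter" poolSize="5" maxPoolSize="30" maxQueueSize="50"/>
public class PdfWriterPool {

    public static ThreadPoolExecutor create() {
        AtomicInteger counter = new AtomicInteger();
        // Simplified naming: "pdfWriter-1", "pdfWriter-2", ...
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "pdfWriter-" + counter.incrementAndGet());
            t.setDaemon(true); // idle pool threads never keep the JVM alive
            return t;
        };
        return new ThreadPoolExecutor(
                5,                            // poolSize: core threads
                30,                           // maxPoolSize: upper bound under load
                60, TimeUnit.SECONDS,         // assumed keep-alive for threads above the core size
                new ArrayBlockingQueue<>(50), // maxQueueSize: bounded task queue
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy()); // assumed: run in the caller when saturated
    }
}
```

The sketch also exposes a point of JDK semantics. With a bounded queue, threads beyond the five core ones are started only once the 50-slot queue is full. As a result, maxPoolSize comes into play only under sustained load.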

This pool can then be used in the usual way, for example by handing the rest of a route off to it with the threads DSL:

<route>
    <from uri="direct:start"/>
    <to uri="log:start"/>
    <threads executorServiceRef="smallPool">
        <to uri="log:hello"/>
    </threads>
</route>

But this is not what we're after. Instead, we can inject the pool into an AsyncProcessor and submit a task to it directly. In the following example, the processor creates a CircularByteBuffer and hands its OutputStream to the PDF generation task, which runs in a separate thread. It then immediately returns the associated InputStream as the response for the client to consume, without blocking or buffering the whole file.

import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

import javax.annotation.Resource;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.util.AsyncProcessorHelper;
import org.springframework.stereotype.Component;

import com.Ostermiller.util.CircularByteBuffer;

@Component
public class PdfGenerationProcessor implements AsyncProcessor {

    @Resource(name = "smallPool")
    private ExecutorService smallPool;

    @Override
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange); // Wrap synchronous invocations
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        CircularByteBuffer buffer = new CircularByteBuffer(1024 * 100); // 100 KB buffer
        // Generate the PDF in a separate thread
        smallPool.submit(new PDFGenerator(exchange, callback, buffer.getOutputStream()));
        // Immediately return the stream for the client to read from
        exchange.getIn().setBody(buffer.getInputStream());
        return false; // Signifies that the job will be completed asynchronously
    }

    private class PDFGenerator implements Runnable {
        private final Exchange exchange;
        private final AsyncCallback callback;
        private final OutputStream out;

        private PDFGenerator(Exchange exchange, AsyncCallback callback, OutputStream out) {
            this.exchange = exchange;
            this.callback = callback;
            this.out = out;
        }

        @Override
        public void run() {
            try {
                generatePDF(out); // Actual logic for generating the file and writing it to the stream
            } catch (Exception e) {
                exchange.setException(e); // Async processors must not throw; record the exception on the Exchange
            } finally {
                // Close the stream first so the reader sees end-of-file
                try { out.flush(); out.close(); } catch (Exception e) { /* ignore */ }
                // The callback must always be invoked; doneSync is false because process() returned false
                callback.done(false);
            }
        }
    }

    private void generatePDF(OutputStream out) throws Exception {
        // Placeholder stub: the actual PDF library calls are not shown in this post
    }
}
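To see the streaming behaviour in isolation, the same producer/consumer arrangement can be reproduced with nothing but the JDK. This minimal sketch swaps CircularByteBuffer for java.io.PipedInputStream and PipedOutputStream. That is a substitution, not what the processor above uses. It also replaces the PDF library with a hypothetical writeFakePdf method that emits a given number of bytes. The caller gets the InputStream immediately while a pool thread is still writing. Only the pipe's 100 KB buffer is ever held in memory, and closing the output stream is what signals end-of-file to the reader.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;

public class StreamingGenerator {

    // Hypothetical stand-in for the PDF library: writes `size` bytes in 4 KB chunks.
    static void writeFakePdf(OutputStream out, int size) throws IOException {
        byte[] chunk = new byte[4096];
        for (int written = 0; written < size; written += chunk.length) {
            out.write(chunk, 0, Math.min(chunk.length, size - written));
        }
    }

    // Starts generation on the pool and returns the readable end straight away.
    static InputStream generate(ExecutorService pool, int size) throws IOException {
        PipedInputStream in = new PipedInputStream(100 * 1024); // 100 KB pipe buffer
        PipedOutputStream out = new PipedOutputStream(in);
        pool.submit(() -> {
            try {
                writeFakePdf(out, size); // blocks whenever the pipe buffer is full
            } catch (IOException e) {
                e.printStackTrace(); // the Camel processor records this on the Exchange instead
            } finally {
                try { out.close(); } catch (IOException ignored) { } // end-of-file for the reader
            }
        });
        return in;
    }

    // Drains the stream the way a client would and returns the byte count.
    static long consume(InputStream in) throws IOException {
        byte[] buf = new byte[8192];
        long total = 0;
        for (int n; (n = in.read(buf)) != -1; ) {
            total += n;
        }
        in.close();
        return total;
    }
}
```

Generating 5 MB through the 100 KB pipe completes normally, because the writer and reader take turns rather than the whole file being held in memory. In the Camel processor, the role of consume is played by whatever component sends the body back to the client.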

References

  • The library used here (CircularByteBuffer) is a convenient implementation of piped streams. To use it, add the following dependency to your Maven pom.xml:
    <dependency>
        <groupId>org.ostermiller</groupId>
        <artifactId>utils</artifactId>
        <version>1.07.00</version>
    </dependency>
    
  • Chapter 10 of Camel in Action is a very good guide to concurrency in Camel