While using Storm, there are multiple cases where one needs to implement micro batching in core Storm topologies, be it for performance reasons, for integration with external systems (like ElasticSearch, Solr, HBase or even a database), or for various other reasons.
Implementing reliable, latency-sensitive and effective batching in Storm isn't as easy as it may seem. (Note: Storm's Trident API does provide internal batching capabilities, and one can use it where appropriate. This article focuses on batching using core Storm APIs.)
Here are some challenges in implementing micro batching in
Storm:
- Data delivery reliability: Storm's ack'ing mechanism plays a great role in reliable data processing and delivery. Ack'ing stream tuples too early (before the data is synced into the external system) may result in data loss if a failure occurs during the external system sync. E.g., if tuples are ack'ed as soon as they are received in a bolt but are persisted to the external store in batches (the anti-pattern sketched after this list), the persistence may fail for various reasons; from Storm's perspective, however, the tuples were successfully ack'ed and will not be replayed. In such situations, the data may never reach its destination and will be lost.
- Unnecessary data duplication: To solve the above issue, if tuples are ack'ed only after successful persistence, they may not get ack'ed within their timeout period. In such situations, Storm will replay those tuples, which results in unnecessary reprocessing as well as data duplication in the target system.
- Increased latency: Streams have peaks and off-peaks, and during off-peaks the stream may not bring enough data to fill the batch within a reasonable time period. Too small a batch size can harm performance, while too large a batch size can increase the latency of a tuple reaching its destination. For example, in a bolt configured with a batch size of 100 tuples, if during an off-peak period the first tuple reaches the bolt at 2:00 PM and the 100th arrives at 2:30 PM, the batch gets flushed at 2:30 PM; the first tuple thus takes 30 minutes to reach its destination, which defeats the purpose of real-time streaming.
- Complexity of time-bound batching: To avoid the above latency and replay issues, batches need to be not just size-bound but also time-bound. But since user code (in Storm bolts) is usually triggered only on tuple availability, it is hard to inject code that gets triggered automatically at certain time intervals.
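To make the first two challenges concrete, here is a minimal sketch of the ack-too-early anti-pattern described above (buffer, batchSize and externalStoreClient are hypothetical placeholders inside a bolt, not Storm APIs):

// ANTI-PATTERN: ack'ing before the batch is persisted.
@Override
public void execute(Tuple tuple) {
  buffer.add(tuple);
  collector.ack(tuple); // Storm now considers this tuple fully processed...
  if (buffer.size() >= batchSize) {
    externalStoreClient.bulkWrite(buffer); // ...but if this bulk write fails,
    buffer.clear();                        // the buffered tuples are lost and
  }                                        // will never be replayed.
}

Moving the ack() call to after the bulk write fixes the data loss, but creates the opposite problem: tuples waiting in the buffer may exceed topology.message.timeout.secs and get replayed, causing the duplication described in the second bullet.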
Let's see what options are available for time-bound batching, along with their pros and cons.
1. Thread-Based Model
Implementation details: Create a thread in every bolt where batching needs to be implemented; the thread sleeps for a predefined time, then wakes up and flushes the batch.
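As a rough sketch (assuming the bolt keeps a queue of pending tuples and a flushBatch() method that drains and persists it, with access synchronized between the flusher thread and execute()):

// Sketch of option 1: a background flush thread started in the bolt's prepare().
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  this.collector = collector;
  Thread flusher = new Thread(new Runnable() {
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(batchIntervalInSec * 1000L);
          flushBatch(); // must coordinate with execute() -- a concurrency risk
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // stop flushing on shutdown
        }
      }
    }
  });
  flusher.setDaemon(true); // don't block worker shutdown
  flusher.start();
}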
Pros:
- Developers have control over spinning up their own threads and waking them up at certain intervals to flush the batches.
- Each bolt can have a different time interval for flushing.
Cons:
- Storm already has its own internal threading model (bolts and spouts run in threads), so creating your own threads makes things more complicated and invites concurrency issues, deadlock possibilities etc. that need to be handled carefully. On top of that, testing multi-threaded code is hard.
- This threading implementation would need to be repeated for every different bolt.
2. Fabricated Stream To Trigger Batch Flush
Implementation details: Implement a fabricated parallel data ingestion stream that the developer keeps feeding data into at certain time intervals, and use the arrival of a tuple from this stream as the signal to flush the batch.
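For illustration, a bare-bones version of such a fabricated signal stream might look like the following (FlushSignalSpout is a hypothetical name; storm-core imports are omitted for brevity):

// Sketch of option 2: a spout that emits a "flush" signal at a fixed interval.
// Every batching bolt must also subscribe to this stream (e.g. via allGrouping),
// which is exactly the hard-to-guarantee part in complex topologies.
public class FlushSignalSpout extends BaseRichSpout {
  private SpoutOutputCollector collector;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void nextTuple() {
    Utils.sleep(45 * 1000L);             // the desired flush interval
    collector.emit(new Values("flush")); // bolts treat arrival as a flush trigger
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("signal"));
  }
}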
Pros:
- Since the stream is managed by the application itself and is a core part of the design, data is almost certain to arrive at a known time and rate to trigger the batch flush. Application peaks/off-peaks do not impact the flush times.
- No need to deal with threading and its related complications.
Cons:
- Guaranteeing that the tuple from this stream reaches every bolt, and every instance of each bolt, in a topology is hard (especially if the topology has a long and/or complex DAG).
- Additional development is needed to generate the new stream, and making sure it reaches every bolt in the topology is complex. On top of that, this implementation needs to be repeated for every topology.
- Fault tolerance and error handling for this fabricated stream need to be planned: what if the stream stops sending data? What if the host generating it fails? This becomes complicated and unmanageable too quickly.
3. Use Tick Tuples
Implementation details: Tick tuples are Storm's built-in mechanism for generating tuples at specified intervals and sending them to each and every bolt.
From Nathan Marz's comment: "It's common to require a bolt to 'do something' at a fixed interval, like flush writes to a database. Many people have been using variants of a ClockSpout to send these ticks. The problem with a ClockSpout is that you can't internalize the need for ticks within your bolt, so if you forget to set up your bolt correctly within your topology it won't work correctly. 0.8.0 introduces a new 'tick tuple' config that lets you specify the frequency at which you want to receive tick tuples via the 'topology.tick.tuple.freq.secs' component-specific config, and then your bolt will receive a tuple from the __system component and __tick stream at that frequency."
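Concretely, a bolt opts into tick tuples by overriding its component-specific configuration, and can recognize a tick tuple by its source component and stream. This is also how a helper like the TupleHelpers.isTickTuple() used in the sample below is typically implemented (storm-core and java.util imports omitted):

// Ask Storm to send this bolt a tick tuple every batchIntervalInSec seconds.
@Override
public Map<String, Object> getComponentConfiguration() {
  Map<String, Object> conf = new HashMap<String, Object>();
  conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, batchIntervalInSec);
  return conf;
}

// Tick tuples arrive from the internal __system component on the __tick stream.
public static boolean isTickTuple(Tuple tuple) {
  return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
      && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}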
Pros:
- No custom code is needed to generate the flush trigger.
- No need to worry about failures, fault tolerance, threads, tuple distribution etc. It is all handled by Storm.
Cons:
- A topology-level tick frequency applies to every bolt alike; managing different batch schedules for different components requires each bolt to override its component-specific configuration (as shown above).
To conclude: tick tuples are the best way to solve this problem. For further details, see below a sample implementation of reliable, latency-aware and effective micro batching (here, bulk-indexing into ElasticSearch; the LOG, client and collector fields are initialized elsewhere in the bolt).
/** The queue holding tuples in a batch. */
protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>();

/** The threshold after which the batch should be flushed out. */
int batchSize = 100;

/**
 * The batch interval in sec. Minimum time between flushes if the batch size
 * is not met. This should typically be equal to
 * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs.
 */
int batchIntervalInSec = 45;

/** The last batch process time in seconds. Used for tracking purposes. */
long lastBatchProcessTimeSeconds = 0;

@Override
public void execute(Tuple tuple) {
  // Check if the tuple is a tick tuple.
  if (TupleHelpers.isTickTuple(tuple)) {
    // If so, it is an indication to flush the batch. But don't flush if the
    // previous flush was done very recently (either because the batch size
    // threshold was crossed or because of another tick tuple).
    if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
      LOG.debug("Current queue size is " + this.queue.size()
          + ". But received tick tuple so executing the batch");
      finishBatch();
    } else {
      LOG.debug("Current queue size is " + this.queue.size()
          + ". Received tick tuple but last batch was executed "
          + (System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds)
          + " seconds back that is less than " + batchIntervalInSec
          + " so ignoring the tick tuple");
    }
  } else {
    // Add the tuple to the queue. But don't ack it yet.
    this.queue.add(tuple);
    int queueSize = this.queue.size();
    LOG.debug("Current queue size is " + queueSize);
    if (queueSize >= batchSize) {
      LOG.debug("Current queue size is >= " + batchSize + " executing the batch");
      finishBatch();
    }
  }
}

/**
 * Finish the batch: drain the queue, bulk-persist the tuples, and ack or fail
 * each tuple based on the individual responses.
 */
public void finishBatch() {
  LOG.debug("Finishing batch of size " + queue.size());
  lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
  List<Tuple> tuples = new ArrayList<Tuple>();
  queue.drainTo(tuples);
  if (tuples.isEmpty()) {
    return; // nothing to flush (e.g. a tick tuple arrived during an idle period)
  }
  BulkRequestBuilder bulkRequest = client.prepareBulk();
  BulkResponse bulkResponse = null;
  for (Tuple tuple : tuples) {
    // Prepare your batch here (be it JDBC, HBase, ElasticSearch, Solr or
    // anything else), e.g. bulkRequest.add(...) for ElasticSearch.
  }
  try {
    // Execute the bulk request and get individual tuple responses back.
    bulkResponse = bulkRequest.execute().actionGet();
    BulkItemResponse[] responses = bulkResponse.getItems();
    LOG.debug("Executed the batch. Processing responses.");
    for (int counter = 0; counter < responses.length; counter++) {
      BulkItemResponse response = responses[counter];
      if (response.isFailed()) {
        LOG.error("Failed to process tuple # " + counter);
        this.collector.fail(tuples.get(counter));
      } else {
        LOG.debug("Successfully processed tuple # " + counter);
        this.collector.ack(tuples.get(counter));
      }
    }
  } catch (Exception e) {
    LOG.error("Unable to process " + tuples.size() + " tuples", e);
    // Fail the entire batch so Storm replays the tuples.
    for (Tuple tuple : tuples) {
      this.collector.fail(tuple);
    }
  }
}
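When wiring the topology, it is also worth making sure topology.message.timeout.secs comfortably exceeds the batch interval, since tuples sit unacked in the queue until a flush; per the comment on batchIntervalInSec above, a 45-second interval pairs with a 90-second timeout. A sketch (spout, bolt and topology names are hypothetical):

Config conf = new Config();
// Tuples can wait in the batch queue for up to batchIntervalInSec seconds,
// so keep the message timeout at roughly twice the batch interval.
conf.setMessageTimeoutSecs(90);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("events", new EventSpout(), 1);                  // hypothetical spout
builder.setBolt("es-writer", new BatchingElasticSearchBolt(), 4)  // the bolt sketched above
       .shuffleGrouping("events");
StormSubmitter.submitTopology("batching-topology", conf, builder.createTopology());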