At The Climate Corporation, we use various forms of data and Clojure-based computations to help farmers make decisions about their operations. To do this quickly and efficiently, we distribute the processing across machines, often using SQS, Amazon’s queue service. EC2 autoscaling takes care of adding new machines for us, but we want to make sure we get the most from each machine; in a large organization, wasted resources add up quickly.
We also found that the code supporting our queue-processing backends was quite similar: listen to SQS, read messages from the queue, process them in a thread pool, and ack the messages. We’d rather have a library handle all that plumbing and let us focus on the compute logic of what to actually do with each message.
With these two goals in mind (efficient use of EC2 resources, especially CPU, and reducing message processing to a simple pure function), we set out to build Squeedo.
The first workload we were trying to solve looked like this:
SQS -> multi-second HTTP I/O API call -> multi-second CPU-intensive calculation -> ack/nack SQS
We had a double whammy: a long-running synchronous HTTP call (using clj-http) followed by a heavy, CPU-intensive computation.
After several iterations of building this plumbing code on thread pools, we found that we couldn’t get the throughput we wanted simply by tuning the number of threads. We needed something more dynamic that would adapt to the number of cores it ran on and wouldn’t require us to figure out the “right” thread-pool size. We also wanted a higher-level abstraction that felt closer to message processing than to low-level thread pools.
After reading this blog post, http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/, explaining how core.async can be used to handle blocking IO, we changed our line of thinking, and Squeedo was born. The key was to combine asynchronous HTTP via http-kit for I/O with core.async for polling and acking SQS, managing message concurrency, and bounding the amount of outstanding work.
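The core idea from that post can be sketched in a few lines of core.async and http-kit. This is a simplified illustration of the pattern, not Squeedo’s actual internals, and the URL and timeout are placeholders: the http-kit callback puts the response onto a channel, and a go block parks on that channel instead of blocking a thread.

```clojure
(require '[clojure.core.async :refer [chan go <! put!]]
         '[org.httpkit.client :as http])

(defn async-get
  "Fires a non-blocking GET and returns a channel that will
   eventually receive the response map."
  [url]
  (let [c (chan 1)]
    ;; http-kit invokes the callback on its own IO threads;
    ;; no caller thread blocks while the request is in flight.
    (http/get url {:timeout 2000} (fn [response] (put! c response)))
    c))

;; The go block parks (rather than blocks) until the response arrives,
;; leaving the thread pool free to do CPU work on other messages.
(go
  (let [{:keys [status error]} (<! (async-get "http://example.com"))]
    (if error
      (println "request failed:" error)
      (println "status:" status))))
```

Because a parked go block costs almost nothing, many IO calls can be in flight at once while the small worker pool stays dedicated to CPU work.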
Simple Usage: an http-kit example with non-blocking I/O
To use Squeedo, add the Leiningen dependency [com.climate/squeedo "0.1.0"] to your project.clj.
In its simplest form, Squeedo is composed of only two parts: a compute function and a consumer.
The compute function must put to the done-channel. Squeedo listens to the done-channel, acks or nacks each message with SQS for you, and passes your compute function another message to process. You must put to the done-channel even when processing fails, or Squeedo will think you still have outstanding work and won’t give you another message to process (any uncaught exception is automatically nack’d).
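As a sketch of that contract, a non-blocking http-kit compute function might look like the following. The start-consumer/stop-consumer names and the :nack convention are assumed from Squeedo’s published interface, and the queue name and URL are placeholders:

```clojure
(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[clojure.core.async :refer [put!]]
         '[org.httpkit.client :as http])

;; Squeedo calls this with each message and the done-channel.
;; The HTTP call is non-blocking: the worker is released immediately,
;; and the callback acks or nacks when the IO completes.
(defn compute
  [message done-channel]
  (http/get "http://example.com"
            {:timeout 2000}
            (fn [{:keys [error]}]
              (if error
                (put! done-channel (assoc message :nack true)) ; nack on failure
                (put! done-channel message)))))                ; ack on success

(def consumer (start-consumer "my-queue" compute))

;; When shutting down:
;; (stop-consumer consumer)
```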
Using this simple pattern and higher-level abstraction, we’ve processed hundreds of millions of messages with CPU utilization like what you see below (taken on an m3.large EC2 instance).
Simple Tuning and Configuration
One of the great things about Squeedo is the set of advanced configuration options you can use to tune the consumer to your workload, beyond what the very reasonable defaults provide out of the box.
- :message-channel-size – the number of messages to prefetch from SQS; default 20 * num-listeners. Prefetching keeps the compute function continuously busy without waiting for more messages to be pulled from the remote SQS queue. Make sure to set the queue’s visibility timeout appropriately when you create it, since prefetched messages wait in the buffer before being processed.
- :num-workers – the number of workers processing messages concurrently; defaults to the number of CPUs minus 1, or 1 on a single-core machine. This controls how many workers actually process messages at any one time. Squeedo works best with 2 or more CPUs. This is not the amount of work that can be outstanding at any one time; that is controlled by :max-concurrent-work below.
- :num-listeners – the number of listeners polling from SQS; defaults to (num-workers / dequeue-limit), since each listener dequeues up to dequeue-limit messages at a time. If your compute function is really fast, it can starve for messages, in which case you need more listeners pulling from SQS.
- :dequeue-limit – the number of messages to dequeue at a time; default 10
- :max-concurrent-work – the maximum number of messages processed concurrently; default num-workers. This is mainly for async workflows, where work has started and you are waiting for parked IO to complete. It lets you keep the CPUs busy by having data returned by IO ready to be processed. It’s really a memory game at this point: how much data returned by your asynchronous HTTP clients can you buffer while it waits to be processed?
- :dl-queue-name – the dead letter SQS queue to which repeatedly failed messages will go. A message that fails to process after the maximum number of SQS receives is sent to the dead letter queue (see SQS Redrive Policy). The queue will be created if necessary. Defaults to (str queue-name "-failed"), where queue-name is the name of the queue passed into the start-consumer function.
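Putting a few of these options together, a tuned consumer for an IO-heavy workload might look like the following sketch. The specific values and queue name are illustrative, the keyword-argument style is assumed from start-consumer’s interface, and compute is the compute function described above:

```clojure
(def consumer
  (start-consumer "my-queue"
                  compute
                  :num-workers 3           ; e.g. on a 4-core machine
                  :num-listeners 2         ; keep a fast compute fn fed
                  :dequeue-limit 10        ; messages per SQS receive
                  :message-channel-size 40 ; prefetch buffer
                  :max-concurrent-work 50  ; allow many parked IO calls
                  :dl-queue-name "my-queue-failed"))
```

The interesting knob for async workloads is :max-concurrent-work well above :num-workers: a handful of workers do the CPU work while dozens of non-blocking HTTP calls are parked in flight.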
For more information, to contribute, or to request new features, check out Squeedo on GitHub. Thanks for reading!