Squeedo: Simplified Amazon SQS processing in Clojure


At The Climate Corporation, we use various forms of data and Clojure-based computations to help farmers make decisions about their operations.  In order to do this quickly and efficiently, we distribute the processing across machines, often using SQS, Amazon’s queue service.  EC2 autoscaling takes care of adding new machines for us, but we want to make sure we get the most from each machine—in a large organization, wasted resources add up quickly.

We also found that code to support queue-processing backends was quite similar: listen to SQS, read messages from the queue, process in a thread pool, and Ack the messages. We’d rather use a library to handle all that plumbing and just let us focus on the compute logic of what to actually do with the message.

With these two goals in mind (efficient use of EC2 resources, especially CPU, and reducing message processing to a simple pure function), we set out to build Squeedo.

The first workload we were trying to solve looked like this:

SQS -> Several second Http I/O API call -> Several Second CPU Intensive calculation -> Ack/Nack SQS

We had a double whammy: a long-running synchronous HTTP call using clj-http followed by a heavy CPU-intensive computation.
After several iterations of basing this plumbing code on thread pools, we quickly found that we couldn’t get the kind of throughput we wanted simply by tuning the number of threads. We needed something more dynamic that would adapt to the number of cores it ran on and wouldn’t require us to figure out the “right” number of threads to use in the pool.  Additionally, we wanted a higher-level abstraction that felt closer to message processing than to low-level thread pools.

After reading a blog post (http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/) explaining how core.async can be used to handle blocking IO, we changed our line of thinking, and Squeedo was born.  The key was to combine asynchronous HTTP using http-kit for I/O with core.async for handling polling and acking SQS, message concurrency, and the amount of outstanding work.

Simple Usage: an http-kit example with non-blocking I/O

To use Squeedo, add the Leiningen dependency [com.climate/squeedo "0.1.0"] to your project.clj.

In its simplest form, Squeedo is composed of only two parts: a compute function and a consumer.

(require '[com.climate.squeedo.sqs-consumer 
            :refer [start-consumer stop-consumer]]
         '[org.httpkit.client :as client]
         '[clojure.core.async :refer [go >!]])

(defn- eat-some-cpu
  [how-much]
  (reduce + (range 1 how-much)))

(defn compute
  [message done-channel]
  ;; do something expensive
  (eat-some-cpu 1000000)
  ;; do this if you will have I/O
  (client/get "http://google.com"
    (fn [response]
      ;; do some more processing with the response
      (eat-some-cpu 1000000)
      ;; >! must run inside a go block
      (go (>! done-channel message)))))

(def consumer (start-consumer "my-sqs-queue" compute 
                              :num-listeners 10 
                              :max-concurrent-work 50)) 
;; when done listening 
;; (stop-consumer consumer) 

The compute function must put the message to the done-channel.  Squeedo listens on the done-channel, acks/nacks each message with SQS for you, and passes your compute function another message to process. You must put to the done-channel even for exceptions, or Squeedo will think you still have outstanding work and won’t pass you another message to process (any message whose processing throws an uncaught exception is automatically nack’d).

Using this simple pattern and higher-level abstraction, we’ve processed hundreds of millions of messages with CPU utilization like what you see below (taken on an m3.large EC2 instance).

[Figure: CPU utilization graph from an m3.large EC2 instance]

Simple Tuning and Configuration

One of the great things about Squeedo is the advanced configuration options that can be used to tune the consumer to your workflow beyond what the very reasonable defaults do out of the box.

  • :message-channel-size – the number of messages to prefetch from SQS; default 20 * num-listeners. Prefetching messages allows us to keep the compute function continuously busy without waiting for more messages to be pulled from the remote SQS queue. Make sure to set the visibility timeout appropriately when you create the queue.
  • :num-workers – the number of workers processing messages concurrently. This controls how many workers actually process messages at any one time (defaults to the number of CPUs minus 1, or 1 on a single-core machine). Squeedo works best with 2 or more CPUs. This is not the amount of work that can be outstanding at any one time; that is controlled by :max-concurrent-work below.
  • :num-listeners – the number of listeners polling from SQS; default (num-workers / dequeue-limit), since each listener dequeues up to dequeue-limit messages at a time. If you have a really fast compute function, you can actually starve it of messages and thus need more listeners pulling from SQS.
  • :dequeue-limit – the number of messages to dequeue at a time; default 10
  • :max-concurrent-work – the maximum number of total messages processed concurrently. This is mainly for async workflows where work has been started and is waiting for parked IO threads to complete; default num-workers. This allows you to keep the CPUs busy by having data returned by IO ready to be processed. It’s really a memory game at this point — how much data can you buffer that’s ready to be processed by your asynchronous http clients?
  • :dl-queue-name – the dead letter SQS queue to which repeatedly failed messages will go.  A message that fails to process after the maximum number of SQS receives is sent to the dead letter queue (see SQS Redrive Policy).  The queue will be created if necessary.  Defaults to (str queue-name "-failed"), where queue-name is the name of the queue passed into the start-consumer function.

For more information, or to contribute and ask for new features, check out Squeedo on GitHub. Thanks for reading!

Posted in Engineering, Software Engineering

xray + dask: Out-of-core, labeled arrays in Python

Xray provides labeled, multi-dimensional arrays. Dask provides a system for parallel computing. Together, they allow for easy analysis of scientific datasets that don’t fit into memory.

An introduction to xray

To make it easier to work with scientific data in Python, we researchers at The Climate Corporation wrote an open source library called xray. Xray provides data structures for multi-dimensional labeled arrays, building on NumPy and pandas. Our goal was to make it easier to work with labeled, gridded datasets from physical sensors and models. At Climate, we use it to analyze weather data and satellite images.

The key idea of xray is that it uses metadata in the form of labeled dimensions (e.g., “time”) and coordinate values (e.g., the date “2015-04-10”) to enable a suite of expressive, label based operations. Here’s a quick example of how we might manipulate a 3D array of temperature data with xray:

ds.sel(time='2014-12-11').max(['latitude', 'longitude'])
Dimensions:  (time: 4)
  * time     (time) datetime64[ns] 2014-12-11 2014-12-11T06:00:00 ...
Data variables:
    t2m      (time) float64 311.4 316.2 312.0 309.6

For comparison, here’s how we might write that using only NumPy:

# ds.t2m.values contains 4x daily values for December 2014
# axis labels are (time, latitude, longitude)
ds.t2m.values[[40, 41, 42, 43]].max(axis=(1, 2))

Once we’ve done our computation, every xray dataset can be easily serialized to and from disk, using the portable and efficient netCDF file format (a dialect of HDF5):

ds.to_netcdf('my-dataset.nc')
saved_ds = xray.open_dataset('my-dataset.nc')
assert saved_ds.equals(ds)

Or you might pass your data off to pandas for further analysis in tabular form:

df = ds.to_dataframe()

Without a tool like xray, we would need to keep track of metadata manually in order to preserve it in operations and save it with our results when we’re done. This is burdensome, especially for exploratory analysis, so we often don’t bother. Even when metadata is present, without xray it’s often easier not to use the metadata directly in our code. Magic constants end up littering our scripts, like the numbers 1 and 2 in our NumPy example above. Implicit assumptions about our data are left in comments, or worse, are not even stated at all.

Xray provides short-term incentives for providing metadata, in the form of additional expressive power. But the real payoff is enhanced readability and reproducibility in the long term when you or a colleague come back to your code or data six months from now.

Loading medium-sized weather data with dask

Today I will talk about how we have extended xray to work with datasets that don’t fit into memory by integrating it with dask. Dask is a new Python library that extends NumPy to out-of-core datasets by blocking arrays into small chunks and executing on those chunks in parallel. It allows xray to process large datasets easily while simultaneously making use of all of our CPU resources.

Weather data — especially models from numerical weather prediction — can be big. The world uses its biggest supercomputers to generate weather and climate forecasts that run on increasingly high resolution grids. Even for data analysis purposes, it’s easy to need to process 10s or 100s of GBs of data.

For this blog post, we’ll work with a directory of netCDF files downloaded from the European Centre for Medium-Range Weather Forecast’s ERA-Interim reanalysis with the new xray function to open a multi-file dataset, xray.open_mfdataset:

import numpy as np
import matplotlib.pyplot as plt
import xray

ds = xray.open_mfdataset('/Users/shoyer/data/era-interim/2t/*.nc', engine='scipy')
ds.attrs = {} # for brevity, hide attributes

Previewing the dataset, we see that it consists of 35 years of 6-hourly surface temperature data at a spatial resolution of roughly 0.7 degrees:

Dimensions:    (latitude: 256, longitude: 512, time: 52596)
  * latitude   (latitude) >f4 89.4628 88.767 88.067 87.3661 86.6648 ...
  * longitude  (longitude) >f4 0.0 0.703125 1.40625 2.10938 2.8125 ...
  * time       (time) datetime64[ns] 1979-01-01 1979-01-01T06:00:00 ...
Data variables:
    t2m        (time, latitude, longitude) float64 240.6 240.6 240.6 ...

This is actually 51 GB of data once it’s uncompressed to 64-bit floats:

ds.nbytes * (2 ** -30)

Is this big data? No, not really; it fits on a single machine. But my laptop only has 16GB of memory, so I certainly could not analyze this dataset in memory.

The power of dask is that it enables analysis of “medium data” with tools that are not much more complex than those we already use for analyses that fit entirely into memory. In contrast, big data tools such as Hadoop and Spark require us to rewrite our analysis in a totally different paradigm. In most cases, we’ve found the cost of doing everyday work with big data is not worth the hefty price. Instead, we subsample aggressively, and work with small- to medium-sized data whenever possible. Dask expands our scale of easy-to-work-with data from “fits in memory” to “fits on disk.”

Computation with dask

Now, let’s try out some computation. You’ll quickly notice that applying operations to dask arrays is remarkably fast:

# convert from Kelvin to degrees Fahrenheit
%time 9 / 5.0 * (ds - 273.15) + 32
CPU times: user 26.1 ms, sys: 9.25 ms, total: 35.3 ms
Wall time: 28.6 ms
Dimensions:    (latitude: 256, longitude: 512, time: 52596)
  * latitude   (latitude) >f4 89.4628 88.767 88.067 87.3661 86.6648 ...
  * longitude  (longitude) >f4 0.0 0.703125 1.40625 2.10938 2.8125 ...
  * time       (time) datetime64[ns] 1979-01-01 1979-01-01T06:00:00 ...
Data variables:
    t2m        (time, latitude, longitude) float64 -26.58 -26.59 -26.6 ...

How is this possible? Dask uses a lazy computation model that divides large arrays into smaller blocks called chunks. Performing an operation on a dask array queues up a series of lazy computations that map across each chunk. These computations aren’t actually performed until values from a chunk are accessed.
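The chunked, lazy model is easy to illustrate with a toy sketch in plain Python. This is an illustration only, not dask's actual implementation (which uses task graphs and a multi-threaded scheduler): the array is a list of chunks, operations are queued as functions, and nothing runs until a result is requested.

```python
# Toy sketch of chunked, lazy evaluation (illustration only;
# dask's real implementation uses task graphs and a scheduler).
class LazyChunkedArray:
    def __init__(self, chunks, pending=None):
        self.chunks = chunks          # the data, split into chunks
        self.pending = pending or []  # queued element-wise operations

    def map(self, fn):
        # Queue an operation; no data is touched yet.
        return LazyChunkedArray(self.chunks, self.pending + [fn])

    def mean(self):
        # Accessing a value finally applies the queued operations,
        # one chunk at a time (and could do so in parallel).
        total = count = 0
        for chunk in self.chunks:
            for fn in self.pending:
                chunk = [fn(x) for x in chunk]
            total += sum(chunk)
            count += len(chunk)
        return total / count

kelvin = LazyChunkedArray([[273.15, 283.15], [293.15, 303.15]])
fahrenheit = kelvin.map(lambda k: 9 / 5.0 * (k - 273.15) + 32)  # instant
print(fahrenheit.mean())  # ~59.0: the work happens only here
```

Building `fahrenheit` is instant no matter how large the data; only `.mean()` walks the chunks, which is the behavior you see when printing a dask-backed dataset.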

So, when we print a dataset containing dask arrays, xray is only showing a preview of the data from the first few chunks. This means that xray can still be a useful tool for exploratory and interactive analysis even with large datasets.

To actually process all that data, let’s calculate and print the mean of that dataset:

%time float(ds.t2m.mean())
CPU times: user 3min 5s, sys: 1min 7s, total: 4min 13s
Wall time: 1min 3s

Notice that the computation used only 1 minute of wall clock time, but 4 minutes of CPU time — it’s definitely using my laptop’s 4 cores. This surface temperature field is 13.5 GB when stored as 16-bit integers on disk, meaning that my laptop churned through the data at ~210 MB/s.


Like pandas, xray excels at split-apply-combine operations. These types of grouped operations are key to many analysis tasks. For example, one of the most common things to do with a weather dataset is to understand its seasonal cycle.

With dask, xray’s expressive groupby syntax scales to datasets on disk. As an example, here’s how we calculate the average difference between summer and winter surface temperature [1]:

%%time
# everything is fast until we compute values
ds_by_season = ds.groupby('time.season').mean('time')
t2m_range = abs(ds_by_season.sel(season='JJA')
                - ds_by_season.sel(season='DJF')).t2m
CPU times: user 134 ms, sys: 18.4 ms, total: 153 ms
Wall time: 145 ms

# now we calculate the actual result
%time result = t2m_range.load()
CPU times: user 2min 3s, sys: 51 s, total: 2min 54s
Wall time: 44.8 s

plt.figure(figsize=(15, 6))
plt.imshow(result, cmap='cubehelix', vmin=0, vmax=60,
           extent=[0, 360, -90, 90], interpolation='nearest')
plt.title('Summer minus winter mean temperature')
plt.colorbar(label='degrees C', extend='max');


A quick aside on climate science: we see that Eastern Siberia has the world’s largest seasonal temperature swings. It is far from equator, which means that the amount of sunlight varies dramatically between winter and summer. But just as importantly, the climate is highly continental: northeast Asia is about as far away from the moderating effects of the oceans as is possible, because prevailing winds in the middle latitudes blow West to East.

The beauty of the dask.array design is that adding these operations to xray was almost no trouble at all: all we had to do is call functions like da.mean and da.concatenate instead of the numpy functions with the same names.

Computation graphs

Under the hood, dask represents deferred computations as graphs. We can look at these graphs directly to better understand and visualize our computations. Looking at these graphs also helps illustrate how dask itself works.

Here’s the graph corresponding to the above computation (limited to only two years of data). It shows how data flows as chunks through a series of functions that manipulate and combine the data, starting from files storing one month of data each:

[Figure: dask task graph for two years of the seasonal computation]
When asked for a computation result, Dask approaches graphs like this holistically. We never end up computing either of the right two branches in the above figure, corresponding to spring and fall data, because we never use them. After an optimization pass, the graph is consolidated into a much smaller set of tasks:

[Figure: the consolidated task graph after optimization]
Each circle corresponds to a function evaluation on a chunk of an array handled by dask’s multi-threaded executor.
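The pruning step can be modeled with a plain dictionary of tasks. This is a simplified sketch, not dask's actual optimizer: each key maps to a tuple of (function, dependency keys), and we keep only tasks reachable from the requested keys.

```python
# Simplified model of graph culling; dask's real optimizer
# is more sophisticated than this sketch.
def cull(graph, requested):
    """Keep only the tasks reachable from the requested keys."""
    keep = {}
    stack = list(requested)
    while stack:
        key = stack.pop()
        if key in keep:
            continue
        keep[key] = graph[key]
        _func, *deps = graph[key]
        stack.extend(deps)
    return keep

# Each task is (function, *dependency-keys); leaves have no deps.
graph = {
    'load-djf': (list,), 'load-jja': (list,),
    'load-mam': (list,), 'load-son': (list,),
    'winter': (sum, 'load-djf'),
    'summer': (sum, 'load-jja'),
    'spring': (sum, 'load-mam'),  # never requested, so these
    'fall':   (sum, 'load-son'),  # branches are pruned away
}
pruned = cull(graph, ['winter', 'summer'])
print(sorted(pruned))  # ['load-djf', 'load-jja', 'summer', 'winter']
```

The spring and fall branches never make it into the pruned graph, which is exactly why the figure's right-hand branches are never computed.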

What’s next?

Last week, we released version 0.5 of xray with this experimental dask integration, giving you the features we just demonstrated. You can install it with conda:

$ conda install xray dask netcdf4

Dask will also allow us to write powerful new features for xray. For example, we can use dask as a backend for distributed computation that lets us automatically parallelize grouped operations written like ds.groupby('some variable').apply(f), even if f is a function that only knows how to act on NumPy arrays.

You can read more about what we’ve done so far in the documentation for dask.array and xray. Matthew Rocklin has recently written about the State of Dask on his blog.

If you try xray and/or dask for your own problems, we’d love to hear how it goes, either in the comments on this post or in our issue tracker.

Thanks to Matthew Rocklin, Leon Barrett, and Todd Small for their feedback on drafts of this article. It is cross posted on the Continuum blog and my personal website.

  1. In this example, 'time.season' is using a special datetime component syntax to indicate that season should be calculated on the fly from the time variable. 
Posted in Engineering

Numba vs Cython: How to Choose

My name is Stephan, and I’m a scientist on the Climatology team at The Climate Corporation. We make extensive use of Python to build statistical weather models, and sometimes we need our code to be fast. Here’s how I choose between Numba and Cython, two of the best options for accelerating numeric Python code.

Most of the time, libraries like NumPy, SciPy and pandas, whose critical loops are already written in a compiled language like C, are fast enough for scientific Python code. Unfortunately, sometimes you need to write your own loop in a performance-critical path of your code, and, also unfortunately, loops in Python are painfully slow. This is where Numba and Cython come in: they both promise the ability to write the inner loop of your code in something that looks a lot like normal Python, but that runs about as fast as handwritten C.

Numba uses LLVM to power Just-In-Time compilation of array oriented Python code. Using Numba is usually about as simple as adding a decorator to your functions:

from numba import jit

@jit
def numba_mean(x):
    total = 0
    for xi in x:
        total += xi
    return total / len(x)

You can supply optional types, but they aren’t required for performant code as Numba can compile functions on the fly using its JIT compiler.

In contrast, Cython is a general purpose tool, not just for array oriented computing, that compiles Python into C extensions. To see impressive speedups, you need to manually add types:

def cython_mean(double[:] x):
    cdef double total = 0
    for i in range(len(x)):
        total += x[i]
    return total / len(x)

When I benchmark this example, IPython’s %timeit reports that calling this function on a 100,000 element array takes ~16 ms with the pure Python version, but only ~93 µs with Numba and ~96 µs with Cython [1].
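Outside IPython, the pure-Python baseline can be timed with the standard timeit module. This is just a sketch of the benchmark setup; the absolute numbers will differ by machine.

```python
import timeit

# The pure-Python baseline from the comparison above.
def python_mean(x):
    total = 0
    for xi in x:
        total += xi
    return total / len(x)

data = list(range(100000))
# timeit.timeit returns total seconds for `number` repetitions
elapsed = timeit.timeit(lambda: python_mean(data), number=10)
print(python_mean(data))                       # 49999.5
print('%.2f ms per call' % (elapsed / 10 * 1000))
```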

This trivial example illustrates my broader experience with Numba and Cython: both are pretty easy to use, and result in roughly equivalently fast code. For similar results on a less contrived example, see this blog post by Jake VanderPlas.

The bottom line is that even though performance is why we reach for tools like Numba and Cython, it doesn’t provide a good basis for choosing one over the other. So here are the questions I ask myself when making that choice for my projects.

Will other people be deploying your code?

Cython is easier to distribute than Numba, which makes it a better option for user facing libraries. It’s the preferred option for most of the scientific Python stack, including NumPy, SciPy, pandas and Scikit-Learn. In contrast, there are very few libraries that use Numba. I know of two, both of which are basically in the experimental phase: Blaze and my project numbagg.

The main issue is that it can be difficult to install Numba unless you use Conda, which is a great tool, but not one everyone wants to use. In contrast, distributing a package with Cython-based C extensions is almost miraculously easy. Cython is also a more stable and mature platform, whereas the features and performance of Numba are still evolving.

If you don’t need to distribute your code beyond your computer or your team (especially if you use Conda), then Numba can be a great choice. Otherwise, you should lean toward Cython.

Do you need advanced Python features or to use C-level APIs?

The features that Numba supports in the accelerated nopython mode are very limited. For example:

  • Numba only accelerates code that uses scalars or (N-dimensional) arrays. You can’t use built-in types like list or dict or your own custom classes.
  • You can’t allocate new arrays in accelerated code.
  • You can’t use recursion.

Some of these are design decisions; in other cases, these are being actively worked on.

In contrast, Cython can compile arbitrary Python code, and can even directly call C. The ability to “cythonize” an entire module written using advanced Python features and then only tweak the bottlenecks for speed can be really nice.

For example, switching to an extension type and calling C APIs directly can make for big differences in speed, even if you still rely on builtin Python types like lists or dictionaries. Writing
something like cyordereddict in Numba would be nearly impossible.

Do you want to write code that works on N-dimensional arrays?

Suppose you want a function that takes several arguments and returns a scalar or array, depending on the number of provided arguments. For example, consider a function that averages two numbers:

def average(a, b):
    return 0.5 * (a + b)

One of the most powerful features of NumPy is that this simple function would work even if a or b are multi-dimensional arrays (tensors), by following broadcasting rules.
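For instance, relying only on NumPy's standard broadcasting rules, the same two-line function accepts scalars, arrays, or mixes of the two:

```python
import numpy as np

def average(a, b):
    return 0.5 * (a + b)

print(average(1.0, 3.0))                   # 2.0 (two scalars)

# scalar + array: the scalar is broadcast across the array
print(average(np.array([1.0, 3.0]), 5.0))  # array of 3.0 and 4.0

# column vector + row vector: broadcasts to a 2x2 result
col = np.array([[0.0], [10.0]])
row = np.array([2.0, 4.0])
print(average(col, row).shape)             # (2, 2)
```

The function body never mentions dimensions at all; NumPy's broadcasting rules supply them.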

Numba makes it easy to accelerate functions with broadcasting by simply adding the vectorize decorator. This produces universal functions (ufuncs) that automatically work (even preserving labels) on array-like data structures in the entire scientific Python ecosystem, including xray (my project) and pandas. In other cases, Numba can handle arbitrary dimensional input by using Just-In-Time compilation with jit or by creating generalized universal functions with guvectorize.

In contrast, generally speaking, your Cython functions will only work for input with a number of dimensions that you determine ahead of time (e.g., a 1D vector, but not a scalar or 2D array). It’s certainly possible to do this sort of thing with Cython, but it’s not easy, and you’ll need to get your hands dirty with the NumPy C-API. Keith Goodman has some nice examples in version 1.0 of bottleneck.

Still not sure?

When I’m not constrained by other concerns, I’ll try to make Numba work. Numba is usually easier to write for the simple cases where it works. You may still run into annoying limitations when you try to do complex things, but Numba has been getting a lot better, even just over the past few months (e.g., they recently added support for generating random numbers). At the end of the day, even if you ultimately can’t get things to work, you’ll still have idiomatic Python code that should be easy to accelerate with Cython.

This post is cross posted to my personal website.

  1. numpy.mean is faster still, at ~60 µs, but here we’re pretending that we need to write our own custom function that is not already built in. It’s still impressive that we’re only 50% slower than highly tuned C. 
Posted in Engineering

S3 Distributed Version Restore

At The Climate Corporation, we work with Amazon Web Services a lot. As you can imagine, weather data is very plentiful. We keep lots of data in Amazon’s Simple Storage Service (S3), which is basically a great big filesystem in the sky, and in Mandoline, our array data store, which can use S3 as a backend. It can take some effort to deal with so much data, but we make that effort willingly, since we need it for important things like predicting fertilizer needs for fields.

To help manage all this data, we use S3’s advanced features, such as versioning. When versioning is enabled, every time a file in S3 is updated or deleted, the old version remains stored in S3 as a backup, and we can access it with a special API call. Then, if we accidentally overwrite or delete a file, we can make an API call to list all that file’s versions, choose one, and then use another API call to restore that old version.

However, at Climate, we often manipulate our weather data with automated tools. That means that if we mess up, we can mess up in a really big way. With millions of files and billions of versions, restoring them all through the version-listing and restore APIs might take weeks. In order to fix mistakes made by automated tools, at large scale, we need another automated tool, one that can restore lots of files in a hurry. Unfortunately, as far as we know, there wasn’t one available.

So, we’ve made just that automated tool, S3DistVersions, to let us restore lots of files efficiently. For it to perform well, it needs lots of simple parallelism, so we used Hadoop MapReduce, a parallel batch processing system. Because its primary task is to access Amazon S3, we designed it to work well inside Elastic MapReduce (EMR), Amazon’s on-demand Hadoop service.

We based the interface for S3DistVersions on that of S3DistCp, a batch tool for copying data in S3. Given a time and a location in S3, S3DistVersions will restore all files there to their state at the specified time. (Because each file has its own list of versions, you can’t specify a version to restore them all to. Instead, we find it easier to specify a time, and S3DistVersions selects the appropriate version for each file.) It takes simple arguments:

java -jar S3DistVersions-0.1.0.jar \
  --src s3://mybucket/mypath \
  --prefixes s3://mybucket/list-of-s3-prefixes.txt \
  --restore-time 2014-01-01T14:00:00+07:00
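The per-file, time-based selection described above can be sketched in a few lines of Python (a hypothetical helper for illustration; the tool itself is written in Clojure and reads versions from S3's version-listing API): for each file, restore the newest version at or before the requested time.

```python
# Hypothetical sketch of per-file version selection.
def version_to_restore(versions, restore_time):
    """versions: (last_modified, version_id) pairs for one file.
    Return the id of the newest version at or before restore_time,
    or None if the file did not exist yet at that time."""
    candidates = [(t, vid) for t, vid in versions if t <= restore_time]
    if not candidates:
        return None
    return max(candidates)[1]

versions = [
    ('2013-12-30T00:00:00', 'v1'),
    ('2014-01-01T09:00:00', 'v2'),
    ('2014-01-02T12:00:00', 'v3'),  # written after the mistake
]
# ISO-8601 timestamps in one time zone compare correctly as strings
print(version_to_restore(versions, '2014-01-01T14:00:00'))  # v2
print(version_to_restore(versions, '2013-01-01T00:00:00'))  # None
```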


The hardest thing to deal with was obtaining parallelism while working with S3’s versioning API. Suppose we wanted to restore all the files under the location s3://mybucket/mypath. Unfortunately, S3 only permits access to the versions of files under a location via a (serial) paging API, so we have to make a long sequence of API calls. Sure, each request might return a thousand versions, but it would take a long time to get through a million files and all their versions at that rate. In order to restore those files in a useful amount of time, we need to access the version information in parallel.

(Note: We can’t just start by listing the files under s3://mybucket/mypath because S3’s file-listing API won’t show deleted files. Instead, we have to use the version-listing API.)

To get parallelism while listing versions, we ask the user to provide a list of “prefixes”: the beginnings of the filenames that appear under s3://mybucket/mypath. Then, we can make requests for versions in those separate places in parallel. (For instance, given prefixes /a/1 and /b/2, we can scan for versions under s3://mybucket/mypath/a/1 and s3://mybucket/mypath/b/2 in parallel.) We need that parallelism in order to get through our billions of versions in a reasonable amount of time; for smaller restorations, it’s perfectly fine to omit the list of prefixes. And of course, parallelism is easy in the restore step, so the prefix list is only needed for parallelism in listing versions.

In practice, we find that it’s not hard to use a prefix list like this. We have our files spread out in many different subdirectories, so we use that list of subdirectories as our prefixes.

We designed this program to follow the recommended advice for partitioning keys. S3 keeps track of filenames (keys) in an index that is spread across many different Amazon computers in sequential chunks. It’s sort of like the way conferences have registration tables organized by segments of the alphabet (e.g. ‘A-C’, ‘D-F’, …) to distribute the load. Just as you wouldn’t want to have all the people with ‘A’ names come in at the same time, we don’t want our program to access the list of files in sequential order. Therefore, when S3DistVersions interacts with S3 files or lists their versions, it first shuffles the filenames or filename prefixes. That way, it spreads the load across Amazon’s S3 index cluster.
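The shuffling idea itself is tiny; here is a sketch in Python (illustrative only; S3DistVersions is written in Clojure): randomize the order of keys or prefixes before issuing requests, so sequential ranges of S3's key index aren't hit in order.

```python
import random

def spread_index_load(keys, seed=None):
    """Return the keys in randomized order, so requests don't walk
    S3's sequential key index in order (which, like alphabetical
    conference registration lines, piles load onto one server)."""
    shuffled = list(keys)
    random.Random(seed).shuffle(shuffled)
    return shuffled

prefixes = ['mypath/a/%d' % i for i in range(5)]
shuffled = spread_index_load(prefixes, seed=42)
print(sorted(shuffled) == prefixes)  # True: same prefixes, randomized order
```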


For this project, as for most things we do, we used Clojure, a programming language that’s based on Lisp and runs on the Java Virtual Machine (JVM). That parentage gives it simplicity and speed, plus access to a large range of useful libraries. It’s also an opinionated language that prefers a functional, immutable style. We like that opinion; for instance, functional code is easier to reason about in bulk, since we can be confident it doesn’t have weird side effects. That also happens to work great with parallel data processing such as MapReduce.

We also often use Cascalog, a MapReduce tool for Clojure. Here, however, we wanted tight control over keys, grouping, and whether work happens in mappers or reducers, so we tried Parkour, but ran into some exceptions on EMR. Ultimately, we used clojure-hadoop. It isn’t as elegant as Cascalog in some ways, but it does give us the control we want.


Now, we are confident that when we mess up lots of files in S3, we can fix the problem at scale. We are pleased to release S3DistVersions, and we hope that it will help others work with S3’s versioning more easily. We welcome contributions and feedback, so please let us know how it works for you.

Posted in Engineering, Software Engineering

A Summary of Kyle Kingsbury’s Distributed Systems Workshop

Last week The Climate Corporation invited Kyle Kingsbury — inventor of Jepsen and distributed-systems-expert-about-town — to lead a workshop on distributed computing theory and practice at our San Francisco office.

Kyle’s work focuses on the necessity of theory to distributed computing practice — it’s a task that’s almost impossible to get right on a first try, and you need all the theoretical assistance you can get. You can see empirical application of distributed computing theory to real-world systems in his series of Jepsen blog posts.

This post will follow a rough outline of Kyle’s workshop, starting with theoretical definitions and concepts, and ending with applying those definitions and concepts to discuss and evaluate distributed computing practice. Enjoy!

What is a Distributed System?

Kyle began the discussion with the most fundamental question: what does it mean when we call a system “distributed”? His definition — any system made up of parts that interact slowly or unreliably — is familiar, but by noting that the definition of “slow” is highly dependent on context, he opens up the “distributed” label to a wide range of systems that don’t immediately appear to be distributed. This could include:

  • NUMA architectures
  • making plans via SMS
  • point-of-sale terminals
  • mobile applications
  • communicating threads or processes on a single computer
  • and of course, computers communicating via TCP/UDP over a LAN or the internet.

If the inter-part communication is, say, an order of magnitude slower than the intra-part communication, then you have a distributed system.

Asynchronous Networks

Since all networks in the real world take time to propagate information (and that information can be arbitrarily delayed or lost), understanding timing (and uncertainty of timing) is crucial to being able to reason about the behavior of the system. An important tool Kyle introduced for that purpose is the light cone diagram (by analogy to relativity and spacetime).

two threads accessing a shared variable

Two threads accessing a shared variable.

Because the operations take time to execute (and return a result), the ordering of those operations is ambiguous in this example. Thread 2 can know the time it initiated the read and the time it received the result, but it can’t know the time the read operation was applied on the memory — it could return either the old or new value.


We want to be able to define an ordering for events. However, keeping track of time in a distributed system is difficult. There are a number of types of clocks used for ordering events in a distributed system.

Wall clocks. Nodes track “actual” (human) time, likely by talking to an NTP (Network Time Protocol) server. In practice, though, the synchronization isn’t precise enough to order events: hardware clocks frequently drift (sometimes by centuries), and POSIX time isn’t even monotonic.

Lamport clocks. Each node keeps a counter that it increments with each state transition. The counter is included with every message and, on receiving a message, a node sets its counter to the maximum of its own counter and the one in the message. This gives a partial ordering of events: if event b is causally dependent on event a, then C(a) < C(b).
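The update rules above fit in a few lines. Here’s a minimal Python sketch (class and method names are illustrative, not from any particular library):

```python
class LamportClock:
    def __init__(self):
        self.counter = 0

    def tick(self):
        """Increment on each local state transition."""
        self.counter += 1
        return self.counter

    def send(self):
        """Timestamp to include with an outgoing message."""
        return self.tick()

    def receive(self, message_counter):
        """On receipt, adopt the larger counter, then increment."""
        self.counter = max(self.counter, message_counter) + 1
        return self.counter

# a sends a message to b; b's clock jumps past a's timestamp,
# so C(send) < C(receive) for this causally related pair.
a, b = LamportClock(), LamportClock()
ts = a.send()
b.receive(ts)
```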

Vector clocks. Vector clocks generalize Lamport clocks. The clock is a vector of counters, one for each node. When executing an operation, a node increments its counter in the vector, and again includes the clock with every message to other nodes. When receiving a message, a node takes the pairwise maximum of its clock and the one in the message. Vector clocks order more events than Lamport clocks do.
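A minimal sketch of those rules, assuming a fixed, known set of node names (again, illustrative names only):

```python
class VectorClock:
    def __init__(self, node, nodes):
        self.node = node
        self.clock = {n: 0 for n in nodes}

    def tick(self):
        """Increment this node's own slot before an operation or send."""
        self.clock[self.node] += 1
        return dict(self.clock)

    def receive(self, other):
        """Pairwise maximum with the incoming clock, then tick."""
        for n, c in other.items():
            self.clock[n] = max(self.clock[n], c)
        self.tick()

def happens_before(a, b):
    """a happens-before b iff a <= b in every slot and a != b."""
    return all(a[n] <= b[n] for n in a) and a != b

nodes = ["n1", "n2"]
v1, v2 = VectorClock("n1", nodes), VectorClock("n2", nodes)
msg = v1.tick()      # n1 performs an operation and sends its clock
v2.receive(msg)      # n2 merges, then ticks
```

If neither `happens_before(a, b)` nor `happens_before(b, a)` holds, the two events are concurrent — this is the extra information vector clocks give you over Lamport clocks.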

Dotted version vectors. “Better vector clocks.” Still a partial ordering, but orders more events.

GPS & atomic clocks. Much better than NTP. They give a globally distributed total order with uncertainty windows on the scale of milliseconds, which means you can perform one operation (that’s allowed to conflict) per uncertainty window. But they’re really expensive.

Consistency Models

As software developers, we expect the systems we program against to behave correctly. But there are many possible definitions of correct behavior, which we call consistency models.

Each consistency model defines what sequence of operations on a state machine are valid. In the diagram below, a parent-child relationship between two models means any valid sequence of operations in the parent is valid in the child. That is, requirements are stricter as you move up the tree, which makes it easier to reason about the system in question, but incurs performance and availability penalties.

A hierarchy of consistency models.

Kyle himself has an excellent write-up of these consistency models on his blog, but I’ll give a brief description of a few here.

Linearizability – There appears to be a single global state where operations are applied linearly.

Sequential consistency – Writes appear in the same order to all readers. (But a read may not reflect the “current” state.)

Causal consistency – Only the order of *causally related* operations is linear. Unrelated (concurrent) operations may appear in any order to readers.

Linearizability/strong serializability is the “gold standard” of consistency models. But there are performance and availability tradeoffs as you move up the tree: it is impossible to have total availability in a linearizable (or even sequentially consistent) system.

Convergent Replicated Data Types

CRDTs are a class of eventually consistent data types that work by supporting an idempotent, commutative and associative merge operation. The operation merges the histories of two (diverging) versions of the data structure. Some thorough and accessible examples of CRDTs can be found in Kyle’s project here.
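For a concrete illustration, the grow-only counter (G-Counter) is one of the simplest CRDTs: each replica increments only its own slot, and merge takes the pairwise maximum per replica, which is idempotent, commutative, and associative. A Python sketch (not taken from Kyle’s project):

```python
class GCounter:
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.counts = {}

    def increment(self, n=1):
        # A replica only ever increments its own slot.
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + n

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        """Merge diverged histories: pairwise maximum per replica."""
        for rid, c in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), c)

# Two replicas diverge, then converge by merging in either order.
a, b = GCounter("a"), GCounter("b")
a.increment(3)
b.increment(2)
a.merge(b)
b.merge(a)   # merging again (or in the other direction) is harmless
```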

Building Consensus

To achieve stricter consistency models in a distributed system, it is necessary to achieve consensus on certain facts among a majority of nodes. The Paxos algorithm, introduced by Leslie Lamport in his 1998 paper The Part-Time Parliament, is the notoriously difficult to understand gold standard of consensus algorithms.

Since its publication, Paxos has inspired a family of similar algorithms with optimizations for performance or accessibility, including Multi-Paxos, Fast Paxos, and Generalized Paxos.

In Search of an Understandable Consensus Algorithm (Ongaro & Ousterhout, 2014) is a recent publication describing the Raft consensus algorithm that, like it says on the tin, is intended to be (more) easily understood than Paxos, while retaining similar guarantees and performance characteristics.

A Pattern Language

Kyle’s number one piece of advice for building a distributed system is don’t do it if you don’t have to. Maybe your problem can be solved by getting a bigger box, or tolerating some downtime.

Number two is to use someone else’s distributed system. Pay Amazon to do it, or at least use battle-tested software.

Number three is don’t fail. You can build really reliable hardware and networks at the cost of moving slowly, buying expensive hardware and hiring really good talent.

But, if your system runs long enough, it will eventually fail, so accept it gracefully.

Take frequent backups. Done correctly, they give you sequential consistency. When taking a backup of a database, understand the transactional semantics. (For example, copying the filesystem state of a MySQL database may not reflect a consistent transactional state at every point in time, and the filesystem may change during the backup process. It’s better to use mysqldump, which preserves database semantics.)

Use immutable values when possible. Data that never changes is trivial to store because it does not require coordination.

If you require mutable values, use mutable identities instead (pointers to immutable values). You can fit a huge number of mutable pointers in a linearizable RDBMS while the immutable values themselves are stored in S3 or other eventually consistent stores. This is similar to how Datomic works.
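One way to picture the split is content-addressed storage plus a small pointer table. A toy Python sketch — the two dictionaries stand in for an eventually consistent blob store (like S3) and a linearizable store (like an RDBMS row):

```python
import hashlib
import json

values = {}        # stand-in for an eventually consistent blob store
identities = {}    # stand-in for a small linearizable pointer store

def put_value(data):
    """Store an immutable value under its content hash."""
    blob = json.dumps(data, sort_keys=True).encode()
    key = hashlib.sha256(blob).hexdigest()
    values[key] = blob            # same content always yields the same key
    return key

def set_identity(name, key):
    identities[name] = key        # the only mutation is this pointer swap

set_identity("dataset/weather", put_value({"version": 1}))
set_identity("dataset/weather", put_value({"version": 2}))
# Old versions remain addressable by hash; only the pointer moved.
```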

Some operations can return fractional results or tolerate fractional availability. (E.g., search results, video or voice call quality, analytics, map tiles.) This lets you gracefully degrade the app experience while trying to recover, rather than suddenly crashing. Feature flags that turn off application features in response to system health metrics can be used to the same end.

Service owners should write client libraries for their services, and the libraries should include mock I/O to allow consumers to test locally without having to set up (and keep in sync) a local instance of the dependency. Include the library version in the request headers so you can track uptake of new versions and go after the slowpokes.

Particularly important when moving from monolithic to distributed architectures is to test everything, all the time. This includes correctness invariants of course, but also performance invariants. Simulate network failures, performance test against large data sets. Test against production data sets since they are always different from test data.

Always bound the size of queues in your application, or you can get in a situation where work takes arbitrarily long to complete. Monitor enqueue and dequeue rates on the queue (throughput) as well as the time it takes a message to be worked after being enqueued (latency).
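A minimal sketch of a bounded queue with per-message latency tracking, using only the Python standard library (the helper names are illustrative):

```python
import queue
import time

work = queue.Queue(maxsize=100)   # bounded: enqueue fails when full

def enqueue(item):
    # put_nowait raises queue.Full instead of letting the backlog
    # grow without bound; the caller must shed or retry.
    work.put_nowait((time.monotonic(), item))

def dequeue():
    enqueued_at, item = work.get()
    # Time spent waiting in the queue — the latency you should monitor
    # alongside enqueue/dequeue rates.
    latency = time.monotonic() - enqueued_at
    return item, latency

enqueue("job-1")
item, latency = dequeue()
```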


Thanks to Sebastian Galkin for organizing the event, Dylan Pittman for his notes and diagrams, and of course, Kyle Kingsbury for his time, energy and expertise.

Posted in Engineering

Multidimensional Arrays with Mandoline

At the Climate Corporation, we have a great demand for storing large amounts of raster-based data, and an even greater demand to retrieve small amounts of it quickly.  Mandoline is our distributed, immutable, versioned database for storing big multidimensional data.  We use it for storing weather data, elevation data, satellite imagery, and other kinds of data.  It is one of the core systems that we use in production.


What can Mandoline do for me?

Mandoline can store your multidimensional array data in a versionable way that doesn’t bloat your storage.  When you don’t know what your query pattern is, or when you want to preserve past versions of your data, Mandoline may be the solution for you.

Some more details

  • It’s a Clojure library.
  • Using the Mandoline library, we have built services both to access and to ingest data in a distributed fashion.  The reason for doing it this way is so that our scientists have easy, language-agnostic access to the data, and so that it plays well with existing scientific tools, like netCDF libraries and applications in any language.
  • When we say distributed, we mean that you can have distributed reads and writes from different machines to your dataset.
  • Mandoline uses swappable backends, and can save actual data to different backends.  In production, it currently runs on Amazon’s DynamoDB.  For testing purposes, we can use either the SQLite or in-memory backends.
  • We want to expand the backend offerings to other databases like Cassandra and HBase.
  • Mandoline takes advantage of shared structure to make immutability and versions possible.

For more of an introduction to Mandoline (formerly known as Doc Brown), here is a video of the talk I gave at Clojure/West 2014.



Mandoline was primarily written by Brian Davis, Alice Liang, and Sebastian Galkin.  Brian Davis and Steve Kim have been the main push to open source Mandoline for use with the general public.

Posted in Engineering

Service Oriented Authorization – Part 3

As a follow-up to Part 1 and Part 2, here is the video of the talk I gave at RailsConf last month.

Disclaimer: This is my first conference talk, ever!

Slides available on Speakerdeck https://speakerdeck.com/lumberj/authorization-in-a-service-oriented-environment

Posted in Engineering
