TCC Field Experiments on Sub-Field Variability of Weather

Hi, I’m Adam Pasch, Weather Data Manager and a member of the Meteorology team at TCC.  I’ll be presenting a poster at American Meteorological Society (AMS) Annual Meeting next week talking about some initial analyses of data from TCC field experiments on Climate Research Farms completed in 2015.

TCC Field Experiments of Sub-Field Variability of Precipitation and Wind [PDF]Academic - Science behind Nitrogen Advisor

We test our advice on real farms and provide agronomically-relevant, field-specific weather information to growers (over 75 million acres in 2015).  We conducted three field experiments during 2015: Sub-Field Variability, Interstation Comparison, and Remotely-Sensed Precipitation at four research locations throughout the Midwest.

We found that within a 140-acre field, rainfall can vary by 65% of the intensity. In other words, if the true rainfall is 1″, a gauge on the field might report anywhere from 0.7″ to 1.3″ simply because of where in the field that gauge is.








Another advantage of deploying our own gauges was that it allowed us to evaluate our rainfall algorithms. We found that TCC QPE was more accurate than other commercial options.





We also found dramatic variation of wind across a farmer’s field. Here’s an example of a field in Iowa and you can see that average wind speeds vary from 0.8 to 1.5 (m/s).


I’m looking forward to AMS. Please reach out with questions:

Tagged with: , ,
Posted in Engineering, Science

Big Data, Cloud and NOAA CRADA’s with AWS

New Way

Hi, I’m Lak and I lead the Meteorology team at TCC.  I’ll be on a panel at the American Meteorological Society Annual Meeting next week talking about the NOAA Big Data cooperative research and development agreement with Amazon Web Services (we wrote a blog post about this CRADA in October).

To start off, the panelists make some opening remarks using a few slides. These are mine:

Big Data, Cloud and NOAA CRADA at the Climate Corporation [PDF]

I will use my opening remarks to introduce the concepts of Big Data and the Cloud and will then talk about how having NEXRAD data on AWS enables faster and better science at TCC.

This was the old way to carry out radar research at TCC — notice how much preparation we had to do just to get the data to work with:

Old WayHaving the NEXRAD data readily available on S3 makes the research process much shorter:

New Way

As as result, science at TCC is faster and because we can do the science faster, we carry out research studies on larger, more representative datasets. Using more representative data makes our science better.

Just as it has helped us, the NOAA CRADA opens up many opportunities for national labs, private companies and universities to create Big Data tools and services.  I’m looking forward to the panel discussion!


Tagged with: , , ,
Posted in Engineering, Science

How to read and display Nexrad on AWS using Python

Amazon Web Services (AWS) is making NEXRAD data freely available on Amazon S3 as part of the NOAA Big Data Project. In this Python notebook, I will step you through being able to read and display this data from your Python programs. I will assume that you know Python, how to install Python modules, and can access AWS.  (Follow along by downloading and running this iPython notebook).

What you need

You probably have ipython and matplotlib already. In addition, you may need to install the following Python modules:

  1. boto which I installed using conda:
    conda install boto
  2. pyart which I installed using conda:
    conda install -c pyart

You may also need to configure your AWS credentials to access S3.

Import modules

import matplotlib.pyplot as plt
import as ma
import numpy as np
import pyart.graph
import tempfile
import boto

Find volume scan

Volume scans from NEXRAD are stored on S3 such that the bucket name is noaa-nexrad-level2 and the key name is something like 2014/04/03/KLSX/KLSX20140403_211350_V06.gz i.e. YYYY/MM/DD/KRAD/KRADYYYYMMDD_HHmmSS_V0?.gz. You can use the boto library to browse and select the keys that you want.

Here, I’ll directly navigate to a NEXRAD file that I know exists.

# read a volume scan file on S3. I happen to know this file exists.
s3conn = boto.connect_s3()
bucket = s3conn.get_bucket('noaa-nexrad-level2')
s3key = bucket.get_key('2015/05/15/KVWX/KVWX20150515_080737_V06.gz')
print s3key

Download volume scan from S3

For further processing, it is helpful to have the file locally on disk, so let’s download it:

# download to a local file, and read it
localfile = tempfile.NamedTemporaryFile()
radar =

Display the data

Then, let’s use PyArt to display the lowest elevation scan data of all the moments.

# display the lowest elevation scan data
display = pyart.graph.RadarDisplay(radar)
fig = plt.figure(figsize=(9, 12))

plots = [
    # variable-name in pyart, display-name that we want, sweep-number of radar (0=lowest ref, 1=lowest velocity)
    ['reflectivity', 'Reflectivity (dBZ)', 0],
    ['differential_reflectivity', 'Zdr (dB)', 0],
    ['differential_phase', 'Phi_DP (deg)', 0],
    ['cross_correlation_ratio', 'Rho_HV', 0],
    ['velocity', 'Velocity (m/s)', 1],
    ['spectrum_width', 'Spectrum Width', 1]

def plot_radar_images(plots):
    ncols = 2
    nrows = len(plots)/2
    for plotno, plot in enumerate(plots, start=1):
        ax = fig.add_subplot(nrows, ncols, plotno)
        display.plot(plot[0], plot[2], ax=ax, title=plot[1],
             axislabels=('East-West distance from radar (km)' if plotno == 6 else '', 
                         'North-South distance from radar (km)' if plotno == 1 else ''))
        display.set_limits((-300, 300), (-300, 300), ax=ax)
        display.set_aspect_ratio('equal', ax=ax)
        display.plot_range_rings(range(100, 350, 100), lw=0.5, col='black', ax=ax)


The result will look like this:

Plots of NEXRAD data

Processing data

Not all the reflectivity above is because of weather echoes. Most of the data are actually bioscatter (insects and birds). Let’s mask the reflectivity data using the polarimetric variables to retain only approximately spherical echoes.

refl_grid = radar.get_field(0, 'reflectivity')
print refl_grid[0]
rhohv_grid = radar.get_field(0, 'cross_correlation_ratio')
zdr_grid = radar.get_field(0, 'differential_reflectivity')

# apply rudimentary quality control
reflow = np.less(refl_grid, 20)
zdrhigh = np.greater(np.abs(zdr_grid), 2.3)
rhohvlow = np.less(rhohv_grid, 0.95)
notweather = np.logical_or(reflow, np.logical_or(zdrhigh, rhohvlow))
print notweather[0]

qcrefl_grid = ma.masked_where(notweather, refl_grid)
print qcrefl_grid[0]

# let's create a new object containing only sweep=0 so we can add the QC'ed ref to it for plotting
qced = radar.extract_sweeps([0])
qced.add_field_like('reflectivity', 'reflectivityqc', qcrefl_grid)
display = pyart.graph.RadarDisplay(qced)
fig = plt.figure(figsize=(11, 5))

plots = [
    # variable-name in pyart, display-name that we want, sweep-number of radar (0=lowest ref, 1=lowest velocity)
    ['reflectivity', 'Reflectivity (dBZ)', 0],
    ['reflectivityqc', 'QCed Reflectivity (dBZ)', 0],



Our simple quality control removes much of the bioscatter and retains the meteorological signal in the north-northeast. However, it is restricted to radial distances below 300 km where polarimetric radar variables are available, and some noise remains. See for a better quality-control technique.


Thanks to NOAA and Amazon for making the NEXRAD data available.

Tagged with: , ,
Posted in Engineering

Squeedo: Simplified Amazon SQS processing in Clojure


At The Climate Corporation, we use various forms of data and and Clojure based computations to help farmers make decisions about their operations.  In order to do this quickly and efficiently, we distribute the processing across machines, often using SQS, Amazon’s queue service.  Autoscaling of EC2 takes care of adding new machines for us, but we want to make sure we get the most from each machine—in a large organization, wasted resources add up quickly.

We also found that code to support queue-processing backends was quite similar: listen to SQS, read messages from the queue, process in a thread pool, and Ack the messages. We’d rather use a library to handle all that plumbing and just let us focus on the compute logic of what to actually do with the message.

With these two goals in mind (efficient use of EC2 resources, especially cpu, and reducing message processing to a simple pure function), we set out to build Squeedo.

The first workload we were trying to solve looked like this:

SQS -> Several second Http I/O API call -> Several Second CPU Intensive calculation -> Ack/Nack SQS

We had a double whammy of a long running synchronous http call using  clj-http along with a heavy cpu intensive process after that.
After several iterations of basing this plumbing code on threadpools, we quickly found that we couldn’t get the kind of throughput we wanted simply by tuning only the number of threads. We needed something more dynamic, that would adapt better to the number of cores it ran on and wouldn’t require us to figure out the “right” number of threads to use in the pool.  Additionally, we wanted a higher level abstraction that felt closer to message processing, rather than low level threadpools.

After reading this blog post explaining how core.async could be used to handle blocking IO, we changed our line of thinking, and Squeedo was born.  The key was to combine asynchronous http using http-kit for I/O along with core.async for handling polling and acking SQS, message concurrency, and the amount of outstanding work.

Simple Usage: an http-kit example with non-blocking I/O

To use Squeedo, add the Leiningen dependency [com.climate/squeedo “0.1.0”] to your project.clj.

In its simplest form, Squeedo is composed of only 2 parts, a compute function and a consumer.

(require '[com.climate.squeedo.sqs-consumer 
            :refer [start-consumer stop-consumer]]
         '[org.httpkit.client :as client]
         '[clojure.core.async :refer [go >!]])

(defn- eat-some-cpu
  (reduce + (range 1 how-much)))

(defn compute
  [message done-channel]
  ;; do something expensive
  (eat-some-cpu 1000000)
  ;; do this if you will have I/O
  (client/get ""
    (fn [response]
        ;; do some more processing with the response
        (eat-some-cpu 1000000)
        (>! done-channel message)))))

(def consumer (start-consumer "my-sqs-queue" compute 
                              :num-listeners 10 
                              :max-concurrent-work 50)) 
;; when done listening 
;; (stop-consumer consumer) 

The compute function must put to the done-channel.  Squeedo listens to the done-channel and will ack/nack each message from SQS for you and passes your compute function another message to process. You must put to the done-channel even for exceptions or Squeedo will think you have outstanding work and not pass you another message to process (any uncaught exception will be automatically nack’d).

Using this simple pattern and higher level abstraction, we’ve processed 100s of millions of messages with cpu utilization like what you see below (taken on an m3.large EC2 instance).

Screen Shot 2015-09-11 at 8.42.48 AM

Simple Tuning and Configuration

One of the great things about Squeedo is the advanced configuration options that can be used to tune the consumer to your workflow beyond what the very reasonable defaults do out of the box.

  • :message-channel-size – the number of messages to prefetch from SQS; default 20 * num-listeners. Prefetching messages allow us to keep the compute function continuously busy without having to wait for more to be first pulled from the remote SQS queue. Make sure to set the timeout appropriately when you create the queue.
  • :num-workers – the number of workers processing messages concurrently. This controls how many workers actually process messages at any one time. (defaults to number of CPU’s – 1 or 1 if single core). Squeedo works best with 2 or more CPU’s. This is not the amount of work that can be outstanding at any one time, that is controlled below with :max-concurrent-work.
  • :num-listeners – the number of listeners polling from SQS. default is (num-workers /dequeue-limit) since each listener dequeues up to dequeue-limit messages at a time. If you have a really fast process, you can actually starve the compute function of messages and thus need more listeners pulling from SQS.
  • :dequeue-limit – the number of messages to dequeue at a time; default 10
  • :max-concurrent-work – the maximum number of total messages processed concurrently. This is mainly for async workflows where you can have work started and are waiting for parked IO threads to complete; default num-workers. This allows you to always keep the CPU’s busy by having data returned by IO ready to be processed. Its really a memory game at this point — how much data you can buffer that’s ready to be processed by your asynchronous http clients.
  • :dl-queue-name – the dead letter SQS queue to which repeatedly failed messages will go.  A message that fails to process the maximum number of SQS receives is sent to the dead letter queue. (see SQS Redrive Policy)  The queue will be created if necessary.  Defaults to (str QUEUE-NAME \”-failed\”) where QUEUE-NAME is the name of the queue passed into the start-consumer function.

For more information or to contribute and ask for new features, check out the Squeedo on Github. Thanks for reading!

Tagged with: , ,
Posted in Engineering, Software Engineering

xray + dask: Out-of-core, labeled arrays in Python

Xray provides labeled, multi-dimensional arrays. Dask provides a system for parallel computing. Together, they allow for easy analysis of scientific datasets that don’t fit into memory.

An introduction to xray

To make it easier to work with scientific data in Python, we researchers at The Climate Corporation wrote an open source library called xray. Xray provides data structures for multi-dimensional labeled arrays, building on NumPy and pandas. Our goal was to make it easier to work with labeled, gridded datasets from physical sensors and models. At Climate, we use it to analyze weather data and satellite images.

The key idea of xray is that it uses metadata in the form of labeled dimensions (e.g., “time”) and coordinate values (e.g., the date “2015-04-10”) to enable a suite of expressive, label based operations. Here’s a quick example of how we might manipulate a 3D array of temperature data with xray:

ds.sel(time='2014-12-11').max(['latitude', 'longitude'])
Dimensions:  (time: 4)
  * time     (time) datetime64[ns] 2014-12-11 2014-12-11T06:00:00 ...
Data variables:
    t2m      (time) float64 311.4 316.2 312.0 309.6

For comparison, here’s the how we might write that using only NumPy:

# temperature contains 4x daily values for December 2014
# axis labels are (time, latitude, longitude)
ds.t2m.values[[40, 41, 42, 43]].max(axis=(1, 2));

Once we’ve done our computation, every xray dataset can be easily serialized to and from disk, using the portable and efficient netCDF file format (a dialect of HDF5):

saved_ds = xray.open_dataset('')
assert saved_ds.equals(ds)

Or you might pass your data off to pandas for further analysis in tabular form:

df = ds.to_dataframe()

Without a tool like xray, we would need to keep track of metadata manually in order to preserve it in operations and save it with our results when we’re done. This is burdensome, especially for exploratory analysis, so we often don’t bother. Even when metadata is present, without xray it’s often easier not to use the metadata directly in our code. Magic constants end up littering our scripts, like the numbers 1 and 2 in our NumPy example above. Implicit assumptions about our data are left in comments, or worse, are not even stated at all.

Xray provides short-term incentives for providing metadata, in the form of additional expressive power. But the real payoff is enhanced readability and reproducibility in the long term when you or a colleague come back to your code or data six months from now.

Loading medium-sized weather data with dask

Today I will talk about how we have extended xray to work with datasets that don’t fit into memory by integrating it with dask. Dask is a new Python library that extends NumPy to out-of-core datasets by blocking arrays into small chunks and executing on those chunks in parallel. It allows xray to easily process large data and also simultaneously make use of all of our CPU resources.

Weather data — especially models from numerical weather prediction — can be big. The world uses its biggest supercomputers to generate weather and climate forecasts that run on increasingly high resolution grids. Even for data analysis purposes, it’s easy to need to process 10s or 100s of GBs of data.

For this blog post, we’ll work with a directory of netCDF files downloaded from the European Centre for Medium-Range Weather Forecast’s ERA-Interim reanalysis with the new xray function to open a multi-file dataset, xray.open_mfdataset:

import numpy as np
import matplotlib.pyplot as plt
import xray

ds = xray.open_mfdataset('/Users/shoyer/data/era-interim/2t/*.nc', engine='scipy')
ds.attrs = {} # for brevity, hide attributes

Previewing the dataset, we see that it consists of 35 years of 6-hourly surface temperature data at a spatial resolution of roughly 0.7 degrees:

Dimensions:    (latitude: 256, longitude: 512, time: 52596)
  * latitude   (latitude) &gt;f4 89.4628 88.767 88.067 87.3661 86.6648 ...
  * longitude  (longitude) &gt;f4 0.0 0.703125 1.40625 2.10938 2.8125 ...
  * time       (time) datetime64[ns] 1979-01-01 1979-01-01T06:00:00 ...
Data variables:
    t2m        (time, latitude, longitude) float64 240.6 240.6 240.6 ...

This is actually 51 GB of data once it’s uncompressed to 64-bit floats:

ds.nbytes * (2 ** -30)

Is this big data? No, not really; it fits on a single machine. But my laptop only has 16GB of memory, so I certainly could not analyze this dataset in memory.

The power of dask is that it enables analysis of “medium data” with tools that are not much more complex than those we already use for analyses that fit entirely into memory. In contrast, big data tools such as Hadoop and Spark require us to rewrite our analysis in a totally different paradigm. In most cases, we’ve found the cost of doing everyday work with big data is not worth the hefty price. Instead, we subsample aggressively, and work with small- to medium-sized data whenever possible. Dask expands our scale of easy-to-work-with data from “fits in memory” to “fits on disk.”

Computation with dask

Now, let’s try out some computation. You’ll quickly notice that applying operations to dask arrays is remarkably fast:

# convert from Kelvin to degrees Fahrenheit
%time 9 / 5.0 * (ds - 273.15) + 32
CPU times: user 26.1 ms, sys: 9.25 ms, total: 35.3 ms
Wall time: 28.6 ms
Dimensions:    (latitude: 256, longitude: 512, time: 52596)
  * latitude   (latitude) &gt;f4 89.4628 88.767 88.067 87.3661 86.6648 ...
  * longitude  (longitude) &gt;f4 0.0 0.703125 1.40625 2.10938 2.8125 ...
  * time       (time) datetime64[ns] 1979-01-01 1979-01-01T06:00:00 ...
Data variables:
    t2m        (time, latitude, longitude) float64 -26.58 -26.59 -26.6 ...

How is this possible? Dask uses a lazy computation model that divides large arrays into smaller blocks called chunks. Performing an operation on a dask array queues up a series of lazy computations that map across each chunk. These computations aren’t actually performed until values from a chunk are accessed.

So, when we print a dataset containing dask arrays, xray is only showing a preview of the data from the first few chunks. This means that xray can still be a useful tool for exploratory and interactive analysis even with large datasets.

To actually process all that data, let’s calculate and print the mean of that dataset:

%time float(ds.t2m.mean())
CPU times: user 3min 5s, sys: 1min 7s, total: 4min 13s
Wall time: 1min 3s

Notice that the computation used only 1 minute of wall clock time, but 4 minutes of CPU time — it’s definitely using my laptop’s 4 cores. This surface temperature field is 13.5 GB when stored as 16-bit integers on disk, meaning that my laptop churned through the data at \~210 MB/s.


Like pandas, xray excels at split-apply-combine operations. These types of grouped operations are key to many analysis tasks. For example, one of the most common things to do with a weather dataset is to understand its seasonal cycle.

With dask, xray’s expressive groupby syntax scales to datasets on disk. As an example, here’s how we calculate the average difference between summer and winter surface temperature 1:

# everything is fast until we compute values
ds_by_season = ds.groupby('time.season').mean('time')
t2m_range = abs(ds_by_season.sel(season='JJA')
- ds_by_season.sel(season='DJF')).t2m
CPU times: user 134 ms, sys: 18.4 ms, total: 153 ms
Wall time: 145 ms
# now we calculate the actual result
%time result = t2m_range.load()
CPU times: user 2min 3s, sys: 51 s, total: 2min 54s
Wall time: 44.8 s
plt.figure(figsize=(15, 6))
plt.imshow(result, cmap='cubehelix', vmin=0, vmax=60,
extent=[0, 360, -90, 90], interpolation='nearest')
plt.title('Summer minus winter mean temperature')
plt.colorbar(label='degrees C', extend='max');


A quick aside on climate science: we see that Eastern Siberia has the world’s largest seasonal temperature swings. It is far from equator, which means that the amount of sunlight varies dramatically between winter and summer. But just as importantly, the climate is highly continental: northeast Asia is about as far away from the moderating effects of the oceans as is possible, because prevailing winds in the middle latitudes blow West to East.

The beauty of the dask.array design is that adding these operations to xray was almost no trouble at all: all we had to do is call functions like da.mean and da.concatenate instead of the numpy functions with the same names.

Computation graphs

Under the hood, dask represents deferred computations as graphs. We can look at these graphs directly to better understand and visualize our computations. Looking at these graphs also helps illustrate how dask itself works.

Here’s the graph corresponding to the above computation (limited to only two years of data). It shows how data flows as chunks through a series of functions that manipulate and combine the data, starting from files storing one month of data each:


When asked for a computation result, Dask approaches graphs like this holistically. We never end up computing either of the right two branches in the above figure, corresponding to spring and fall data, because we never use them. After an optimization pass, the graph is consolidated into a much smaller set of tasks:


Each circle corresponds to a function evaluation on a chunk of an array handled by dask’s multi-threaded executor.

What’s next?

Last week, we released version 0.5 of xray with this experimental dask integration, giving you the features we just demonstrated. You can install it with conda:

$ conda install xray dask netcdf4

Dask will also allow us to write powerful new features for xray. For example, we can use dask as a backend for distributed computation that lets us automatically parallelize grouped operations written like ds.groupby('some variable').apply(f), even if f is a function that only knows how to act on NumPy arrays.

You can read more about what we’ve done so far in the documentation for dask.array and xray. Matthew Rocklin has recently written about the State of Dask on his blog.

If you try xray and/or dask for your own problems, we’d love to hear how it goes, either in the comments on this post or in our issue tracker.

Thanks to Matthew Rocklin, Leon Barrett, and Todd Small for their feedback on drafts of this article. It is cross posted on the Continuum blog and my personal website.

  1. In this example, 'time.season' is using a special datetime component syntax to indicate that season should be calculated on the fly from the time variable. 
Posted in Engineering

Numba vs Cython: How to Choose

My name is Stephan, and I’m a scientist on the Climatology team at The Climate Corporation. We make extensive use of Python to build statistical weather models, and sometimes we need our code to be fast. Here’s how I choose between Numba and Cython, two of the best options for accelerating numeric Python code.

Most of the time, libraries like NumPy, SciPy and pandas, whose critical loops are already written in a compiled language like C, are enough fast scientific Python code. Unfortunately, sometimes you need to write your own loop in performance critical paths of your code, and also unfortunately, loops in Python are painfully slow. This is where Numba and Cython come in: they both promise the ability to write the inner loop of your code in something that looks a lot like normal Python, but that runs about as fast as handwritten C.

Numba uses LLVM to power Just-In-Time compilation of array oriented Python code. Using Numba is usually about as simple as adding a decorator to your functions:

from numba import jit

def numba_mean(x):
    total = 0
    for xi in x:
        total += xi
    return total / len(x)

You can supply optional types, but they aren’t required for performant code as Numba can compile functions on the fly using its JIT compiler.

In contrast, Cython is a general purpose tool, not just for array oriented computing, that compiles Python into C extensions. To see impressive speedups, you need to manually add types:

def cython_mean(double[:] x):
    cdef double total = 0
    for i in range(len(x)):
        total += x[i]
    return total / len(x)

When I benchmark this example, IPython’s %timeit reports that calling this function on a 100000 element array takes ~16 ms with pure Python version, but only ~93 µs with Numba and ~96 µs with Cython.1

This trivial example illustrates my broader experience with Numba and Cython: both are pretty easy to use, and result in roughly equivalently fast code. For similar results on a less contrived example, see this blog post by Jake VanderPlas.

The bottom line is that even though performance is why we reach for tools like Numba and Cython, it doesn’t provide a good basis for choosing one over the other. So here are the questions I ask myself when making that choice for my projects.

Will other people be deploying your code?

Cython is easier to distribute than Numba, which makes it a better option for user facing libraries. It’s the preferred option for most of the scientific Python stack, including NumPy, SciPy, pandas and Scikit-Learn. In contrast, there are very few libraries that use Numba. I know of two, both of which are basically in the experimental phase: Blaze and my project numbagg.

The main issue is that it can be difficult to install Numba unless you use Conda, which is a great tool, but not one everyone wants to use. In contrast, distributing a package with Cython based C-extensions is almost miraculous easy. Cython is also a more stable and mature platform, whereas the features and performance of Numba are still evolving.

If you don’t need to distribute your code beyond your computer or your team (especially if you use Conda), then Numba can be a great choice. Otherwise, you should lean toward Cython.

Do you need advanced Python features or to use C-level APIs?

The features that Numba supports in the accelerated nopython mode are very limited. For example:

  • Numba only accelerates code that uses scalars or (N-dimensional) arrays. You can’t use built-in types like list or dict or your own custom classes.
  • You can’t allocate new arrays in accelerated code.
  • You can’t use recursion.

Some of these are design decisions; in other cases, these are being actively worked on.

In contrast, Cython can compile arbitrary Python code, and can even directly call C. The ability to “cythonize” an entire module written using advanced Python features and then only tweak the bottlenecks for speed can be really nice.

For example, switching to an extension type and calling C APIs directly can make for big differences in speed, even if you still rely on builtin Python types like lists or dictionaries. Writing
something like cyordereddict in Numba would be nearly impossible.

Do you want to write code that works on N-dimensional arrays?

Suppose you want a function that takes several arguments and returns a scalar or array, depending on the number of provided arguments. For example, consider a function that averages two numbers:

def average(a, b):
    return 0.5 * (a + b)

One of the most powerful features of NumPy is that this simple function would work even if a or b are multi-dimensional arrays (tensors), by following broadcasting rules.

Numba makes it easy to accelerate functions with broadcasting by simply adding the vectorize decorator. This produces universal functions (ufuncs) that automatically work (even preserving labels) on array-like data structures in the entire scientific Python ecosystem, including xray (my project) and pandas. In other cases, Numba can handle arbitrary dimensional input by using Just-In-Time compilation with jit or by creating generalized universal functions with guvectorize.

In contrast, generally speaking, your Cython functions will only work for input with a number of dimensions that you determine ahead of time (e.g., a 1D vector, but not a scalar or 2D array). It certainly possible to do this sort of stuff with Cython, but it’s not easy, and you’ll need to get your hands dirty with the NumPy C-API. Keith Goodman has some nice examples in version 1.0 of bottleneck.

Still not sure?

When I’m not constrained by other concerns, I’ll try to make Numba work. Numba is usually easier to write for the simple cases where it works. You may still run into annoying limitations when you try to do complex things, but Numba has been getting a lot better, even just over the past few months (e.g., they recently added support for generating random numbers). At the end of the day, even if you ultimately can’t get things to work, you’ll still have idiomatic Python code that should be easy to accelerate with Cython.

This post is cross posted to my personal website.

  1. numpy.mean is faster still, at ~60 µs, but here we’re pretending that we need to write our own custom function that is not already built in. It’s still impressive that we’re only 50% slower than highly tuned C. 
Tagged with: , , , ,
Posted in Engineering

S3 Distributed Version Restore

At The Climate Corporation, we work with Amazon Web Services a lot. As you can imagine, weather data is very plentiful. We keep lots of data in Amazon’s Simple Storage Service (S3), which is basically a great big filesystem in the sky, and in Mandoline, our array data store, which can use S3 as a backend. It can take some effort to deal with so much data, but we make that effort willingly, since we need it for important things like predicting fertilizer needs for fields.

To help manage all this data, we use S3’s advanced features, such as versioning. When versioning is enabled, every time a file in S3 is updated or deleted, the old version remains stored in S3 as a backup, and we can access it with a special API call. Then, if we accidentally overwrite or delete a file, we can make an API call to list all that file’s versions, choose one, and then use another API call to restore that old version.

However, at Climate, we often manipulate our weather data with automated tools. That means that if we mess up, we can mess up in a really big way. With millions of files and billions of versions, the API for listing and restoring old versions might take weeks to restore them all. In order to fix mistakes made by automated tools, at large scale, we need another automated tool, one that can restore lots of files in a hurry. Unfortunately, as far as we know, there wasn’t one provided.

So, we’ve made just that automated tool, S3DistVersions, to let us restore lots of files efficiently. For it to perform well, it needs lots of simple parallelism, so we used Hadoop MapReduce, a parallel batch processing system. Because its primary task is to access Amazon S3, we designed it to work well inside Elastic MapReduce (EMR), Amazon’s on-demand Hadoop service.

We based the interface for S3DistVersions on that of S3DistCp, a batch tool for copying data in S3. Given a time and a location in S3, S3DistVersions will restore all files there to their state at the specified time. (Because each file has its own list of versions, you can’t specify a version to restore them all to. Instead, we find it easier to specify a time, and S3DistVersions selects the appropriate version for each file.) It takes simple arguments:

java -jar S3DistVersions-0.1.0.jar
  --src s3://mybucket/mypath
  --prefixes s3://mybucket/list-of-s3-prefixes.txt
  --restore-time 2014-01-01T14:00:00+07:00


The hardest thing to deal with was obtaining parallelism while working with S3’s versioning API. Suppose we wanted to restore all the files under the location s3://mybucket/mypath. Unfortunately, S3 only permits access to the versions of files under a location via a (serial) paging API, so we have to make a long sequence of API calls. Sure, each request might return a thousand versions, but it would take a long time to get through a million files and all their versions at that rate. In order to restore those files in a useful amount of time, we need to access the version information in parallel.

(Note: We can’t just start by listing the files under s3://mybucket/mypath because S3’s file-listing API won’t show deleted files. Instead, we have to use the version-listing API.)

To get parallelism while listing versions, we ask the user to provide a list of “prefixes”–the beginnings of the filenames that appear under s3://mybucket/mypath. Then, we can make requests for versions in those separate places in parallel. (For instance, given prefixes /a/1 and /b/2, we can scan for versions under s3://mybucket/mypath/a/1 and s3://mybucket/mypath/b/2 in parallel.) We need that parallelism in order to get through our billions of versions in a reasonable amount of time; for smaller restorations, it’s perfectly fine to omit the list of prefixes. And of course, parallelism is easy in the restore step, so the prefix list is only needed for parallelism in listing versions.

In practice, we find that it’s not hard to use a prefix list like this. We have our files spread out in many different subdirectories, so we use that list of subdirectories as our prefixes.

We designed this program to follow the recommended advice for partitioning keys. S3 keeps track of filenames (keys) in an index that is spread across many different Amazon computers in sequential chunks. It’s sort of like the way conferences have registration tables organized by segments of the alphabet (e.g. ‘A-C’, ‘D-F’, …) to distribute the load. Just as you wouldn’t want to have all the people with ‘A’ names come in at the same time, we don’t want our program to access the list of files in sequential order. Therefore, when S3DistVersions interacts with S3 files or lists their versions, it first shuffles the filenames or filename prefixes. That way, it spreads the load across Amazon’s S3 index cluster.


First of all, for this project, as for most things we do, we used Clojure, a programming language that’s based on Lisp and runs on the Java Virtual Machine (JVM). Those parents give it simplicity and speed, plus access to a large range of useful libraries. It’s also an opinionated language that prefers a functional, immutable style. We like that opinion; for instance, functional code is easier to reason about in bulk, since we can be confident it doesn’t have weird side effects. That also happens to work great with parallel data processing such as MapReduce.

We also often use Cascalog, a MapReduce tool for Clojure. However, here we wanted tight control over keys, grouping, and whether work happens in mappers or reducers, so we tried Parkour, which had some exceptions in EMR. Ultimately, we used clojure-hadoop. It isn’t as elegant as Cascalog in some ways, but it does give us the control we want.


Now, we are confident that when we mess up lots of files in S3, we can fix the problem at scale. We are pleased to release S3DistVersions, and we hope that it will help others work with S3’s versioning more easily. We welcome contributions and feedback, so please let us know how it works for you.

Tagged with: , , , ,
Posted in Engineering, Software Engineering