Amazon ECS at The Climate Corporation: Using Amazon ECR and Multiple Accounts for Isolated Regression Testing

This is a post from Nathan Mehl, a Senior Staff Engineer on the Operations Engineering team at The Climate Corporation.

Like many large users of AWS, The Climate Corporation uses multiple AWS accounts in order to provide strict isolation between a production environment and various pre-production environments (testing, staging, etc.). Applications are continuously deployed from a build/CI system (Jenkins CI) into a testing environment, and then, after either automated or manual testing (depending on the application and team), are “promoted” into a staging/pre-production environment where full regression/load tests are done, and then finally promoted again into production.

But when it came time to evaluate the use of Amazon EC2 Container Registry (Amazon ECR) and Amazon EC2 Container Service (Amazon ECS), our multi-account deployment presented a challenge: if the unit of deployment is a container, and there are potentially multiple full-fledged independent accounts where that container could be running, how do we safely and conveniently manage the lifecycle of an individual container image as it makes its way through the testing pipeline and into staging and production?

One obvious approach that we quickly discarded was simply having an independent ECR deployment in each account. The problem with this approach is that it would be extremely difficult to know the current deployment state of any given image without building and maintaining an external state-tracking system that could easily fall out of sync with ground truth in ECR. Also, “promoting” an image between environments would require copying it from one account’s ECR registry to another’s: in addition to being slow, this would require careful construction of inter-account IAM policies to let a single client pull from ECR in account A and then push to account B.

We chose to have a single “ECR registry of record” in the AWS account that hosts our Jenkins continuous integration system: Jenkins builds create the container images and push them to ECR. An out-of-band, scheduled AWS Lambda function iterates over the list of ECR repositories and applies a registry access policy that allows all of our other accounts read access.
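
A minimal sketch of what that scheduled Lambda function might look like is below; the account IDs and the exact policy statement are placeholders rather than our real configuration, but the boto3 calls (describe_repositories and set_repository_policy) are the relevant ones:

import json
import boto3

# Hypothetical account IDs for the non-CI accounts that need pull access.
READER_ACCOUNTS = ['111111111111', '222222222222']

READ_ONLY_POLICY = json.dumps({
    'Version': '2008-10-17',
    'Statement': [{
        'Sid': 'CrossAccountReadOnly',
        'Effect': 'Allow',
        'Principal': {
            'AWS': ['arn:aws:iam::%s:root' % acct for acct in READER_ACCOUNTS]
        },
        'Action': [
            'ecr:GetDownloadUrlForLayer',
            'ecr:BatchGetImage',
            'ecr:BatchCheckLayerAvailability'
        ]
    }]
})

def handler(event, context):
    ecr = boto3.client('ecr')
    paginator = ecr.get_paginator('describe_repositories')
    for page in paginator.paginate():
        for repo in page['repositories']:
            # apply (or re-apply) the read-only policy to every repository
            ecr.set_repository_policy(
                repositoryName=repo['repositoryName'],
                policyText=READ_ONLY_POLICY)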

The next question was: how to track the state of any given container image as it moved through the different accounts/environments on its way to production? Again, we could build an external state-tracking system, but after some thought we realized that we could use the tag metadata offered by the v2.2 Docker Manifest specification and the ECR put_image API operation to provide the same information.

To track the state of the container image through the continuous deployment pipelines, we use multiple Docker tags to ensure that the image’s current state is implicit in the current state of its repository in ECR. Each image has a unique tag applied to it at creation time, which we call the BUILD TAG. The build tag is treated as immutable: once created, none of our other tooling alters it. The build tag ties the image back to a particular build in Jenkins and to a particular git hash; for example, for build #11 of the “ecs-demo” project:

Image Tag                                  Image Digest
2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11   sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b
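
For illustration only, here is roughly how a tag in that format might be assembled in a build script; the components (UTC timestamp, short git hash, project name, Jenkins build number) are inferred from the example above rather than taken from our actual tooling:

import datetime
import subprocess

def make_build_tag(project, build_number):
    # e.g. 2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11
    stamp = datetime.datetime.utcnow().strftime('%Y.%m.%dT%H.%M.%SZ')
    git_hash = subprocess.check_output(
        ['git', 'rev-parse', '--short', 'HEAD']).strip()
    return '%s.%s-%s-%s' % (stamp, git_hash, project, build_number)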

But as our tooling “promotes” the image from our testing environment to staging to production, it adds or updates additional tags—which we call ENVIRONMENT TAGS—pointing at the same image.

When a new image is built on our Jenkins continuous integration server (usually because a git pull request has been approved and merged to the master branch of that project’s repository), a common build script takes care of building the image, applying the build tag and the “testing” environment tag, pushing the image and both of those tags to ECR, and then kicking off the deployment into our testing environment. A tool called, simply enough, “promote” can be used either by automated processes or by humans to move an image from the testing environment to staging, or from there to production.

For example, if build #11 had made it through the testing and staging environments but had not yet hit production, there might be two images in play, but five tags:

Image Tag                                  Image Digest
2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11   sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b
2016.08.24T17.13.38Z.5ad95f2-ecs-demo-10   sha256:62985ec242857128fa0acea55e3c760e85594d6a2868
testing                                    sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b
staging                                    sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b
production                                 sha256:62985ec242857128fa0acea55e3c760e85594d6a2868

Later on, when build #11 is promoted to production but build #12 has hit the testing environment, the “testing” tag is applied to the same image as the build tag for build #12:

Image Tag                                  Image Digest
2016.08.24T17.13.38Z.5ad95f2-ecs-demo-12   sha256:c79b3e5b3459eb6f0d08a26eb304b8b70235d2eb7622
2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11   sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b
testing                                    sha256:c79b3e5b3459eb6f0d08a26eb304b8b70235d2eb7622
staging                                    sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b
production                                 sha256:749a1f13eff516cc4fcbc2a9a28ea1685440a1fbf29b

The key here is that each image can potentially have multiple tags pointing at it, and the state of those image-to-tag mappings tells us where the image is in the CD pipeline.

So how do we manage all of these tags in ECR?

Tying this all together requires a bit of code: the Docker manifest format does not allow for the same tag to be associated with multiple images, and that’s a good thing as otherwise we would have no way of enforcing the uniqueness of a tag-to-hash mapping.

But that means that in order to find out what that mapping is, we need to iterate over the list of images in the repository using ECR’s batch_get_image API and then derive the mapping of environment tag to build tag: if two tags point to the same image digest, we can infer that they are both tagging the same image:

import json
import logging
import re
import sys
from collections import defaultdict

import boto3

log = logging.getLogger(__name__)

BUILD_TAG_RE = re.compile(r'^\d{4}\.\d{2}\.\d{2}T\d{2}\.\d{2}\.\d{2}Z')
ENVS = frozenset(['testing', 'staging', 'production'])
REGISTRY = ''
ECR = '%s.dkr.ecr.us-west-2.amazonaws.com' % REGISTRY

def get_tags_by_image(all_images):
    """ Iterate over the output of batch_get_image; return a dictionary
        that maps image digest hashes to a set of image tags."""
    images = defaultdict(set)
    for image in all_images:
        images[image['imageId']['imageDigest']].add(
            image['imageId']['imageTag'])
    return images

def get_equivalent_tags(all_images):
    """ Iterate over the output of batch_get_image; return a dictionary
        that maps environment tag names to build tag names. """
    equivs = {}
    tags_by_image = get_tags_by_image(all_images)
    for tags in tags_by_image.itervalues():
        # any given tag can only apply to one image, but
        # an image may have multiple tags. So each possible
        # tag will only ever appear one time in one of the images
        # dict's value lists. Index those lists by the env tag.
        if len(tags) > 1:
            env_tags = [x for x in tags if x in ENVS]
            build_tags = [x for x in tags if BUILD_TAG_RE.match(x)]
            for env in env_tags:
                for build in build_tags:
                    # this is safe because `env` will only ever
                    # occur once in tags_by_image.values()
                    equivs[env] = build
    return equivs

def get_all_images(ecr, repo, tag_digest_list):
    """ Query ECR for all image manifests for the images listed
        in tag_digest_list. Return the 'images' section of the
        response; warn if there are errors. 
    """
    all_images = ecr.batch_get_image(
        registryId=REGISTRY,
        repositoryName=repo,
        imageIds=tag_digest_list
    )
    if all_images['failures']:
        # not even sure this could ever happen as we're
        # feeding it directly from ecr.list_images()...
        log.warning(
            'Some Docker images could not be fetched:\n %s',
            all_images['failures'])
    return all_images['images']

def get_digests_and_tags(ecr, repo):
    """ Iterate over the paginated output of ecr.list_images for
        our repository; return a list of
        {"imageTag": "foo", "imageDigest": "bar"} dicts
    """
    try:
        response_images = []
        paginator = ecr.get_paginator('list_images')
        for page in paginator.paginate(
                registryId=REGISTRY,
                filter={'tagStatus': 'TAGGED'},
                repositoryName=repo):
            response_images.extend(page['imageIds'])
    except Exception as e:
        log.fatal(
            'Failed to fetch images for {0} from ECR'.format(repo))
        raise e
    return response_images

def main(argv):
    repo = argv[1]
    ecr = boto3.client('ecr')
    tag_digest_list = get_digests_and_tags(ecr, repo)
    all_images = get_all_images(ecr, repo, tag_digest_list)
    equivs = get_equivalent_tags(all_images)
    print json.dumps(equivs, indent=2)

if __name__ == '__main__':
    main(sys.argv)

This code dumps out a JSON object with the current mapping of environment tag to build tag:

$ list_tags.py oe/ecs-demo
{
  "staging": "2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11",
  "production": "2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11",
  "testing": "2016.08.24T17.13.38Z.5ad95f2-ecs-demo-12"
}

The last problem to solve is updating the tags as the image moves along the pipeline. We could, of course, use the standard Docker tools to download the image, tag it, and push it back up:

$ eval $(aws ecr get-login)
$ docker pull oe/ecs-demo:2016.08.24T17.13.38Z.5ad95f2-ecs-demo-12
$ docker tag oe/ecs-demo:2016.08.24T17.13.38Z.5ad95f2-ecs-demo-12 oe/ecs-demo:staging
$ docker push oe/ecs-demo:staging

But pulling down a multi-gigabyte container image just to change a few bytes’ worth of tag data is slow and wasteful. Surely there’s a better way?

As it happens, there is! The new v2.2 manifest format for Docker images finally separated the tag text from the secure hash of the image layers, and the most recent version of the ECR API lets us push up new image manifests and specify tags to apply to them as part of the request via the ecr.put_image API. All this means that we can easily create new manifests with the same contents but different tags, without having to actually pull down the layers themselves.

So, after we know the current mapping, updating the image’s tags is a matter of finding out the SHA256 hash that the build tag currently maps to, grabbing the manifest for that hash from ECR, and pushing the manifest back up to ECR using the ECR put_image API and setting a new tag there. Building on the code above:

def push_new_image(ecr, repo, manifest, dst_env_tag):
    """ Attach the desired tag to an existing image in our
        repository by creating a new image with the exact
        same manifest but a different tag name.
    """
    response = ecr.put_image(
        registryId=REGISTRY,
        repositoryName=repo,
        imageManifest=manifest,  # same manifest
        imageTag=dst_env_tag) # new tag!
    return response['image']['imageId']

def get_img_manifest(build_tag, all_images):
    """ Iterate over the output of ecr.batch_get_image();
        return the manifest of the first image with
        an ImageTag matching build_tag.
    """
    for image in all_images:
        if image['imageId']['imageTag'] == build_tag:
            return image['imageManifest']
    raise Exception('Manifest Not Found!')

def promote_tag(ecr, repo, equivs, all_images, src_env_tag, dst_env_tag):
    """ Update the state of our Docker repo, telling it that the destination
        environment tag should now be associated with the same image as the
        source environment tag.
    """
    # "equivs" is the mapping of environment tags to build tags, from
    # get_equivalent_tags(), above
    build_tag = equivs[src_env_tag]
    manifest = get_img_manifest(build_tag, all_images)
    response = push_new_image(ecr, repo, manifest, dst_env_tag)
    log.info('Created new image: \n%s', response)

This code:

  1. Finds the build tag that corresponds to the same image hash as our source environment tag, using the “equivs” dict built by get_equivalent_tags() in the previous code example.
  2. Gets the image manifest for that build tag.
  3. Pushes that manifest back up to ECR with put_image, using the imageTag attribute to attach the name of the destination environment to the manifest.

After this process is done, the new mapping is visible to any client that comes along to look. For example, if we promoted build #12 from testing to staging:

$ list_tags.py oe/ecs-demo
{
  "staging": "2016.08.24T17.13.38Z.5ad95f2-ecs-demo-12",
  "production": "2016.08.24T17.13.38Z.5ad95f2-ecs-demo-11",
  "testing": "2016.08.24T17.13.38Z.5ad95f2-ecs-demo-12"
}

Of course, that doesn’t actually deploy any code; it just moves the tags around to reflect what the deployment system has done. How that part works might be material for another post.

Conclusion

By using the ecr.put_image() API and ECR support for the v2.2 Docker Manifest format, we’ve implemented state tracking for our containers using nothing but the Docker tagging system. There’s no external state database to keep synced up: the intended state of each image can be found by using ecr.batch_get_image().


TCC Field Experiments on Sub-Field Variability of Weather

Hi, I’m Adam Pasch, Weather Data Manager and a member of the Meteorology team at TCC. I’ll be presenting a poster at the American Meteorological Society (AMS) Annual Meeting next week, talking about some initial analyses of data from TCC field experiments completed on Climate Research Farms in 2015.

TCC Field Experiments of Sub-Field Variability of Precipitation and Wind [PDF]

We test our advice on real farms and provide agronomically-relevant, field-specific weather information to growers (over 75 million acres in 2015).  We conducted three field experiments during 2015: Sub-Field Variability, Interstation Comparison, and Remotely-Sensed Precipitation at four research locations throughout the Midwest.

We found that within a 140-acre field, rainfall can vary by 65% of the intensity. In other words, if the true rainfall is 1″, a gauge on the field might report anywhere from 0.7″ to 1.3″ simply because of where in the field that gauge is.


Another advantage of deploying our own gauges was that it allowed us to evaluate our rainfall algorithms. We found that TCC QPE (quantitative precipitation estimate) was more accurate than other commercial options.


We also found dramatic variation of wind across a farmer’s field. Here’s an example from a field in Iowa, where average wind speeds vary from 0.8 to 1.5 m/s.


I’m looking forward to AMS. Please reach out with questions: adam.pasch@climate.com.


Big Data, Cloud and NOAA CRADA’s with AWS


Hi, I’m Lak and I lead the Meteorology team at TCC.  I’ll be on a panel at the American Meteorological Society Annual Meeting next week talking about the NOAA Big Data cooperative research and development agreement with Amazon Web Services (we wrote a blog post about this CRADA in October).

To start off, the panelists make some opening remarks using a few slides. These are mine:

Big Data, Cloud and NOAA CRADA at the Climate Corporation [PDF]

I will use my opening remarks to introduce the concepts of Big Data and the Cloud and will then talk about how having NEXRAD data on AWS enables faster and better science at TCC.

This was the old way to carry out radar research at TCC — notice how much preparation we had to do just to get the data to work with:


[Figure: the “Old Way” radar-research workflow]

Having the NEXRAD data readily available on S3 makes the research process much shorter:

[Figure: the “New Way” radar-research workflow]

As a result, science at TCC is faster, and because we can do the science faster, we can carry out research studies on larger, more representative datasets. Using more representative data makes our science better.

Just as it has helped us, the NOAA CRADA opens up many opportunities for national labs, private companies and universities to create Big Data tools and services.  I’m looking forward to the panel discussion!

 


How to read and display Nexrad on AWS using Python

Amazon Web Services (AWS) is making NEXRAD data freely available on Amazon S3 as part of the NOAA Big Data Project. In this Python notebook, I will step you through reading and displaying this data from your own Python programs. I will assume that you know Python, know how to install Python modules, and can access AWS. (Follow along by downloading and running this IPython notebook.)

What you need

You probably have ipython and matplotlib already. In addition, you may need to install the following Python modules:

  1. boto which I installed using conda:
    conda install boto
  2. pyart which I installed using conda:
    conda install -c https://conda.anaconda.org/jjhelmus pyart

You may also need to configure your AWS credentials to access S3.
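
If your credentials aren’t picked up automatically (from environment variables or a boto config file), you can also pass them to boto explicitly; the values below are placeholders:

import boto

s3conn = boto.connect_s3(
    aws_access_key_id='YOUR_ACCESS_KEY_ID',          # placeholder
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY')  # placeholder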

Import modules

import matplotlib.pyplot as plt
import numpy.ma as ma
import numpy as np
import pyart.graph
import tempfile
import pyart.io
import boto

Find volume scan

Volume scans from NEXRAD are stored on S3 such that the bucket name is noaa-nexrad-level2 and the key name is something like 2014/04/03/KLSX/KLSX20140403_211350_V06.gz i.e. YYYY/MM/DD/KRAD/KRADYYYYMMDD_HHmmSS_V0?.gz. You can use the boto library to browse and select the keys that you want.
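
For example, here is a quick sketch of browsing keys by prefix, using the same date and station as the file we read below:

import boto

# list all volume scans for KVWX on 2015-05-15
s3conn = boto.connect_s3()
bucket = s3conn.get_bucket('noaa-nexrad-level2')
for key in bucket.list(prefix='2015/05/15/KVWX/'):
    print key.name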

Here, I’ll directly navigate to a NEXRAD file that I know exists.

# read a volume scan file on S3. I happen to know this file exists.
s3conn = boto.connect_s3()
bucket = s3conn.get_bucket('noaa-nexrad-level2')
s3key = bucket.get_key('2015/05/15/KVWX/KVWX20150515_080737_V06.gz')
print s3key

Download volume scan from S3

For further processing, it is helpful to have the file locally on disk, so let’s download it:

# download to a local file, and read it
localfile = tempfile.NamedTemporaryFile()
s3key.get_contents_to_filename(localfile.name)
radar = pyart.io.read_nexrad_archive(localfile.name)

Display the data

Then, let’s use PyArt to display the lowest elevation scan data of all the moments.

# display the lowest elevation scan data
display = pyart.graph.RadarDisplay(radar)
fig = plt.figure(figsize=(9, 12))

plots = [
    # variable-name in pyart, display-name that we want, sweep-number of radar (0=lowest ref, 1=lowest velocity)
    ['reflectivity', 'Reflectivity (dBZ)', 0],
    ['differential_reflectivity', 'Zdr (dB)', 0],
    ['differential_phase', 'Phi_DP (deg)', 0],
    ['cross_correlation_ratio', 'Rho_HV', 0],
    ['velocity', 'Velocity (m/s)', 1],
    ['spectrum_width', 'Spectrum Width', 1]
]

def plot_radar_images(plots):
    ncols = 2
    nrows = len(plots)/2
    for plotno, plot in enumerate(plots, start=1):
        ax = fig.add_subplot(nrows, ncols, plotno)
        display.plot(plot[0], plot[2], ax=ax, title=plot[1],
             colorbar_label='',
             axislabels=('East-West distance from radar (km)' if plotno == 6 else '', 
                         'North-South distance from radar (km)' if plotno == 1 else ''))
        display.set_limits((-300, 300), (-300, 300), ax=ax)
        display.set_aspect_ratio('equal', ax=ax)
        display.plot_range_rings(range(100, 350, 100), lw=0.5, col='black', ax=ax)
    plt.show()

plot_radar_images(plots)

The result will look like this:

Plots of NEXRAD data

Processing data

Not all the reflectivity above is because of weather echoes. Most of the data are actually bioscatter (insects and birds). Let’s mask the reflectivity data using the polarimetric variables to retain only approximately spherical echoes.

refl_grid = radar.get_field(0, 'reflectivity')
print refl_grid[0]
rhohv_grid = radar.get_field(0, 'cross_correlation_ratio')
zdr_grid = radar.get_field(0, 'differential_reflectivity')

# apply rudimentary quality control
reflow = np.less(refl_grid, 20)
zdrhigh = np.greater(np.abs(zdr_grid), 2.3)
rhohvlow = np.less(rhohv_grid, 0.95)
notweather = np.logical_or(reflow, np.logical_or(zdrhigh, rhohvlow))
print notweather[0]

qcrefl_grid = ma.masked_where(notweather, refl_grid)
print qcrefl_grid[0]

# let's create a new object containing only sweep=0 so we can add the QC'ed ref to it for plotting
qced = radar.extract_sweeps([0])
qced.add_field_like('reflectivity', 'reflectivityqc', qcrefl_grid)
display = pyart.graph.RadarDisplay(qced)
fig = plt.figure(figsize=(11, 5))

plots = [
    # variable-name in pyart, display-name that we want, sweep-number of radar (0=lowest ref, 1=lowest velocity)
    ['reflectivity', 'Reflectivity (dBZ)', 0],
    ['reflectivityqc', 'QCed Reflectivity (dBZ)', 0],
]
plot_radar_images(plots)

 


Our simple quality control removes much of the bioscatter and retains the meteorological signal in the north-northeast. However, it is restricted to radial distances below 300 km where polarimetric radar variables are available, and some noise remains. See http://journals.ametsoc.org/doi/abs/10.1175/JTECH-D-13-00073.1 for a better quality-control technique.

Acknowledgment

Thanks to NOAA and Amazon for making the NEXRAD data available.


Squeedo: Simplified Amazon SQS processing in Clojure

Inspiration

At The Climate Corporation, we use various forms of data and Clojure-based computations to help farmers make decisions about their operations. In order to do this quickly and efficiently, we distribute the processing across machines, often using SQS, Amazon’s queue service. EC2 autoscaling takes care of adding new machines for us, but we want to make sure we get the most from each machine—in a large organization, wasted resources add up quickly.

We also found that code to support queue-processing backends was quite similar: listen to SQS, read messages from the queue, process in a thread pool, and Ack the messages. We’d rather use a library to handle all that plumbing and just let us focus on the compute logic of what to actually do with the message.

With these two goals in mind (efficient use of EC2 resources, especially CPU, and reducing message processing to a simple pure function), we set out to build Squeedo.

The first workload we were trying to solve looked like this:

SQS -> several-second HTTP I/O API call -> several-second CPU-intensive calculation -> Ack/Nack SQS

We had a double whammy: a long-running synchronous HTTP call using clj-http, followed by a heavy, CPU-intensive computation.

After several iterations of basing this plumbing code on threadpools, we quickly found that we couldn’t get the kind of throughput we wanted simply by tuning the number of threads. We needed something more dynamic that would adapt better to the number of cores it ran on and wouldn’t require us to figure out the “right” number of threads to use in the pool. Additionally, we wanted a higher-level abstraction that felt closer to message processing than to low-level threadpools.

After reading this blog post http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/ explaining how core.async could be used to handle blocking IO, we changed our line of thinking, and Squeedo was born. The key was to combine asynchronous HTTP using http-kit for I/O with core.async for handling SQS polling and acking, message concurrency, and the amount of outstanding work.

Simple Usage: an http-kit example with non-blocking I/O

To use Squeedo, add the Leiningen dependency [com.climate/squeedo "0.1.0"] to your project.clj.

In its simplest form, Squeedo is composed of only 2 parts, a compute function and a consumer.

(require '[com.climate.squeedo.sqs-consumer 
            :refer [start-consumer stop-consumer]]
         '[org.httpkit.client :as client]
         '[clojure.core.async :refer [go >!]])

(defn- eat-some-cpu
  [how-much]
  (reduce + (range 1 how-much)))

(defn compute
  [message done-channel]
  ;; do something expensive
  (eat-some-cpu 1000000)
  ;; do this if you will have I/O
  (client/get "http://google.com"
    (fn [response]
      (go
        ;; do some more processing with the response
        (eat-some-cpu 1000000)
        (>! done-channel message)))))

(def consumer (start-consumer "my-sqs-queue" compute 
                              :num-listeners 10 
                              :max-concurrent-work 50)) 
;; when done listening 
;; (stop-consumer consumer) 

The compute function must put to the done-channel. Squeedo listens to the done-channel, acks/nacks each message from SQS for you, and passes your compute function another message to process. You must put to the done-channel even for exceptions, or Squeedo will think you still have outstanding work and won’t pass you another message to process (any uncaught exception is automatically nack’d).

Using this simple pattern and higher-level abstraction, we’ve processed hundreds of millions of messages with CPU utilization like what you see below (taken on an m3.large EC2 instance).

[Figure: EC2 CPU utilization graph]

Simple Tuning and Configuration

One of the great things about Squeedo is the set of advanced configuration options that can be used to tune the consumer to your workflow, beyond what the very reasonable defaults do out of the box.

  • :message-channel-size – the number of messages to prefetch from SQS; defaults to 20 * num-listeners. Prefetching messages allows us to keep the compute function continuously busy without first having to wait for more messages to be pulled from the remote SQS queue. Make sure to set the timeout appropriately when you create the queue.
  • :num-workers – the number of workers processing messages concurrently. This controls how many workers actually process messages at any one time; it defaults to the number of CPUs minus 1, or 1 on a single core. Squeedo works best with 2 or more CPUs. This is not the amount of work that can be outstanding at any one time; that is controlled by :max-concurrent-work below.
  • :num-listeners – the number of listeners polling from SQS; defaults to (num-workers / dequeue-limit), since each listener dequeues up to dequeue-limit messages at a time. If you have a really fast process, you can actually starve the compute function of messages and thus need more listeners pulling from SQS.
  • :dequeue-limit – the number of messages to dequeue at a time; defaults to 10.
  • :max-concurrent-work – the maximum number of total messages processed concurrently. This is mainly for async workflows where you can have work started and are waiting for parked IO threads to complete; defaults to num-workers. This allows you to keep the CPUs busy by having data returned by IO ready to be processed. It’s really a memory game at this point: how much data can you buffer that’s ready to be processed by your asynchronous HTTP clients?
  • :dl-queue-name – the dead letter SQS queue to which repeatedly failed messages will go. A message that fails to process after the maximum number of SQS receives is sent to the dead letter queue (see SQS Redrive Policy). The queue will be created if necessary. Defaults to (str QUEUE-NAME "-failed"), where QUEUE-NAME is the name of the queue passed into the start-consumer function.

For more information, or to contribute and ask for new features, check out Squeedo on GitHub. Thanks for reading!


xray + dask: Out-of-core, labeled arrays in Python

Xray provides labeled, multi-dimensional arrays. Dask provides a system for parallel computing. Together, they allow for easy analysis of scientific datasets that don’t fit into memory.

An introduction to xray

To make it easier to work with scientific data in Python, we researchers at The Climate Corporation wrote an open source library called xray. Xray provides data structures for multi-dimensional labeled arrays, building on NumPy and pandas. Our goal was to make it easier to work with labeled, gridded datasets from physical sensors and models. At Climate, we use it to analyze weather data and satellite images.

The key idea of xray is that it uses metadata in the form of labeled dimensions (e.g., “time”) and coordinate values (e.g., the date “2015-04-10”) to enable a suite of expressive, label based operations. Here’s a quick example of how we might manipulate a 3D array of temperature data with xray:

ds.sel(time='2014-12-11').max(['latitude', 'longitude'])
<xray.Dataset>
Dimensions:  (time: 4)
Coordinates:
  * time     (time) datetime64[ns] 2014-12-11 2014-12-11T06:00:00 ...
Data variables:
    t2m      (time) float64 311.4 316.2 312.0 309.6

For comparison, here’s how we might write that using only NumPy:

# temperature contains 4x daily values for December 2014
# axis labels are (time, latitude, longitude)
ds.t2m.values[[40, 41, 42, 43]].max(axis=(1, 2));

Once we’ve done our computation, every xray dataset can be easily serialized to and from disk, using the portable and efficient netCDF file format (a dialect of HDF5):

ds.to_netcdf('my-dataset_.nc')
saved_ds = xray.open_dataset('my-dataset_.nc')
assert saved_ds.equals(ds)

Or you might pass your data off to pandas for further analysis in tabular form:

df = ds.to_dataframe()

Without a tool like xray, we would need to keep track of metadata manually in order to preserve it in operations and save it with our results when we’re done. This is burdensome, especially for exploratory analysis, so we often don’t bother. Even when metadata is present, without xray it’s often easier not to use the metadata directly in our code. Magic constants end up littering our scripts, like the numbers 1 and 2 in our NumPy example above. Implicit assumptions about our data are left in comments, or worse, are not even stated at all.

Xray provides short-term incentives for providing metadata, in the form of additional expressive power. But the real payoff is enhanced readability and reproducibility in the long term when you or a colleague come back to your code or data six months from now.

Loading medium-sized weather data with dask

Today I will talk about how we have extended xray to work with datasets that don’t fit into memory by integrating it with dask. Dask is a new Python library that extends NumPy to out-of-core datasets by blocking arrays into small chunks and executing on those chunks in parallel. It allows xray to easily process large data and also simultaneously make use of all of our CPU resources.
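
To give a feel for that model, here is a tiny, self-contained dask.array sketch (independent of xray; the array and chunk sizes are arbitrary):

import numpy as np
import dask.array as da

# wrap an in-memory array in 1000x1000 chunks; real use cases would
# build the dask array from files on disk instead
x = da.from_array(np.random.random((4000, 4000)), chunks=(1000, 1000))

# this builds a lazy graph of per-chunk operations; nothing runs yet
anomaly = x - x.mean(axis=0)

# computation happens, chunk by chunk and in parallel, only on .compute()
result = anomaly[:10, :10].compute()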

Weather data — especially models from numerical weather prediction — can be big. The world uses its biggest supercomputers to generate weather and climate forecasts that run on increasingly high resolution grids. Even for data analysis purposes, it’s easy to need to process 10s or 100s of GBs of data.

For this blog post, we’ll work with a directory of netCDF files downloaded from the European Centre for Medium-Range Weather Forecasts’ ERA-Interim reanalysis, using the new xray function for opening a multi-file dataset, xray.open_mfdataset:

import numpy as np
import matplotlib.pyplot as plt
import xray

ds = xray.open_mfdataset('/Users/shoyer/data/era-interim/2t/*.nc', engine='scipy')
ds.attrs = {} # for brevity, hide attributes

Previewing the dataset, we see that it consists of 35 years of 6-hourly surface temperature data at a spatial resolution of roughly 0.7 degrees:

repr(ds)
<xray.Dataset>
Dimensions:    (latitude: 256, longitude: 512, time: 52596)
Coordinates:
  * latitude   (latitude) >f4 89.4628 88.767 88.067 87.3661 86.6648 ...
  * longitude  (longitude) >f4 0.0 0.703125 1.40625 2.10938 2.8125 ...
  * time       (time) datetime64[ns] 1979-01-01 1979-01-01T06:00:00 ...
Data variables:
    t2m        (time, latitude, longitude) float64 240.6 240.6 240.6 ...

This is actually 51 GB of data once it’s uncompressed to 64-bit floats:

ds.nbytes * (2 ** -30)
51.363675981760025

Is this big data? No, not really; it fits on a single machine. But my laptop only has 16GB of memory, so I certainly could not analyze this dataset in memory.

The power of dask is that it enables analysis of “medium data” with tools that are not much more complex than those we already use for analyses that fit entirely into memory. In contrast, big data tools such as Hadoop and Spark require us to rewrite our analysis in a totally different paradigm. In most cases, we’ve found the cost of doing everyday work with big data is not worth the hefty price. Instead, we subsample aggressively, and work with small- to medium-sized data whenever possible. Dask expands our scale of easy-to-work-with data from “fits in memory” to “fits on disk.”

Computation with dask

Now, let’s try out some computation. You’ll quickly notice that applying operations to dask arrays is remarkably fast:

# convert from Kelvin to degrees Fahrenheit
%time 9 / 5.0 * (ds - 273.15) + 32
CPU times: user 26.1 ms, sys: 9.25 ms, total: 35.3 ms
Wall time: 28.6 ms
<xray.Dataset>
Dimensions:    (latitude: 256, longitude: 512, time: 52596)
Coordinates:
  * latitude   (latitude) >f4 89.4628 88.767 88.067 87.3661 86.6648 ...
  * longitude  (longitude) >f4 0.0 0.703125 1.40625 2.10938 2.8125 ...
  * time       (time) datetime64[ns] 1979-01-01 1979-01-01T06:00:00 ...
Data variables:
    t2m        (time, latitude, longitude) float64 -26.58 -26.59 -26.6 ...

How is this possible? Dask uses a lazy computation model that divides large arrays into smaller blocks called chunks. Performing an operation on a dask array queues up a series of lazy computations that map across each chunk. These computations aren’t actually performed until values from a chunk are accessed.

So, when we print a dataset containing dask arrays, xray is only showing a preview of the data from the first few chunks. This means that xray can still be a useful tool for exploratory and interactive analysis even with large datasets.

To actually process all that data, let’s calculate and print the mean of that dataset:

%time float(ds.t2m.mean())
CPU times: user 3min 5s, sys: 1min 7s, total: 4min 13s
Wall time: 1min 3s
278.64617415292236

Notice that the computation used only 1 minute of wall clock time, but 4 minutes of CPU time — it’s definitely using my laptop’s 4 cores. This surface temperature field is 13.5 GB when stored as 16-bit integers on disk, meaning that my laptop churned through the data at ~210 MB/s.

Split-apply-combine

Like pandas, xray excels at split-apply-combine operations. These types of grouped operations are key to many analysis tasks. For example, one of the most common things to do with a weather dataset is to understand its seasonal cycle.

With dask, xray’s expressive groupby syntax scales to datasets on disk. As an example, here’s how we calculate the average difference between summer and winter surface temperature[1]:

%%time
# everything is fast until we compute values
ds_by_season = ds.groupby('time.season').mean('time')
t2m_range = abs(ds_by_season.sel(season='JJA')
                - ds_by_season.sel(season='DJF')).t2m
CPU times: user 134 ms, sys: 18.4 ms, total: 153 ms
Wall time: 145 ms
# now we calculate the actual result
%time result = t2m_range.load()
CPU times: user 2min 3s, sys: 51 s, total: 2min 54s
Wall time: 44.8 s
plt.figure(figsize=(15, 6))
plt.imshow(result, cmap='cubehelix', vmin=0, vmax=60,
           extent=[0, 360, -90, 90], interpolation='nearest')
plt.title('Summer minus winter mean temperature')
plt.colorbar(label='degrees C', extend='max');

[Figure: map of summer minus winter mean temperature]

A quick aside on climate science: we see that Eastern Siberia has the world’s largest seasonal temperature swings. It is far from the equator, which means that the amount of sunlight varies dramatically between winter and summer. But just as importantly, the climate is highly continental: northeast Asia is about as far away from the moderating effects of the oceans as is possible, because prevailing winds in the middle latitudes blow West to East.

The beauty of the dask.array design is that adding these operations to xray was almost no trouble at all: all we had to do was call functions like da.mean and da.concatenate instead of the NumPy functions with the same names.

Computation graphs

Under the hood, dask represents deferred computations as graphs. We can look at these graphs directly to better understand and visualize our computations. Looking at these graphs also helps illustrate how dask itself works.

Here’s the graph corresponding to the above computation (limited to only two years of data). It shows how data flows as chunks through a series of functions that manipulate and combine the data, starting from files storing one month of data each:

[Figure: dask computation graph for the seasonal-difference calculation]

When asked for a computation result, Dask approaches graphs like this holistically. We never end up computing either of the right two branches in the above figure, corresponding to spring and fall data, because we never use them. After an optimization pass, the graph is consolidated into a much smaller set of tasks:

[Figure: the same computation graph after dask’s optimization pass]

Each circle corresponds to a function evaluation on a chunk of an array handled by dask’s multi-threaded executor.

What’s next?

Last week, we released version 0.5 of xray with this experimental dask integration, giving you the features we just demonstrated. You can install it with conda:

$ conda install xray dask netcdf4

Dask will also allow us to write powerful new features for xray. For example, we can use dask as a backend for distributed computation that lets us automatically parallelize grouped operations written like ds.groupby('some variable').apply(f), even if f is a function that only knows how to act on NumPy arrays.

You can read more about what we’ve done so far in the documentation for dask.array and xray. Matthew Rocklin has recently written about the State of Dask on his blog.

If you try xray and/or dask for your own problems, we’d love to hear how it goes, either in the comments on this post or in our issue tracker.


Thanks to Matthew Rocklin, Leon Barrett, and Todd Small for their feedback on drafts of this article. It is cross posted on the Continuum blog and my personal website.


  1. In this example, 'time.season' is using a special datetime component syntax to indicate that season should be calculated on the fly from the time variable. 

Numba vs Cython: How to Choose

My name is Stephan, and I’m a scientist on the Climatology team at The Climate Corporation. We make extensive use of Python to build statistical weather models, and sometimes we need our code to be fast. Here’s how I choose between Numba and Cython, two of the best options for accelerating numeric Python code.

Most of the time, libraries like NumPy, SciPy and pandas, whose critical loops are already written in a compiled language like C, are enough for fast scientific Python code. Unfortunately, sometimes you need to write your own loop in a performance-critical path of your code, and, also unfortunately, loops in Python are painfully slow. This is where Numba and Cython come in: they both promise the ability to write the inner loop of your code in something that looks a lot like normal Python, but that runs about as fast as handwritten C.

Numba uses LLVM to power Just-In-Time compilation of array oriented Python code. Using Numba is usually about as simple as adding a decorator to your functions:

from numba import jit

@jit
def numba_mean(x):
    total = 0
    for xi in x:
        total += xi
    return total / len(x)

You can supply optional types, but they aren’t required for performant code as Numba can compile functions on the fly using its JIT compiler.
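
For example, a signature can be supplied explicitly, which makes Numba compile the function eagerly; this sketch is just a typed variant of the function above:

from numba import jit, float64

# compile eagerly for double-precision 1-D arrays instead of
# deferring compilation to the first call
@jit(float64(float64[:]), nopython=True)
def numba_mean_typed(x):
    total = 0.0
    for xi in x:
        total += xi
    return total / len(x)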

In contrast, Cython is a general-purpose tool (not just for array-oriented computing) that compiles Python into C extensions. To see impressive speedups, you need to manually add types:

def cython_mean(double[:] x):
    cdef double total = 0
    for i in range(len(x)):
        total += x[i]
    return total / len(x)

When I benchmark this example, IPython’s %timeit reports that calling this function on a 100000-element array takes ~16 ms with the pure Python version, but only ~93 µs with Numba and ~96 µs with Cython.[1]

This trivial example illustrates my broader experience with Numba and Cython: both are pretty easy to use, and result in roughly equivalently fast code. For similar results on a less contrived example, see this blog post by Jake VanderPlas.

The bottom line is that even though performance is why we reach for tools like Numba and Cython, it doesn’t provide a good basis for choosing one over the other. So here are the questions I ask myself when making that choice for my projects.

Will other people be deploying your code?

Cython is easier to distribute than Numba, which makes it a better option for user facing libraries. It’s the preferred option for most of the scientific Python stack, including NumPy, SciPy, pandas and Scikit-Learn. In contrast, there are very few libraries that use Numba. I know of two, both of which are basically in the experimental phase: Blaze and my project numbagg.

The main issue is that it can be difficult to install Numba unless you use Conda, which is a great tool, but not one everyone wants to use. In contrast, distributing a package with Cython-based C extensions is almost miraculously easy. Cython is also a more stable and mature platform, whereas the features and performance of Numba are still evolving.

If you don’t need to distribute your code beyond your computer or your team (especially if you use Conda), then Numba can be a great choice. Otherwise, you should lean toward Cython.

Do you need advanced Python features or to use C-level APIs?

The features that Numba supports in the accelerated nopython mode are very limited. For example:

  • Numba only accelerates code that uses scalars or (N-dimensional) arrays. You can’t use built-in types like list or dict or your own custom classes.
  • You can’t allocate new arrays in accelerated code.
  • You can’t use recursion.

Some of these are design decisions; in other cases, these are being actively worked on.

In contrast, Cython can compile arbitrary Python code, and can even directly call C. The ability to “cythonize” an entire module written using advanced Python features and then only tweak the bottlenecks for speed can be really nice.

For example, switching to an extension type and calling C APIs directly can make for big differences in speed, even if you still rely on built-in Python types like lists or dictionaries. Writing something like cyordereddict in Numba would be nearly impossible.

Do you want to write code that works on N-dimensional arrays?

Suppose you want a function that takes several arguments and returns a scalar or array, depending on the shapes of the arguments provided. For example, consider a function that averages two numbers:

def average(a, b):
    return 0.5 * (a + b)

One of the most powerful features of NumPy is that this simple function would work even if a or b are multi-dimensional arrays (tensors), by following broadcasting rules.

Numba makes it easy to accelerate functions with broadcasting by simply adding the vectorize decorator. This produces universal functions (ufuncs) that automatically work (even preserving labels) on array-like data structures in the entire scientific Python ecosystem, including xray (my project) and pandas. In other cases, Numba can handle arbitrary dimensional input by using Just-In-Time compilation with jit or by creating generalized universal functions with guvectorize.
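
Here is a minimal sketch of what that looks like for the average function above; the explicit signature is one of several ways to call vectorize:

import numpy as np
from numba import vectorize, float64

@vectorize([float64(float64, float64)])
def average(a, b):
    return 0.5 * (a + b)

# the resulting ufunc broadcasts like any NumPy ufunc
average(1.0, 3.0)               # scalars -> 2.0
average(np.arange(3.0), 1.0)    # 1D array and scalar -> array([ 0.5,  1.5,  2.5])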

In contrast, generally speaking, your Cython functions will only work for input with a number of dimensions that you determine ahead of time (e.g., a 1D vector, but not a scalar or 2D array). It’s certainly possible to do this sort of thing with Cython, but it’s not easy, and you’ll need to get your hands dirty with the NumPy C-API. Keith Goodman has some nice examples in version 1.0 of bottleneck.

Still not sure?

When I’m not constrained by other concerns, I’ll try to make Numba work. Numba is usually easier to write for the simple cases where it works. You may still run into annoying limitations when you try to do complex things, but Numba has been getting a lot better, even just over the past few months (e.g., they recently added support for generating random numbers). At the end of the day, even if you ultimately can’t get things to work, you’ll still have idiomatic Python code that should be easy to accelerate with Cython.


This post is cross posted to my personal website.


  1. numpy.mean is faster still, at ~60 µs, but here we’re pretending that we need to write our own custom function that is not already built in. It’s still impressive that we’re only 50% slower than highly tuned C. 