At The Climate Corporation, we work with Amazon Web Services a lot. As you can imagine, weather data is very plentiful. We keep lots of data in Amazon’s Simple Storage Service (S3), which is basically a great big filesystem in the sky, and in Mandoline, our array data store, which can use S3 as a backend. It can take some effort to deal with so much data, but we make that effort willingly, since we need it for important things like predicting fertilizer needs for fields.
To help manage all this data, we use S3’s advanced features, such as versioning. When versioning is enabled, every time a file in S3 is updated or deleted, the old version remains stored in S3 as a backup, and we can access it with a special API call. Then, if we accidentally overwrite or delete a file, we can make an API call to list all that file’s versions, choose one, and then use another API call to restore that old version.
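For a single file, the restore call can be sketched roughly as follows. This is an illustration in Python rather than anything taken from our tooling: it assumes a boto3-style S3 client (for example, one created with `boto3.client("s3")`) is passed in, and `restore_version` is a hypothetical helper name. It relies on the common approach of copying the old version over the same key, which makes that copy the current version; very large objects may need a multipart copy instead.

```python
from typing import Any


def restore_version(client: Any, bucket: str, key: str, version_id: str) -> None:
    """Make an old version current again by copying it over the same key.

    `client` is assumed to behave like a boto3 S3 client.
    """
    client.copy_object(
        Bucket=bucket,
        Key=key,
        # Naming a VersionId in the copy source selects the old version.
        CopySource={"Bucket": bucket, "Key": key, "VersionId": version_id},
    )
```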
However, at Climate, we often manipulate our weather data with automated tools. That means that if we mess up, we can mess up in a really big way. With millions of files and billions of versions, listing and restoring old versions one API call at a time might take weeks. In order to fix mistakes made by automated tools, at large scale, we need another automated tool, one that can restore lots of files in a hurry. Unfortunately, as far as we know, no such tool existed.
So, we’ve made just that automated tool, S3DistVersions, to let us restore lots of files efficiently. For it to perform well, it needs lots of simple parallelism, so we used Hadoop MapReduce, a parallel batch processing system. Because its primary task is to access Amazon S3, we designed it to work well inside Elastic MapReduce (EMR), Amazon’s on-demand Hadoop service.
We based the interface for S3DistVersions on that of S3DistCp, a batch tool for copying data in S3. Given a time and a location in S3, S3DistVersions will restore all files there to their state at the specified time. (Because each file has its own list of versions, you can’t specify a version to restore them all to. Instead, we find it easier to specify a time, and S3DistVersions selects the appropriate version for each file.) It takes simple arguments:
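The idea of picking a version by time can be sketched like this. It is a minimal Python illustration, not the S3DistVersions source (which is written in Clojure); the `Version` record and the `version_at` function are hypothetical names, and treating a delete marker as "the file did not exist" is an assumption about the intended behavior.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, Optional


@dataclass(frozen=True)
class Version:
    """One entry in a file's version history (hypothetical record type)."""
    version_id: str
    last_modified: datetime
    is_delete_marker: bool = False


def version_at(versions: Iterable[Version], restore_time: datetime) -> Optional[Version]:
    """Return the version that was current at restore_time.

    Returns None when the file did not exist at that time: either nothing
    had been written yet, or the newest entry at or before restore_time
    is a delete marker.
    """
    candidates = [v for v in versions if v.last_modified <= restore_time]
    if not candidates:
        return None
    newest = max(candidates, key=lambda v: v.last_modified)
    return None if newest.is_delete_marker else newest


# Example history for one file, with made-up timestamps in UTC.
history = [
    Version("v1", datetime(2013, 12, 30, 0, 0, tzinfo=timezone.utc)),
    Version("v2", datetime(2014, 1, 1, 6, 0, tzinfo=timezone.utc)),
    Version("v3", datetime(2014, 1, 1, 8, 0, tzinfo=timezone.utc)),
]
restore_time = datetime.fromisoformat("2014-01-01T14:00:00+07:00")  # 07:00 UTC
print(version_at(history, restore_time).version_id)  # prints "v2"
```

Because every file gets its own lookup, the same restore time can map to a different version ID for each file.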
java -jar S3DistVersions-0.1.0.jar --src s3://mybucket/mypath --prefixes s3://mybucket/list-of-s3-prefixes.txt --restore-time 2014-01-01T14:00:00+07:00
The hardest thing to deal with was obtaining parallelism while working with S3’s versioning API. Suppose we wanted to restore all the files under the location s3://mybucket/mypath. Unfortunately, S3 only permits access to the versions of files under a location via a (serial) paging API, so we have to make a long sequence of API calls. Sure, each request might return a thousand versions, but it would take a long time to get through a million files and all their versions at that rate. In order to restore those files in a useful amount of time, we need to access the version information in parallel.
(Note: We can’t just start by listing the files under s3://mybucket/mypath because S3’s file-listing API won’t show deleted files. Instead, we have to use the version-listing API.)
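To make the serial nature of the paging concrete, here is a rough Python sketch of walking the version listing for one location. It assumes a boto3-style client whose `list_object_versions` call returns `Versions`, `DeleteMarkers`, `IsTruncated`, and the `NextKeyMarker` / `NextVersionIdMarker` fields; `iter_versions` and the added `IsDeleteMarker` field are hypothetical names used only for this illustration.

```python
from typing import Any, Dict, Iterator


def iter_versions(client: Any, bucket: str, prefix: str) -> Iterator[Dict[str, Any]]:
    """Yield every version and delete marker under prefix, page by page."""
    kwargs: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix}
    while True:
        page = client.list_object_versions(**kwargs)
        for entry in page.get("Versions", []):
            yield {**entry, "IsDeleteMarker": False}
        # Delete markers are what reveal files that a plain listing hides.
        for entry in page.get("DeleteMarkers", []):
            yield {**entry, "IsDeleteMarker": True}
        if not page.get("IsTruncated"):
            return
        # Each page says where the next one starts, so the requests for a
        # single location have to be made one after another.
        kwargs = {"Bucket": bucket, "Prefix": prefix, "KeyMarker": page["NextKeyMarker"]}
        marker = page.get("NextVersionIdMarker")
        if marker:
            kwargs["VersionIdMarker"] = marker
```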
To get parallelism while listing versions, we ask the user to provide a list of “prefixes”: the beginnings of the filenames that appear under s3://mybucket/mypath. Then, we can make requests for versions in those separate places in parallel. (For instance, given a prefix such as /b/2, we can scan for versions under s3://mybucket/mypath/b/2 in parallel with the scans for the other prefixes.) We need that parallelism in order to get through our billions of versions in a reasonable amount of time; for smaller restorations, it’s perfectly fine to omit the list of prefixes. And of course, parallelism is easy in the restore step, so the prefix list is only needed for parallelism in listing versions.
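The shape of that idea, one serial listing per prefix with the listings running side by side, can be sketched in Python. S3DistVersions itself does this with Hadoop MapReduce; the thread pool below is only a small stand-in for illustration. The function name, the `list_versions` callback, and the fallback to the source location when no prefixes are given are all assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, List


def list_versions_by_prefix(
    src: str,
    prefixes: Iterable[str],
    list_versions: Callable[[str], List[dict]],
    max_workers: int = 8,
) -> Dict[str, List[dict]]:
    """Run one serial version listing per prefix, several at a time.

    `list_versions` is a caller-supplied function that pages through the
    versions under a single location and returns them as a list.
    """
    locations = [src.rstrip("/") + "/" + p.strip("/") for p in prefixes]
    if not locations:
        # Assumed fallback: with no prefixes, list the whole source serially.
        locations = [src]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map keeps results in the same order as the locations.
        return dict(zip(locations, pool.map(list_versions, locations)))
```

The speedup is bounded by the number of prefixes, which is why a list that splits the files into many similarly sized groups matters for large restorations.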
In practice, we find that it’s not hard to use a prefix list like this. We have our files spread out in many different subdirectories, so we use that list of subdirectories as our prefixes.
We designed this program to follow the recommended advice for partitioning keys. S3 keeps track of filenames (keys) in an index that is spread across many different Amazon computers in sequential chunks. It’s sort of like the way conferences have registration tables organized by segments of the alphabet (e.g. ‘A-C’, ‘D-F’, …) to distribute the load. Just as you wouldn’t want to have all the people with ‘A’ names come in at the same time, we don’t want our program to access the list of files in sequential order. Therefore, when S3DistVersions interacts with S3 files or lists their versions, it first shuffles the filenames or filename prefixes. That way, it spreads the load across Amazon’s S3 index cluster.
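The shuffling step itself is tiny. A minimal Python sketch, where `shuffled` is a hypothetical helper and the optional seed exists only to make the example repeatable:

```python
import random
from typing import Iterable, List, Optional


def shuffled(names: Iterable[str], seed: Optional[int] = None) -> List[str]:
    """Return the names in random order without modifying the input."""
    result = list(names)
    random.Random(seed).shuffle(result)
    return result
```

Processing `shuffled(prefixes)` instead of the sorted list means that consecutive requests are unlikely to land on the same sequential chunk of the index.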
First of all, for this project, as for most things we do, we used Clojure, a programming language that’s based on Lisp and runs on the Java Virtual Machine (JVM). Those parents give it simplicity and speed, plus access to a large range of useful libraries. It’s also an opinionated language that prefers a functional, immutable style. We like that opinion; for instance, functional code is easier to reason about in bulk, since we can be confident it doesn’t have weird side effects. That also happens to work great with parallel data processing such as MapReduce.
We also often use Cascalog, a MapReduce tool for Clojure. However, here we wanted tight control over keys, grouping, and whether work happens in mappers or reducers, so we tried Parkour, but we ran into some exceptions when running it in EMR. Ultimately, we used clojure-hadoop. It isn’t as elegant as Cascalog in some ways, but it does give us the control we want.
Now, we are confident that when we mess up lots of files in S3, we can fix the problem at scale. We are pleased to release S3DistVersions, and we hope that it will help others work with S3’s versioning more easily. We welcome contributions and feedback, so please let us know how it works for you.