2013-12-18_11-41-20

Migrating from MongoDB to Cassandra

The Story

Long long ago (2011) in a Littleton founder’s basement not so far away, the decision was made to use MongoDB as the persistence engine for our Person API product. The idea of the Person API is pretty simple:

Given some input piece of data {email, phone, Twitter, Facebook}, find other data related to that query and produce a merged document of that data. This is essentially a curated, recursive federated search, where multiple databases are consulted, reconsulted and their results aggregated and stored into MongoDB. These databases consist of various internal services (including previously searched data) and APIs from providers across the web.

When we first started, we needed to move fast according to what our market wanted. As our customers queried us and we added data, our database continued to grow in size and operations per second.

We were a young startup and made a few crucial mistakes. MongoDB was not a mistake. It let us iterate rapidly and scaled reasonably well. Our mistake was putting off fixing MongoDB’s deployment and instead scaling vertically to maintain the Person API product while we worked on other technologies such as the FullContact Address Book and our ‘secret sauce’ behind the Person API and our deduplication technology (look for posts on this later this month).

Eventually MongoDB started to have issues with lock time percentage, even on the generous hardware it already had. MongoDB has what’s known as a shared-exclusive or readers-writer lock over each database. While a write operation is in progress, a read is unable to proceed until the write operation yields the lock. Even then, a queued write is given precedence over a read and can queue reads, leading to a latency spike. As you can imagine, the Person API writes a lot of data (sometimes over 200K for a single document), with a read/write split of roughly 50/50 to 60/40 and several million daily writes. We worked hard to eliminate multiple updates by no longer using partial updates and by staging documents in Redis, but even this wasn’t enough and our lock percentage continued to climb into the 40-50% range, leaving us with unhappy customers.
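To make the locking behavior concrete, here is a minimal sketch that uses Java’s ReentrantReadWriteLock as a stand-in. This is an analogy only: it is not MongoDB’s lock implementation, and the class and method names are made up for illustration. It shows the one property that hurt us: while a writer holds the lock, a reader on another thread cannot get in.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Analogy only: a JDK readers-writer lock stands in for MongoDB's per-database lock.
final class ReadersWriterDemo {

    /** Returns true if a separate reader thread can take the read lock right now. */
    static boolean readerCanProceed(ReentrantReadWriteLock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread reader = new Thread(() -> {
            // tryLock() returns immediately instead of queueing behind the writer.
            if (lock.readLock().tryLock()) {
                acquired.set(true);
                lock.readLock().unlock();
            }
        });
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();                     // a write is in progress
        System.out.println(readerCanProceed(lock));  // false: the read has to wait
        lock.writeLock().unlock();                   // the write yields the lock
        System.out.println(readerCanProceed(lock));  // true: reads flow again
    }
}
```

With millions of large writes a day, the time spent in the "false" state is what shows up as lock percentage.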

We pushed Mongo onto the best database hardware AWS could provide: hi1.4xlarge’s. With these we had 2TB of SSD. This dropped our lock percentage to a small fraction of the 40-50% we’d been pushing, not to mention much more storage space.

Oops...

And it was for a long time. We refactored products and launched others. One day we realized our ‘data burn’ was on the order of 20GB a day and we had less than 200GB remaining on our SSDs. Even MongoDB’s ‘repairDatabase’ to compact the data was unable to comfortably free enough space for more than a few days.

The Dirty Solution

The most painful part of this story to admit is the solution we came up with to limp along. Maybe you can guess from the code snippet below:

[Image: Code Snippet]

By the time we had cycles to spend, it was too late to shard effectively. It would have been terribly painful and unacceptably slowed our cluster for days. We could have done it in parallel, but if we were going to make a painful transition it was better to align our database technology to our eventual goals of extreme fault-tolerance, horizontal scalability, and operational simplicity. MongoDB’s HBase-esque model of config servers, shard routing daemons, and the shards themselves is needlessly complicated. Webscale, right?

To buy us time, we ‘sharded’ our MongoDB cluster. At the application layer. We had two MongoDB clusters of hi1.4xlarges, sent all new writes to the new cluster, and read from both. If a document was present in the new cluster we’d return that; otherwise we’d return the one from the old cluster. Meanwhile, Sherlock (internal codename for our search system) was busily refreshing older data and writing it into the new cluster as well. With this in mind, we projected the arrangement would only need to last 2-3 months.
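The routing rule is small enough to sketch. The following is a hypothetical illustration, not our production code: plain in-memory maps stand in for the two MongoDB clusters, and the class name TwoClusterStore is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: in-memory maps stand in for the old and new MongoDB clusters.
final class TwoClusterStore {
    private final Map<String, String> oldCluster;
    private final Map<String, String> newCluster;

    TwoClusterStore(Map<String, String> oldCluster, Map<String, String> newCluster) {
        this.oldCluster = oldCluster;
        this.newCluster = newCluster;
    }

    /** All new writes go to the new cluster only. */
    void write(String id, String document) {
        newCluster.put(id, document);
    }

    /** Prefer the new cluster; fall back to the old one if the document is absent. */
    Optional<String> read(String id) {
        String fresh = newCluster.get(id);
        if (fresh != null) {
            return Optional.of(fresh);
        }
        return Optional.ofNullable(oldCluster.get(id));
    }

    public static void main(String[] args) {
        Map<String, String> oldData = new HashMap<>();
        oldData.put("a@example.com", "old profile");
        TwoClusterStore store = new TwoClusterStore(oldData, new HashMap<>());

        System.out.println(store.read("a@example.com")); // served from the old cluster
        store.write("a@example.com", "refreshed profile");
        System.out.println(store.read("a@example.com")); // now served from the new cluster
    }
}
```

The cost of this design is that every miss on the new cluster turns into a second read, which is part of why it was only ever meant as a stopgap.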

Cassandra

We’re believers in the Netflix way of thinking: build with resilience in the DNA of an application, with the public cloud in mind. We’ve long been Apache HBase users, but found it tough to maintain 100% availability on Amazon EC2 for a variety of reasons. Cassandra attracted us with a veritable siren song of operational simplicity: no HDFS or Zookeeper dependencies, multiple servers serving the same data without requiring regions to fail over (a process that can take multiple minutes or more), and a data model similar to HBase’s.

It’s important to note that Cassandra was a year-long evaluation for us and aligned with our availability and fault-tolerance goals. Additionally, strong consistency isn’t something we needed for our use case. YMMV. That’s not to say we expected things to be simple or predictable, so we moved a less critical service over first.

Trial Run – HBase -> Cassandra

As experience is everything when it comes to stable production deployments, we first moved a different part of our system, an outbound HTTP proxy cache if you will, to Cassandra. The existing system ran on Apache HBase, quite well for the most part. Periodically, this system is MapReduce’d into a coherent set of data for ingestion. This was a simple refactor and taught us some of the patterns we’d need to undertake our MongoDB conversion.

The general pattern of any online conversion looks a little like this:

[Image: Cassandra to Mongo 5]

We lean toward reading HBase HFiles directly when we can and avoiding HBase. It’s unnecessary overhead when the entire table is processed anyways. Our plan was to go from HFiles -> SSTables -> Cassandra Cluster.

One of the issues we first ran into was a deficiency of Cassandra’s Hadoop MapReduce tooling. By default, it’s not compiled for Cloudera CDH4. We ended up vendoring half of the classes and applying patches from the Cassandra JIRA before giving up on their BulkOutputFormat (BOF) and ColumnFamilyOutputFormat (CFOF). It was a nice concept to write pre-sorted SSTables and load them directly into Cassandra, but it didn’t work out due to a combination of Hadoop bugs and operational issues (Context#progress no-op, SSTableLoader just randomly breaking, having to manually delete the SSTables after load).

The other option was to use the ColumnFamilyOutputFormat, but we had ended up deciding to use CQL3 tables to make things simple from a tooling perspective (cqlsh and co.) and a future-proofing perspective. CQL3 is really some pattern-sauce on top of traditional ‘Thrift’ tables using Composite Types. The Hadoop utilities are rooted in these more complex Thrift APIs and don’t easily support non-COMPACT CQL3 tables. I’d already written some basic utilities around being able to utilize CQL3 tables from the BulkOutputFormat, MutationUtils.java (these are pretty rough, you’ll get the idea if you need it), and found the CFOF to be just as awkward but without the promise of bulk-loaded SSTables. We didn’t try the CQL3 tools in recent releases.

Eventually we decided to just use Astyanax (Netflix’s Cassandra Client) directly from our MapReduce job reducers, which worked splendidly for writes. The first time we did this, we wrote as fast as we could to the Cassandra cluster. Topping out at just under 9300 writes/s on a 6-node cluster, we were appropriately impressed. Unfortunately, using all the CPU for the write path left no time for leveled compaction and the cluster spent several days trying to compact 128MB SSTables. Thanks to AWS, we spun up another cluster and repeated the job, this time with the Size-Tiered Compaction strategy (less IO, good for spinning disks) and a Guava RateLimiter to allow Cassandra to breathe and compact during the load process. A handy side-effect of rate limiting: we were able to accurately project when our jobs would be finished.
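The projection itself is simple arithmetic once the write rate is capped. Here is a minimal sketch of that calculation; it does not use Guava, and the row count and rate in main are hypothetical numbers chosen for illustration, not figures from our jobs.

```java
import java.time.Duration;

// Sketch of the finish-time projection that a fixed write rate makes possible.
final class LoadProjection {

    /** Time needed to write rowCount rows at a steady permitsPerSecond. */
    static Duration timeToWrite(long rowCount, double permitsPerSecond) {
        if (rowCount < 0 || permitsPerSecond <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and rate must be > 0");
        }
        // Round up: a partial second of work still occupies a whole second.
        long seconds = (long) Math.ceil(rowCount / permitsPerSecond);
        return Duration.ofSeconds(seconds);
    }

    public static void main(String[] args) {
        // Hypothetical inputs: one billion rows at a cluster-wide cap of 5,000 writes/s.
        Duration projected = timeToWrite(1_000_000_000L, 5_000.0);
        System.out.println("Projected load time: " + projected.toHours() + " hours");
    }
}
```

Without a cap, throughput drifts with compaction pressure and any estimate like this is a guess.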

The rest of this trial wasn’t without issues, some of which we didn’t discover until we’d moved ahead with moving MongoDB to Cassandra. In general, a simple move.

MongoDB Conversion

We’re big users of Hadoop MapReduce and tend to lean on it whenever we need to make large-scale migrations, especially ones with lots of transformation. Given that, along with our existing conversion project from before, we used 10gen’s mongo-hadoop project, which has input and output formats for Hadoop. We immediately realized that the InputFormat which connected to a MongoDB cluster was ill-suited to our usage. We had 3TB of partially-overlapping data across 2 clusters. After calculating input splits for a few hours, it began pulling documents at an uncomfortably slow pace. It was slow enough, in fact, that we developed an alternative plan.

The plan was to export large BSON files to S3 and MapReduce over them the same way we’d converted our cache before. Files are MapReduce’s bread and butter, so this would work much better than opening a lot of cursors against a backup-restored MongoDB server. The mongo-hadoop project has a BSONInputFormat for reading in BSON dump files. A match made in heaven? Perhaps.

MongoDB is capable of exporting a single BSON file comprising your entire collection with the mongodump utility. The mongodump utility doesn’t need MongoDB to be running, making it ideal to extract data from backups. As we’d kept backups with snapshotted EBS volumes, we were able to boot a new machine with backup-bearing volumes attached. We pointed mongodump at the data directory on the EBS volume and exported the data as BSON to another disk, which took about a day for each cluster. After this process, we had to run mongo-hadoop’s bson_splitter.py tool to generate split points the BSONInputFormat can use, otherwise the entire file must be read in by a single mapper. It goes without saying, a single mapper is very slow. Then you can push both files to S3 and kick off your Hadoop job.

# Approximate commands..
# mongodump --dbpath /mnt/restore/data/ --collection identities --out /mnt/bsondump/
<wait a day>
# python bson_splitter.py /mnt/bsondump/identities.bson
<wait 20 minutes>
# s3cmd --multipart-chunk-size-mb=1000 put /mnt/bsondump/identities.bson s3://bucket/path
# s3cmd put /mnt/bsondump/.identities.bson.splits s3://bucket/path

It’s important to use a newer s3cmd with multipart support. S3 uploads tend to fail for single large objects with single puts, or too many puts. 5MB (the default with s3cmd-1.1.0) was too small. We found an acceptable size at 1GB, which partitioned the upload into ~1500 individual uploads resulting in a single large ~1.5TB S3 object. The other cluster was about half the size. Phew. The only wrinkle we had with this process was getting SBT (mongo-hadoop’s build tool) to upload our CDH4-compiled artifacts to our Artifactory for inclusion into our conversion project.
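A quick way to sanity-check a chunk size is to count the parts it produces. The sketch below assumes the object is about 1.5 decimal terabytes and that s3cmd’s chunk-size flag is measured in MiB; both are assumptions for illustration, which is why the result lands near, not exactly on, the ~1500 figure above. For reference, S3 caps a multipart upload at 10,000 parts, so 5MB chunks could never have covered an object this large.

```java
// Sketch: how many parts a multipart upload needs for a given chunk size.
final class MultipartMath {
    static final long MIB = 1024L * 1024L;
    static final long S3_MAX_PARTS = 10_000L; // S3's per-upload part limit

    /** Number of parts needed to upload objectBytes in chunks of chunkBytes. */
    static long partCount(long objectBytes, long chunkBytes) {
        if (objectBytes < 0 || chunkBytes <= 0) {
            throw new IllegalArgumentException("objectBytes must be >= 0 and chunkBytes > 0");
        }
        return (objectBytes + chunkBytes - 1) / chunkBytes; // ceiling division
    }

    public static void main(String[] args) {
        long objectBytes = 1_500_000_000_000L; // assumed size: ~1.5TB (decimal)

        long withLargeChunks = partCount(objectBytes, 1000 * MIB);
        long withDefaultChunks = partCount(objectBytes, 5 * MIB);

        System.out.println("1000MB chunks: " + withLargeChunks + " parts");
        System.out.println("5MB chunks:    " + withDefaultChunks + " parts, over limit: "
                + (withDefaultChunks > S3_MAX_PARTS));
    }
}
```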

The mapper in this case is quite simple:

package com.fullcontact.hadoop.mongo;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

import java.io.IOException;

public class MongoConversionMapper extends Mapper<Object,BSONObject, Text, BSONWritable> {
    @Override
    protected void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException {
        if (key != null && value != null) {
            context.write(new Text(key.toString()), new BSONWritable(value));
        }
    }
}

This emits each document as a different input group. The reducer too can be rather simple. Note: IntWritable key doesn’t match the Text key of the mapper, not shown is an intermediate step to SequenceFiles. We did this to more rapidly develop the reducer against on-disk data vs. re-pulling from S3 each time. Additionally, we wanted to write more jobs over this data for analytical purposes.

    @Override
    protected void reduce(IntWritable randomIdentifier, Iterable<BSONWritable> values, Context context) throws IOException, InterruptedException {
        for (BSONWritable bsonWritable : values) {
            ProfileIdentity identity = null;
            try {
                identity = buildProfile(bsonWritable);

                context.getCounter(Stats.PROFILE_SUCCESS).increment(1);
            } catch (Exception e) {
                // identity is still null if buildProfile threw, so log the reducer key instead
                log.warn("Error building profile for " + randomIdentifier + ", ex: " + e.getClass().getSimpleName() + " => " + e.getMessage(), e);
                context.getCounter(Stats.PROFILE_FAILED).increment(1);
                continue;
            }


            try {
                writeCassandraRow(context, identity.get_id(), identity);
                context.getCounter(Stats.CASSANDRA_SUCCESS).increment(1);
            } catch (Exception e) {
                log.warn("Error writing Identity ("+randomIdentifier.toString()+") " + e.getClass().getSimpleName(), e);
                context.getCounter(Stats.CASSANDRA_FAILED).increment(1);
            }
        }
    }

    private void writeCassandraRow(Context context, String storageIdentifier, ProfileIdentity identity) throws IOException, InterruptedException, FullContactException {
        limiter.acquire();

        store.upsert(storageIdentifier , identity);
    }

We ran the conversion and converted all the data to our new data model, along the way cleaning up several artifacts of mistakes long past. The process was fast enough that we needed to rate limit the writes to Cassandra lest we end up with an endlessly-compacting cluster as we had in our first attempt. We then ran the same job against the second cluster of newer data and overwrote any overlapping data from the first load.

At long last, we’d moved MongoDB into Cassandra and started verifying data and ensuring our 9-node m1.xlarge deployment would hold up under our read load by doing dual reads and not returning the Cassandra result to API consumers. Unfortunately, our latency was terrible and we weren’t yet sure why.
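Dual reads are worth sketching because the pattern is easy to get subtly wrong. The following is a hypothetical illustration rather than our production code: the class name DualReader is invented, and plain functions stand in for the MongoDB and Cassandra clients. The caller always gets the primary result; the shadow read is only compared and counted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical sketch of shadow ("dual") reads during a migration.
final class DualReader<K, V> {
    private final Function<K, V> primary; // system of record (MongoDB in this post)
    private final Function<K, V> shadow;  // system under evaluation (Cassandra in this post)
    final AtomicLong matches = new AtomicLong();
    final AtomicLong mismatches = new AtomicLong();

    DualReader(Function<K, V> primary, Function<K, V> shadow) {
        this.primary = primary;
        this.shadow = shadow;
    }

    V read(K key) {
        V primaryResult = primary.apply(key);
        try {
            V shadowResult = shadow.apply(key);
            if (Objects.equals(primaryResult, shadowResult)) {
                matches.incrementAndGet();
            } else {
                mismatches.incrementAndGet();
            }
        } catch (RuntimeException e) {
            // A failure in the shadow store must never reach API consumers.
            mismatches.incrementAndGet();
        }
        return primaryResult; // the shadow result is never returned
    }

    public static void main(String[] args) {
        Map<String, String> mongo = new HashMap<>();
        mongo.put("id-1", "profile");
        Map<String, String> cassandra = new HashMap<>();
        cassandra.put("id-1", "profile");

        DualReader<String, String> reader = new DualReader<>(mongo::get, cassandra::get);
        System.out.println(reader.read("id-1") + ", matches=" + reader.matches.get());
    }
}
```

This version reads the shadow store inline for simplicity. A real deployment would more likely issue the shadow read asynchronously and record its latency, since latency was exactly what we needed to measure.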

Cassandra & Linux Tuning

We’d just started sending reads and realized that spinning rust might not be fast enough for us, or AWS’s rust was even more rusty than we’d imagined. Not everything added up though. With some napkin math, it was pretty obvious we should have been able to sustain much higher loads for the number of spindles we’d deployed. m1.xlarges are given 4 ephemeral drives to work with. 1 of these we dedicate to the Cassandra commit log, the other 3 are put into RAID0 for a data directory. On top of this, we run LUKS+dm-crypt to encrypt the filesystem, as we have the self-imposed requirement to encrypt all contact data. All that being said, 200 IOPS should have been a cakewalk for this system. What was wrong?

The first thing we really noticed is that we were reading 40+MB/s of disk for a relatively modest 500-1000KB/s of network traffic out. That was our first clue that something wasn’t tuned right, either in Cassandra or Linux. The disparity was especially troublesome: we were doing a lot of extra work. We first tried lots of different Cassandra tuning options, but finally came around to Linux.

It’s important to set your disk readahead appropriately, especially for databases. We’d forgotten to check it, as the defaults are usually pretty sane. A look at blockdev --report blew us away.

RO    RA   SSZ   BSZ   StartSec            Size   Device
rw   256   512  4096          0      8589934592   /dev/xvda1
rw   128   512  4096          0    450934865920   /dev/xvdb
rw   128   512  4096          0    450934865920   /dev/xvdc
rw   128   512  4096          0    450934865920   /dev/xvdd
rw   128   512  4096          0    450934865920   /dev/xvde
rw   128   512  4096          0   1352803024896   /dev/md127
rw  6144   512  4096          0   1352801452032   /dev/dm-0


For every disk read, we were pulling in 3MB of data (RA is sectors, SSZ is sector size, 6144*512=3145728 bytes) into cache. Oops. Not only were we doing tons of extra work, but we were trashing our page cache too. The default for the device-mapper used by LUKS under Ubuntu 12.04LTS is incredibly sub-optimal for database usage, especially our usage of Cassandra (more small random reads vs. large rows). We turned this down to 128 sectors — 64KB. Below this we saw no benefits.
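The readahead arithmetic can be checked in a few lines. This sketch only restates the calculation from the blockdev output (RA sectors times sector size); the class and method names are made up for the example.

```java
// Sketch: convert a blockdev readahead setting (in sectors) to bytes per read.
final class ReadaheadMath {

    /** Bytes pulled into the page cache per read for a given RA and sector size. */
    static long readaheadBytes(long raSectors, long sectorSizeBytes) {
        if (raSectors < 0 || sectorSizeBytes <= 0) {
            throw new IllegalArgumentException("raSectors must be >= 0 and sector size > 0");
        }
        return raSectors * sectorSizeBytes;
    }

    public static void main(String[] args) {
        long before = readaheadBytes(6144, 512); // the dm-0 setting we found
        long after = readaheadBytes(128, 512);   // the value we settled on

        System.out.println("before: " + before + " bytes (" + before / (1024 * 1024) + " MB)");
        System.out.println("after:  " + after + " bytes (" + after / 1024 + " KB)");
        System.out.println("reduction: " + before / after + "x less data per read");
    }
}
```

For a workload of small random reads, that is a 48x cut in the data dragged through the page cache on every disk read.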

We also mounted our filesystems with noatime and nodiratime. They’re pretty easy to forget, but given how often SSTables are accessed it definitely adds up.

Immediately after our first readahead change, we saw promising first results:

[Image: Cassandra to Mongo 6]

And disk IO from StackDriver:

[Image: Cassandra to Mongo: DiskIO]

Loads flattened and Cassandra was able to perform. We played with some Cassandra tuning too. Given our large number of keys, we opted to bump our key cache to 500MB to help reduce the number of random IOs necessary to find a row. We also tuned down our bloom_filter_fp_chance on our rows to 0.00075 to keep false positive rates low and random IOs down. We considered increasing our read_repair_chance to keep data warm on replicas too, but deemed that to be more harmful than helpful. All throughout, our graphs were our truth.
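Lowering the false positive chance is not free: it costs memory per key. As a rough guide, the sketch below uses the textbook formula for an optimally configured Bloom filter, bits per key = -ln(p) / (ln 2)^2. This is an assumption for illustration; Cassandra’s actual sizing may differ, and the 0.01 comparison value is just a reference point we picked.

```java
// Sketch: textbook memory cost of a Bloom filter for a target false positive chance.
final class BloomSizing {

    /** Bits needed per key for an optimally configured Bloom filter. */
    static double bitsPerKey(double fpChance) {
        if (!(fpChance > 0.0 && fpChance < 1.0)) {
            throw new IllegalArgumentException("fpChance must be between 0 and 1 (exclusive)");
        }
        double ln2 = Math.log(2.0);
        return -Math.log(fpChance) / (ln2 * ln2);
    }

    public static void main(String[] args) {
        System.out.printf("p=0.00075 -> %.2f bits per key%n", bitsPerKey(0.00075));
        System.out.printf("p=0.01    -> %.2f bits per key%n", bitsPerKey(0.01));
    }
}
```

By this estimate, our setting costs roughly 15 bits per key, about half again as much as a 1% target, in exchange for far fewer wasted random IOs.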

Over the next few days we continued dual reads, verified data, and moved select customers over before others to receive feedback before flipping the switch. For the last two months, the FullContact Person API has been powered by Cassandra 1.2.10.

That’s a wrap

Moving to Cassandra was a large effort with many long-running steps and acclimation time. Taking up the banner of any new database system is not without its share of risks. One really needs to know their data and access patterns to appropriately choose a technology. For us, horizontal scalability and the ability to run our database in a self-repairing autoscaling group were the most important factors in our choice. Paired with Priam, we’ve been able to unsafely terminate Cassandra instances with no impact to production clients. And perhaps the greatest benefit, we’ve been able to continue serving our API clients with an ever-expanding database of contact information.

Our success with Cassandra has inspired other teams to adopt it for problems where traditional RDBMSes are an impedance mismatch. Cassandra will likely power features in our Address Book in the future.

The future is bright for Cassandra at FullContact. As for MongoDB, I can’t say I’ve ever looked back.


We’re hiring DevOps engineers.


Image Credit: Jemimus via Flickr

Update: Check out the Hacker News discussion.
