Cross-shard transactional integrity


I've been looking at NoSQL data stores for online transaction processing lately. Yes, related to work :-)

The basic model of a key/value store, where the value is a JSON object or similar, only gets you so far. The benefit is that it is very, very easy to horizontally partition ("shard" or "federate"). The drawback is that you really can't do any online queries on anything other than primary key values. If you're trying to do a query on, say, "any foobar created between 2011-01-05 and 2011-01-06," then you're screwed -- you have to traverse the entire data store to find that.

The first order of business in this situation is to understand what queries you want to run, and pre-code them into your application. When you create a foo, you also add the foo to some key saying "foos created on <date>." This allows you to query for all foos created on a particular day. Writes get a little more expensive, because you have to also update this separate key; reads are really fast, because the query results are already calculated for you. However, if you change your mind about what you want to query, you're back to iterating over the entire key space to build the new index.
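To make the pattern concrete, here's a minimal sketch using a plain Python dict as a stand-in for the key/value store; the key names and helper functions are my own invention for illustration:

```python
# Precomputed-index pattern: every write updates both the base record
# and the per-day index key, so the day query becomes a single read.
store = {}

def create_foo(foo_id, created, payload):
    # Write the base record under its primary key.
    store["foo:%s" % foo_id] = {"created": created, "payload": payload}
    # Also append the primary key to the per-day index key.
    day = created.split(" ")[0]
    index_key = "foo:created:%s" % day
    store.setdefault(index_key, []).append("foo:%s" % foo_id)

def foos_created_on(day):
    # The query result is already precomputed: just read the index key.
    return store.get("foo:created:%s" % day, [])

create_foo("123456", "2011-01-05 21:30", {"type": "foo"})
create_foo("123457", "2011-01-05 23:10", {"type": "foo"})
print(foos_created_on("2011-01-05"))  # ['foo:123456', 'foo:123457']
```

Note that the dict's `setdefault(...).append(...)` hides exactly the read-modify-write step that becomes a problem once two processes share the store.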

Also, we have now introduced a consistency problem, and a possible race condition. Let's talk about the race first:
When I create a foo, I store the foo under some key (say, "foo:123456") and then append the value of this key to the key that stores foos created today (say, "foo:created:2011-01-05"). Unfortunately, "append" is not generally an atomic operation, so you have to read the data, append the key, and write the data back. Now, suddenly, you have a race if two separate processes try to read-modify-write this key at the same time, and that race can cause data to be lost from the index.
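Here's the lost-update race replayed deterministically (the interleaving is hand-scheduled so you can see exactly where the data disappears):

```python
# Two writers both read the index key, both append to their private
# copy, and both write back; the second write silently clobbers the
# first. This is the classic read-modify-write race.
key = "foo:created:2011-01-05"
store = {key: ["foo:1"]}

# Writer A and writer B both read the current value...
seen_by_a = list(store[key])
seen_by_b = list(store[key])

# ...each appends its own entry to its private copy...
seen_by_a.append("foo:2")
seen_by_b.append("foo:3")

# ...and each writes its copy back. B's write overwrites A's.
store[key] = seen_by_a
store[key] = seen_by_b

print(store[key])  # ['foo:1', 'foo:3'] -- 'foo:2' is lost
```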

One possible solution is to use versioning. Each time you fetch data from a key, you get a version number. Each time data is stored into a key, the version number for that key's data is incremented. When you store data, you supply the version number you previously fetched; if it doesn't match the current version number, you know someone else has mutated the data since you fetched it; the storage engine can throw a "data invalid" exception, and your application can read the new data and try to re-apply its edit. This works great as long as contention is rare, rather than common. Once contention becomes common, you'll enter livelock with this solution. Although any one racing writer is always guaranteed to make progress, other writers will queue up and the system may spiral out of control because of the quadratic increase in the number of retries.
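A minimal sketch of the versioned write, with a made-up in-memory `VersionedStore` standing in for the storage engine (real stores expose the same idea as "check and set" or compare-and-swap):

```python
# Optimistic concurrency: writes carry the version they were based on,
# and the store rejects them if the version has moved on.
class StaleVersion(Exception):
    pass

class VersionedStore:
    def __init__(self):
        self._data = {}  # key -> (version, value)

    def get(self, key):
        return self._data.get(key, (0, None))

    def put(self, key, expected_version, value):
        current_version, _ = self._data.get(key, (0, None))
        if current_version != expected_version:
            raise StaleVersion(key)  # someone mutated it since the fetch
        self._data[key] = (current_version + 1, value)

def append_with_retry(store, key, item, max_retries=10):
    # Read-modify-write, retrying on conflict. Fine when contention is
    # rare; under heavy contention the retries pile up (the livelock
    # risk described above).
    for _ in range(max_retries):
        version, value = store.get(key)
        try:
            store.put(key, version, (value or []) + [item])
            return
        except StaleVersion:
            continue  # someone else won the race; re-read and retry
    raise RuntimeError("too much contention on %s" % key)

store = VersionedStore()
append_with_retry(store, "foo:created:2011-01-05", "foo:2")
append_with_retry(store, "foo:created:2011-01-05", "foo:3")
print(store.get("foo:created:2011-01-05"))  # (2, ['foo:2', 'foo:3'])
```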

Another solution is the one chosen by the Redis key/value storage engine. It supports atomic operations on its data, including "semantic" operations like list mutation. Thus, you could just tell Redis, "push this value at the end of the list stored at key foo:created:2011-01-05," and Redis will do it. Redis will interlock multiple requests to the same key, so that they are serialized, and all of them complete correctly without re-try. Great!
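To show why server-side atomicity removes the retry loop entirely, here's a toy stand-in for a Redis list: the `AtomicListStore` class is my own simulation, but the point is the same -- with real Redis the whole operation is a single RPUSH command, serialized by the server:

```python
import threading

# A toy stand-in for Redis's behavior: the store serializes mutations,
# so concurrent RPUSH-style appends never lose data and never retry.
class AtomicListStore:
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def rpush(self, key, value):
        # The whole read-modify-write happens under one lock, so
        # concurrent callers are serialized instead of racing.
        with self._lock:
            self._data.setdefault(key, []).append(value)

    def lrange(self, key):
        with self._lock:
            return list(self._data.get(key, []))

store = AtomicListStore()
threads = [
    threading.Thread(target=store.rpush,
                     args=("foo:created:2011-01-05", "foo:%d" % i))
    for i in range(100)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(store.lrange("foo:created:2011-01-05")))  # 100 -- nothing lost
```

Contrast this with the versioning approach: a hundred concurrent writers here cost a hundred appends, not a quadratic pile of retries.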

However, Redis transactional semantics are not supported across shards -- or, to put it another way, Redis relies on client-side sharding (point at different Redis instances based on a hash of the key), and doesn't support server-side sharding. This means that you now have a consistency problem: if you do one store to one shard/partition (say, for the base data), then that store will be visible to the world before the second operation that adds the record to the index. What's worse, if the Redis instance where the index lives, or the process that updates the database, crashes before the index update has committed, you will be left with a database that loses the key from the index: the same outcome as we got in the race condition case.

Further, some queries are not necessarily knowable beforehand. If you want to query foos created between 2011-01-05 21:30 and 2011-01-06 02:00, then you have to read both of the by-day buckets, and toss out all the data that doesn't match. Because the data value doesn't live in the primary key, you actually have to read a whole lot of values out of the data store that you will end up throwing away. This is similar to a "table scan" in SQL queries, and is not a good way of keeping up performance as your application becomes popular!
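Here's that cross-bucket scan sketched out (dict-as-store again, illustrative key names): read every day bucket the range touches, fetch each candidate record, and discard the ones outside the window.

```python
from datetime import datetime

# Base records plus two by-day index buckets.
store = {
    "foo:1": {"created": "2011-01-05 20:00"},
    "foo:2": {"created": "2011-01-05 23:45"},
    "foo:3": {"created": "2011-01-06 01:30"},
    "foo:4": {"created": "2011-01-06 09:00"},
    "foo:created:2011-01-05": ["foo:1", "foo:2"],
    "foo:created:2011-01-06": ["foo:3", "foo:4"],
}

def foos_between(start, end):
    results = []
    # Read every day bucket the range overlaps...
    for day in ("2011-01-05", "2011-01-06"):
        for key in store.get("foo:created:%s" % day, []):
            record = store[key]  # one full read per candidate record
            created = datetime.strptime(record["created"], "%Y-%m-%d %H:%M")
            # ...and throw away everything outside the window.
            if start <= created <= end:
                results.append(key)
    return results

start = datetime(2011, 1, 5, 21, 30)
end = datetime(2011, 1, 6, 2, 0)
print(foos_between(start, end))  # ['foo:2', 'foo:3']
```

Half the records fetched here get discarded; with day-sized buckets and a narrow window, the waste only gets worse.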

The solution is to move sharding into the data store itself. That way, you can tell the data store to commit two operations (store the data, update the index) as part of a single, all-or-nothing transaction. Unfortunately, most NoSQL data stores don't support this. One of the few that does server-side sharding is MongoDB -- but MongoDB, in turn, doesn't actually have any transactional semantics at all. The MongoDB answer to the question "how do you survive a data center power loss" is "always keep online replication to a secondary data center," which is maybe not the answer most of us want to hear.

SQL databases have solved this for aeons. "Two-phase commit," also known as "the marriage protocol," solves this problem. It introduces a transaction monitor, whose job it is to talk to the different shards and tell them what to do. Once everything seems ready, the TM first tells each shard to prepare to commit. This includes storing a transaction identifier (and the source TM identifier) along with the data in a "todo" journal. Once all shards with pending modifications have replied back that their journals are updated, the TM stores a local durable annotation that the transaction is now committed, and tells each shard to go ahead and commit the change, making the modifications globally visible. If any shard node crashes after preparing, but before getting the final commit message, it will find the annotation about the pending transaction in its local store when it comes back up, contact the TM to find out whether the transaction finished or not, and roll back or forward as appropriate.
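The protocol above can be sketched in a few dozen lines. This is a bare-bones illustration with made-up class names; a real implementation makes the journal and the TM's commit annotation durable, which is the whole point:

```python
# Minimal two-phase commit sketch: prepare everywhere, then commit
# everywhere, with the TM's commit record as the tiebreaker.
class Shard:
    def __init__(self, name):
        self.name = name
        self.journal = {}  # txn_id -> pending ops (the "todo" journal)
        self.data = {}     # committed, globally visible data

    def prepare(self, txn_id, ops):
        # Phase 1: durably record what we intend to do, then vote yes.
        self.journal[txn_id] = ops
        return True

    def commit(self, txn_id):
        # Phase 2: apply the journaled ops and make them visible.
        for key, value in self.journal.pop(txn_id).items():
            self.data[key] = value

    def abort(self, txn_id):
        self.journal.pop(txn_id, None)

def transaction_monitor(txn_id, work):
    # work maps shard -> {key: value}. Prepare on every shard before
    # committing on any; one "no" vote aborts the whole transaction.
    shards = list(work)
    if all(shard.prepare(txn_id, ops) for shard, ops in work.items()):
        # A real TM durably logs "txn_id committed" HERE -- that log
        # is what a crashed shard consults on recovery.
        for shard in shards:
            shard.commit(txn_id)
        return True
    for shard in shards:
        shard.abort(txn_id)
    return False

data_shard, index_shard = Shard("data"), Shard("index")
ok = transaction_monitor("txn-1", {
    data_shard: {"foo:123456": {"created": "2011-01-05"}},
    index_shard: {"foo:created:2011-01-05": ["foo:123456"]},
})
print(ok, data_shard.data, index_shard.data)
```

The base record and the index entry land on different shards, yet either both become visible or neither does -- exactly what the Redis-with-client-side-sharding setup can't give you.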

Also, SQL databases support sorted indices, not just hash indices (like key/value stores). This makes it easy to read only the data you need out of the value store (or tables, in the SQL database case).
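The difference is easy to see in miniature. With a sorted index you binary-search for the range endpoints and read only the matching slice; a hash index gives you no choice but to scan everything. (The index layout below is illustrative, using Python's `bisect` for the binary search.)

```python
import bisect

# A sorted index: (sort key, primary key) pairs kept in order.
index = sorted([
    ("2011-01-05 20:00", "foo:1"),
    ("2011-01-05 23:45", "foo:2"),
    ("2011-01-06 01:30", "foo:3"),
    ("2011-01-06 09:00", "foo:4"),
])

def range_query(lo, hi):
    # Find the slice boundaries in O(log n), then read only that slice
    # -- no scanning, no discarded reads.
    start = bisect.bisect_left(index, (lo, ""))
    end = bisect.bisect_right(index, (hi, "\uffff"))
    return [key for _, key in index[start:end]]

print(range_query("2011-01-05 21:30", "2011-01-06 02:00"))  # ['foo:2', 'foo:3']
```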

Unfortunately, this kind of automatic partitioning does not come with the low-cost versions of the major databases. You need something like IBM DB2 Enterprise to actually get a SQL database that shards automatically across nodes, supports two-phase commit, and supports sorted indices. This is very expensive software. Oracle has a similar offering, with a similar price tag.

Several Internet companies have run into this same problem. Yahoo did, and came up with Hadoop, the HDFS distributed file system, and the Pig query language. Google did, and came up with the BigTable data store. Amazon did, and came up with the SimpleDB data store (built on technology called Dynamo). Unfortunately, the Hadoop approach (distributed map/reduce) is great for queries that mine lots of data to find an answer, but is poor for online transaction cases. The reason is that Hadoop still shards by hash key, so you don't know where the data within your range of interest will live. The BigTable and SimpleDB implementations are not openly available, although the technology that led up to their initial versions is described in various papers.

Other open source technologies that come close include CouchDB, Riak, Hypertable, and Cassandra. However, each of them has some fatal flaw -- no range queries, or map/reduce-based querying (which doesn't scale efficiently to many parallel subrange queries), or poor resiliency to failure. Readers might be reminded of the CAP theorem here: Consistency, Availability, Partition tolerance; pick any two! However, even if I pick, say, C and A, there exists no good, affordable solution that actually delivers. It all comes back to those darn range queries, and that darn two-phase commit problem that disqualifies any client-based partitioning scheme.

The model of a key/value store with JSON-like values, where you can add range-query indices on arbitrary fields of the value, is very useful. Say, for example: "for every JSON blob where the field 'type' has value 'foo,' range index the value of the field 'created' as a date." This semi-structured approach is likely to lie behind many upcoming, highly scalable, quickly evolving, flexible web phenomena. I think distributed key/value stores that want to grow up and become the engines of the future will have to solve the range index problem.

One way of doing it would be to implement a second storage model for chunks of an index, and be able to migrate these chunks between storage nodes. An index starts out as a single chunk, but as it grows, it splits into more chunks; each chunk is stored on some hash-selected node, thus spreading load evenly across the storage nodes as the index grows. Additional higher-layer indices (with a 60,000:1 splay, say) make it so that you don't have to scan chunks to find the ones you want, but instead walk a very shallow tree of links between nodes, much like BTrees walk a shallow-ish tree between disk blocks.

Add distributed commit support, and you've won! In fact, add distributed commit support first, and an application could conceivably implement the index solution on top of the data store (sub-optimally, but correctly!)
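The chunk-splitting idea can be sketched in miniature. Everything here is made up for illustration -- a tiny split threshold so the behavior is visible, a linear chunk lookup where the higher-layer index would make it a shallow tree walk, and a toy hash for node placement:

```python
import bisect

MAX_CHUNK = 4  # absurdly small, so splits happen in a short demo
NODES = 3

class ChunkedIndex:
    def __init__(self):
        # Ordered list of sorted key runs; together they cover the
        # whole key space, each chunk owning one contiguous range.
        self.chunks = [[]]

    def insert(self, key):
        # Find the chunk whose range covers this key. (Linear scan
        # here; the higher-layer index in the text replaces this with
        # a shallow tree walk.)
        for i, chunk in enumerate(self.chunks):
            if i == len(self.chunks) - 1 or key <= chunk[-1]:
                bisect.insort(chunk, key)
                if len(chunk) > MAX_CHUNK:
                    # Split the chunk in two; the halves may migrate to
                    # different nodes, spreading load as the index grows.
                    mid = len(chunk) // 2
                    self.chunks[i:i + 1] = [chunk[:mid], chunk[mid:]]
                return

    def node_for(self, chunk_index):
        # Hash-select a storage node per chunk.
        return hash(tuple(self.chunks[chunk_index])) % NODES

index = ChunkedIndex()
for day in range(1, 11):
    index.insert("2011-01-%02d" % day)
print([len(c) for c in index.chunks])  # several small chunks, not one big one
```

Each chunk stays internally sorted, so a range query binary-searches within the handful of chunks whose ranges overlap the query, instead of scanning the whole index.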

Here's my question for the storage people of the internet:
MongoDB: Will you add a mode where I can actually guarantee transactional integrity?
Riak: Will you add range-based queries that do not have to map/reduce over the entire data set, so they can actually be used online?
Redis: Will you add support for primary key sharding on the inside of the already existing transaction layer?

Who will be first to this feast? :-)