Optimizing Bulk Load in RocksDB

August 21, 2019

,
See Rockset
in action

Get a product tour with a Rockset engineer

What’s the fastest we can load data into RocksDB? We were faced with this challenge because we wanted to enable our customers to quickly try out Rockset on their big datasets. Even though the bulk load of data in LSM trees is an important topic, not much has been written about it. In this post, we’ll describe the optimizations that increased RocksDB’s bulk load performance by 20x. While we had to solve interesting distributed challenges as well, in this post we’ll focus on single node optimizations. We assume some familiarity with RocksDB and the LSM tree data structure.

Rockset’s write process contains a couple of steps:

  1. In the first step, we retrieve documents from the distributed log store. One document represent one JSON document encoded in a binary format.
  2. For every document, we need to insert many key-value pairs into RocksDB. The next step converts the list of documents into a list of RocksDB key-value pairs. Crucially, in this step, we also need to read from RocksDB to determine if the document already exists in the store. If it does we need to update secondary index entries.
  3. Finally, we commit the list of key-value pairs to RocksDB.

Screen Shot 2019-08-19 at 1.56.39 PM

We optimized this process for a machine with many CPU cores and where a reasonable chunk of the dataset (but not all) fits in the main memory. Different approaches might work better with small number of cores or when the whole dataset fits into main memory.

Trading off Latency for Throughput

Rockset is designed for real-time writes. As soon as the customer writes a document to Rockset, we have to apply it to our index in RocksDB. We don’t have time to build a big batch of documents. This is a shame because increasing the size of the batch minimizes the substantial overhead of per-batch operations. There is no need to optimize the individual write latency in bulk load, though. During bulk load we increase the size of our write batch to hundreds of MB, naturally leading to a higher write throughput.

Parallelizing Writes

In a regular operation, we only use a single thread to execute the write process. This is enough because RocksDB defers most of the write processing to background threads through compactions. A couple of cores also need to be available for the query workload. During the initial bulk load, query workload is not important. All cores should be busy writing. Thus, we parallelized the write process — once we build a batch of documents we distribute the batch to worker threads, where each thread independently inserts data into RocksDB. The important design consideration here is to minimize exclusive access to shared data structures, otherwise, the write threads will be waiting, not writing.

Avoiding Memtable

RocksDB offers a feature where you can build SST files on your own and add them to RocksDB, without going through the memtable, called IngestExternalFile(). This feature is great for bulk load because write threads don’t have to synchronize their writes to the memtable. Write threads all independently sort their key-value pairs, build SST files and add them to RocksDB. Adding files to RocksDB is a cheap operation since it involves only a metadata update.

In the current version, each write thread builds one SST file. However, with many small files, our compaction is slower than if we had a smaller number of bigger files. We are exploring an approach where we would sort key-value pairs from all write threads in parallel and produce one big SST file for each write batch.

Challenges with Turning off Compactions

The most common advice for bulk loading data into RocksDB is to turn off compactions and execute one big compaction in the end. This setup is also mentioned in the official RocksDB Performance Benchmarks. After all, the only reason RocksDB executes compactions is to optimize reads at the expense of write overhead. However, this advice comes with two very important caveats.

At Rockset we have to execute one read for each document write - we need to do one primary key lookup to check if the new document already exists in the database. With compactions turned off we quickly end up with thousands of SST files and the primary key lookup becomes the biggest bottleneck. To avoid this we built a bloom filter on all primary keys in the database. Since we usually don’t have duplicate documents in the bulk load, the bloom filter enables us to avoid expensive primary key lookups. A careful reader will notice that RocksDB also builds bloom filters, but it does so per file. Checking thousands of bloom filters is still expensive.

The second problem is that the final compaction is single-threaded by default. There is a feature in RocksDB that enables multi-threaded compaction with option max_subcompactions. However, increasing the number of subcompactions for our final compaction doesn’t do anything. With all files in level 0, the compaction algorithm cannot find good boundaries for each subcompaction and decides to use a single thread instead. We fixed this by first executing a priming compaction — we first compact a small number of files with CompactFiles(). Now that RocksDB has some files in non-0 level, which are partitioned by range, it can determine good subcompaction boundaries and the multi-threaded compaction works like a charm with all cores busy.

Our files in level 0 are not compressed — we don’t want to slow down our write threads and there is a limited benefit of having them compressed. Final compaction compresses the output files.

Conclusion

With these optimizations, we can load a dataset of 200GB uncompressed physical bytes (80GB with LZ4 compression) in 52 minutes (70 MB/s) while using 18 cores. The initial load took 35min, followed by 17min of final compaction. With none of the optimizations the load takes 18 hours. By only increasing the batch size and parallelizing the write threads, with no changes to RocksDB, the load takes 5 hours. Note that all of these numbers are measured on a single node RocksDB instance. Rockset parallelizes writes on multiple nodes and can achieve much higher write throughput.

Screen Shot 2019-08-19 at 2.07.12 PM

Bulk loading of data into RocksDB can be modeled as a large parallel sort where the dataset doesn’t fit into memory, with an additional constraint that we also need to read some part of the data while sorting. There is a lot of interesting work on parallel sort out there and we hope to survey some techniques and try applying them in our setting. We also invite other RocksDB users to share their bulk load strategies.

I’m very grateful to everybody who helped with this project — our awesome interns Jacob Klegar and Aditi Srinivasan; and Dhruba Borthakur, Ari Ekmekji and Kshitij Wadhwa.

Learn more about how Rockset uses RocksDB: