Writing SSTables with Beam

Apache Beam is an open-source system for processing large datasets, with both streaming and batch processing modes. The batch mode is based on Google's internal Flume framework, which I had the pleasure of using for seven years while processing Android telemetry. It's also the perfect system for building a search index.

I started by building a reader and writer for my SSTable format. Reading an SSTable is natural and easy: you can either read each element one by one in sorted order, or use the index block to split the SSTable into chunks for more parallel processing.
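The chunked read can be sketched like this. I'm assuming here that the index block yields the byte offset of each data chunk; the real format and names are hypothetical:

```python
def plan_parallel_reads(chunk_offsets, file_size, num_workers):
    """Split an SSTable into contiguous byte ranges for parallel readers.

    chunk_offsets: sorted byte offsets of each data chunk, as recovered
    from the index block (a hypothetical stand-in for the real format).
    Returns (start, end) byte ranges aligned to chunk boundaries, so no
    record is ever split across two workers.
    """
    per_worker = max(1, len(chunk_offsets) // num_workers)
    ranges = []
    for i in range(0, len(chunk_offsets), per_worker):
        j = i + per_worker
        end = chunk_offsets[j] if j < len(chunk_offsets) else file_size
        ranges.append((chunk_offsets[i], end))
    return ranges
```

Each worker then seeks to its start offset and streams records until it reaches its end offset, which is what makes the format friendly to parallel processing.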

Writing on the other hand is harder because the data needs to be sorted before writing. Sorting is hard because you typically need all of your data in memory to sort it. With large datasets, you can easily run out of memory. To get around this, we’re going to need sorting algorithms that rely on disk when there are too many records (which is always).
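The classic external-sort pattern — sort fixed-size runs in memory, spill each run to disk, then stream a merge over the runs — looks roughly like this. This is a minimal sketch using temp files, not the Beam sorter:

```python
import heapq
import pickle
import tempfile

def _spill(sorted_records):
    """Write one sorted run to a temp file and rewind it for reading."""
    f = tempfile.TemporaryFile()
    for rec in sorted_records:
        pickle.dump(rec, f)
    f.seek(0)
    return f

def _read(f):
    """Lazily yield records back out of a spilled run."""
    while True:
        try:
            yield pickle.load(f)
        except EOFError:
            return

def external_sort(records, run_size):
    """Sort an arbitrarily large iterable using bounded memory.

    Buffers at most run_size records, sorts each buffer in memory,
    spills it to disk, then merges the sorted runs as a stream.
    """
    runs, buf = [], []
    for rec in records:
        buf.append(rec)
        if len(buf) >= run_size:
            runs.append(_spill(sorted(buf)))
            buf = []
    if buf:
        runs.append(_spill(sorted(buf)))
    # heapq.merge keeps only one record per run in memory at a time.
    return heapq.merge(*(_read(f) for f in runs))
```

The key property is that memory usage is bounded by `run_size` plus one record per run, no matter how large the input is.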

My first approach, suggested by AI, was to use the BufferedExternalSorter in Apache Beam. It tries to sort in memory and spills to disk before running out of memory. I’m using Gemini Pro to help me implement all this, and it tends to make a few mistakes along the way.

The BufferedExternalSorter provides sorted values after a GroupByKey. In our case, we want everything in the output shard sorted so the group by key is the shard itself. This should set off red flags; Beam is great at parallel operations, but iterating through all elements in the shard in a single processing function — even if you’re properly treating it as a stream — can cause memory issues.

While the AI is very happy to suggest this kind of processing mode, where elements refer to whole files or shards, it also causes non-obvious memory issues that are hard to debug.

My way around this was to write my own ParDo which collects a fixed number of elements (or a fixed number of bytes), sorts them in memory, writes them to an SSTable, and then merges the tables in a separate stage. The Rust implementation of the sstable merge is lightning fast.
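The batching stage can be sketched as a DoFn-style buffer. The class and callback names here are hypothetical; the real version runs inside a Beam ParDo and writes temp SSTable files rather than calling a Python callback:

```python
class RunWriter:
    """Buffers elements up to a count or byte cap, then flushes each
    buffer as its own sorted run (standing in for one temp SSTable)."""

    def __init__(self, max_elements, max_bytes, write_run):
        self.max_elements = max_elements
        self.max_bytes = max_bytes
        self.write_run = write_run  # callback taking a sorted list of (key, value)
        self.buf = []
        self.bytes = 0

    def process(self, key, value):
        self.buf.append((key, value))
        self.bytes += len(key) + len(value)
        if len(self.buf) >= self.max_elements or self.bytes >= self.max_bytes:
            self.flush()

    def flush(self):
        if self.buf:
            # Each run is sorted before it hits disk, so the merge stage
            # only ever needs a streaming k-way merge.
            self.write_run(sorted(self.buf))
            self.buf = []
            self.bytes = 0
```

Because every run is internally sorted, the final merge never holds more than one record per run in memory.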

Here’s an example of it merging 12 shards that are just under 200MB each.

byte64:~ time sstable merge data/enwiki.temp_*.sst -o data/enwiki.merged.sst
SSTable Merged Successfully into 'enwiki.merged.sst'
  Elements: 6000000
  Total size: 2450827207 bytes (2.28 GiB)

real    0m6.348s
user    0m3.812s
sys     0m1.904s

Another flub by the AI was to sort all the values and then re-emit them, losing their sorted order.

Finally, even though I’ve batched the elements and sent them to their own subshard file, the group by shard was still causing me memory issues. My way around this was to add a subshard number to the key, making the group by more parallel.
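Salting the key this way spreads one hot shard across many workers. A sketch of the idea, with hypothetical names, assuming a stable hash of the element key picks the subshard:

```python
import hashlib

def subshard_key(shard, element_key, num_subshards):
    """Spread one hot shard across num_subshards group-by keys.

    Elements grouped under (shard, subshard) can be processed in
    parallel; the per-subshard outputs are merged back together
    afterwards, just like the temp SSTable runs.
    """
    h = int(hashlib.md5(element_key.encode()).hexdigest(), 16)
    return (shard, h % num_subshards)
```

The hash must be deterministic across workers (hence md5 rather than Python's per-process `hash`), so that retries of the same element land in the same subshard.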

With these changes, I’ve been able to take all of Wikipedia, read it from a single SSTable shard (with values cut off at 1KiB), invert the index to map words to documents, and then write out the sorted search index as an SSTable again in a single shard.
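Inverting the index boils down to mapping each (document, text) pair to (word, document) pairs and then grouping by word. In the Beam pipeline that would be a FlatMap followed by a GroupByKey; condensed into one in-memory pass for illustration:

```python
from collections import defaultdict

def invert(docs):
    """Turn {doc_id: text} into a sorted word -> [doc_ids] index."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for word in text.lower().split():
            index[word].add(doc_id)
    # Emitting in sorted key order matches the SSTable requirement
    # that the search index be written sorted.
    return {word: sorted(ids) for word, ids in sorted(index.items())}
```

At Wikipedia scale this pass is exactly where the sorted-run-and-merge machinery above earns its keep, since the full word-to-documents map doesn't fit in memory.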

byte64:~ sstable info data/enwiki-search_index.sst 
File: enwiki-search_index.sst
  Elements:   9294269
  Chunk Size: 400
  Skip Size:  20
  Min Key:    00
  Max Key:    𱑕𠄩婆媒

The SSTableWriter is still a work in progress. It could use some performance tuning. But for now, having a reliable library for writing SSTables without memory issues is a win.

