Deduplicating high-throughput data-streams with Bloom Filters
TLDR: we run several applications consuming multiple real-time streams. These applications implement idempotence using Redis sets. In this post I describe our journey moving to bloom filters (using the ReBloom module), which brought down our memory usage by almost 10x.
Let's briefly define idempotence:
An operation is said to be idempotent when applying it multiple times has the same effect as applying it once.
There are some reasons why applications consuming a real-time stream must have an idempotence layer, e.g.:
- If the application crashes or restarts (e.g., after a deploy), it might need to read parts of the stream that were processed already.
- If the producer sends duplicated events into the stream (e.g., because of connectivity issues), the consumer should process them only once.
Streaming systems support parallel reads by multiple consumers of the same application by partitioning the stream into shards. This allows the work to be distributed between multiple workers, so there must be a shared data store to keep track of progress. Redis is a good candidate.
The applications must also survive both software and hardware crashes (i.e., the application and the hardware it runs on are assumed to be ephemeral), so implementing idempotence in-memory or storing data locally is not an option.
A naïve approach to idempotence
The simplest way to provide an idempotence layer is to save all elements observed into a set. Before processing an event, one would check whether it is already there.
This approach is obviously memory intensive and will break if the amount of data is significant enough (say a couple of million events per hour).
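As a rough illustration of this naive approach, here is a minimal Python sketch. It assumes a client object exposing an `sadd(key, member)` method that returns the number of newly added members, as redis-py's `sadd` does. The `process_once` helper, the `seen_events` key name, and the handler are hypothetical names used only for this example.

```python
from typing import Callable


def process_once(client, event_id: str, handler: Callable[[str], None],
                 key: str = "seen_events") -> bool:
    """Run handler only the first time event_id is seen.

    `client` is assumed to expose sadd(key, member) and to return the
    number of members actually added (1 if new, 0 if already present).
    """
    # SADD both checks and records membership in a single round trip.
    if client.sadd(key, event_id) == 1:
        handler(event_id)
        return True
    return False  # duplicate: skip processing
```

Note that this sketch records the event as seen before the handler runs; whether to record before or after processing is a design choice it glosses over.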
Our not-so-naïve approach
Before diving into the meat of this post, let's briefly look at our current implementation, which we have been using for years and are now replacing with bloom filters.
Every event in the stream is associated with an ID, which we rehash using MD5 to shorten it. Then, we check/add (`SADD`) the ID in a set whose key name is derived from the current day + the initial two bytes of the MD5-hashed ID (i.e., every day of idempotence uses 65536 sets). We then save the ID (minus its first two bytes) as raw bytes.
This is a nice trick that reduces storage usage a little bit: since all hashed IDs starting with bytes `XX` go into the same set, we can drop that part when inserting them. Think of a single-level trie.
We also keep the previous day of idempotence around for 24 hours (using `EXPIRE`), and when checking whether an element has already been seen, we do so on both days.
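The key derivation described above could be sketched like this in Python. The `idem:` key prefix, the date format, and the function names are assumptions made for illustration; the post does not specify the actual naming scheme.

```python
import hashlib
from datetime import date, timedelta
from typing import List, Tuple


def idempotence_entry(event_id: str, day: date) -> Tuple[str, bytes]:
    """Return (set key, member) for an event ID on a given day."""
    digest = hashlib.md5(event_id.encode("utf-8")).digest()  # 16 raw bytes
    prefix, member = digest[:2], digest[2:]
    # Hypothetical key layout: the day plus the first two bytes in hex.
    key = f"idem:{day.isoformat()}:{prefix.hex()}"
    return key, member  # member is the remaining 14 bytes


def entries_to_check(event_id: str, today: date) -> List[Tuple[str, bytes]]:
    """Entries for today and the previous day, since both are consulted."""
    return [idempotence_entry(event_id, d)
            for d in (today, today - timedelta(days=1))]
```

Two bytes of prefix give 256 × 256 = 65536 possible sets per day, and each member shrinks from 16 to 14 bytes.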
Enter bloom filters
A bloom filter is a space-efficient probabilistic data structure that allows you to check whether an item is a member of a set. There is a catch, though: a query returns either "definitely not in the set" or "possibly in the set" (i.e., false positives are possible, but false negatives are not).
Another caveat is that you can't list the members of a set (like you would do in a Redis set using `SMEMBERS`), because the items themselves are not saved, just their fingerprint. This is what makes them space-efficient.
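To make the idea concrete, here is a minimal, illustrative bloom filter in Python. It is a teaching sketch under simple assumptions (a fixed-size bit array and bit positions derived from one SHA-256 digest via double hashing), not how ReBloom or bloomd are implemented; `TinyBloomFilter` and its method names are made up for this example.

```python
import hashlib
from typing import List


class TinyBloomFilter:
    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)  # all bits start at 0

    def _positions(self, item: str) -> List[int]:
        # Derive num_hashes bit positions from two 64-bit halves of a digest.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # keep the step non-zero
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item: str) -> None:
        # Only bits are set; the item itself is never stored.
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        # False means "definitely not in the set"; True means "possibly".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))
```

Because different items can set overlapping bits, `might_contain` can return `True` for an item that was never added, but it never returns `False` for one that was.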
bloomd: our first attempt at using bloom filters
We had already used bloom filters before to deduplicate events for our real-time dashboards. We tried implementing them using Lua on Redis, but the performance was not production-grade. So we turned our attention to bloomd, a bloom filter server written in C.
We have been using `bloomd` for a while, and its performance is OK. But it has some drawbacks:
- It does not support clustering. If you want to distribute the load between different nodes, you have to provision them manually.
- Lack of High Availability: if your cluster dies you are done. There is no built-in data redundancy, and the underlying data is easy to corrupt.
- You have to maintain it yourself: we try to align with the "run less software" philosophy, so having yet another service to maintain, that is not directly part of our business, is a bummer.
- The API is not the most flexible.
- Almost abandonware: the author is happy with the current set of capabilities, and it's unlikely that this will evolve into something better.
ReBloom: bloom filters as a Redis module
In 2016 Salvatore [announced the Redis Modules API](http://antirez.com/news/106) at RedisConf; I remember fantasizing about implementing bloom filters as a module. But `bloomd` was already working, and the modules API still felt experimental, so we waited.
A few months ago we came to know about ReBloom, a Redis module that provides two probabilistic datatypes: scalable bloom filters and [cuckoo filters](https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf). It is also developed by Redis Labs, our Redis provider, which is encouraging.
ReBloom brings several benefits to the table over `bloomd`. Since it sits on top of Redis, there are lots of things that come for free:
- Lower cognitive overhead (we use Redis extensively already)
- A more powerful API
- No need to maintain it, as we delegate that task to [Redis Labs](https://redislabs.com/)
Here's an example of the API usage:
```bash
# Assuming you are running a redis server with rebloom loaded...

# Use BF.RESERVE to create a filter with the specified
# error rate probability and capacity
> BF.RESERVE your_filter 0.00001 50000000
OK

# Use BF.ADD to add elements to the filter
> BF.ADD your_filter foo

# Use BF.EXISTS to check whether an element is in the filter
> BF.EXISTS your_filter foo
1
> BF.EXISTS your_filter bar
0
```
It is also possible to add or check multiple items in bulk. And using the `BF.INSERT` command you can also insert and specify the error rate/capacity to use if the filter is not created yet.
Memory usage comparison
Our current ReBloom setup, which came to be after lots of manual tuning and analysis, is almost ten times more space-efficient than using sets at peak time.
The first thing that stands out is that the space taken by the bloom filters is constant. The reason behind this is that the space used by a bloom filter depends on its initial configuration (capacity and error rate), and it is allocated at the moment of creation.
ReBloom implements Scalable Bloom Filters, which means that once the maximum capacity is reached, a new filter is created on top of the original one, with higher capacity and error rate.
In the graph above there was no scaling, and it is not a coincidence: we tuned the configuration to prevent the filter from scaling, as this has two undesirable effects:
- Scaling increases the size of the filter, usually by a factor of two.
- When testing for membership of an element, ReBloom has to check all layers in the scaled filter, which increases latency.
Tuning the initial configuration of your filters is paramount to get the most out of them. It requires lots of experimentation and understanding of your data.
Tuning ReBloom initial parameters
The parameters with which new filters are created, capacity and error rate, determine their initial size and are immutable.
If you are too conservative, your filter will probably scale multiple times, and every time it will become twice as big, i.e., it grows exponentially. On the other hand, if you are too generous, odds are you will waste space unnecessarily, making this whole journey pointless.
To make things more complicated, it's not easy to predict what the initial size of the bloom filter will be. Here are some examples:
```bash
# creating filter with capacity of 25M and 1e-10 error rate
> BF.RESERVE foo 0.0000000001 25000000
OK

# let's inspect what we actually got...
> BF.DEBUG foo
1) "size:0"
2) "bytes:268435456 bits:2147483648 hashes:34 capacity:44808984 size:0 ratio:1e-10"
```
Interesting: the capacity is actually 44808984, way more than we wanted! What's going on? Well, it has to do with the [math used to reserve the filter](https://github.com/RedisLabsModules/rebloom/blob/1aeb76f93d51c1cd29cc63e1dc0a21b4bdcc71ac/contrib/bloom.c#L140-L154).
Let's try a different one, e.g.:
```bash
# creating filter with capacity of 22M and 1e-10 error rate
> BF.RESERVE bar 0.0000000001 22404492
OK
> BF.DEBUG bar
1) "size:0"
2) "bytes:134217728 bits:1073741824 hashes:34 capacity:22404492 size:0 ratio:1e-10"
```
That looks more like what we want. Let's increase the error rate by a factor of 10:
```bash
# creating filter with capacity of 22M and 1e-09 error rate
> BF.RESERVE qux 0.000000001 22404492
OK
> BF.DEBUG qux
1) "size:0"
2) "bytes:134217728 bits:1073741824 hashes:30 capacity:24893880 size:0 ratio:1e-09"
```
Even more interesting: the **size in bytes is identical to the previous example**, even though we relaxed (increased) the error rate. There is a difference, though: our filter has a more generous capacity (it went from 22404492 to 24893880), and it uses fewer hashes. Again, this is due to the math used by the current implementation.
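The numbers in these three examples are consistent with a sizing scheme in which the bit array is rounded up to a power of two and the spare bits are converted back into extra capacity. The Python sketch below approximates that scheme under those two assumptions; it is not ReBloom's actual code, and `estimate_filter` is a hypothetical helper name.

```python
import math


def estimate_filter(capacity: int, error_rate: float) -> dict:
    """Estimate the size of a filter for a requested capacity and error rate."""
    # Standard bloom filter sizing: bits per element = -ln(p) / ln(2)^2.
    bits_per_element = -math.log(error_rate) / (math.log(2) ** 2)
    wanted_bits = capacity * bits_per_element
    # Assumption: the bit array is rounded up to a power of two.
    bits = 1 << int(wanted_bits).bit_length()
    # Assumption: leftover bits are handed back as extra capacity.
    spare_bits = int(bits - wanted_bits)
    extra_items = int(spare_bits / bits_per_element)
    return {
        "bytes": bits // 8,
        "bits": bits,
        "hashes": math.ceil(math.log(2) * bits_per_element),
        "capacity": capacity + extra_items,
    }


for requested, error in [(25_000_000, 1e-10), (22_404_492, 1e-10), (22_404_492, 1e-9)]:
    print(requested, error, estimate_filter(requested, error))
```

Under these assumptions, a request for 25M items at 1e-10 needs slightly more than 2^30 bits, so it jumps to 2^31 bits (256 MiB), while 22404492 items fit just under 2^30 bits (128 MiB). This is why small changes in the requested capacity can double the size of the filter.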
Since changes to the initial capacity and error rate influence the resulting size so dramatically, I plotted it for different parameters in an [interactive notebook](https://beta.observablehq.com/@casidiablo/rebloom-capacity-and-error-rate-vs-size), which you will find handy should you need to tune your filters.
The elephant in the room
Something to keep in mind is that you might lose data to false positives. This is a very important trade-off you need to make when using bloom filters, which also limits the scenarios in which it is feasible to use them.
If every event in your pipeline is critical, and losing even one would impact business, then bloom filters are not for you.
More often than not, though, pipelines contain all kinds of events, with different degrees of importance. And in some applications, dropping events is not that big of a deal (e.g., an app feeding a real-time dashboard: if you are summing large amounts of events, dropping one or two would be unnoticeable).
There are several ways to mitigate this, though. For instance, you could use bloom filters with different configurations depending on the kind of event passing through them: high-priority events could have a minimal error probability (or even use plain old sets instead of a probabilistic data structure), while those that are less important could use a more forgiving configuration.
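One way to express such a per-priority setup is a small lookup table, sketched below in Python. The priority names, the error rates, and the `DedupPolicy` and `policy_for` names are all hypothetical and would need to be tuned to your own data.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DedupPolicy:
    use_exact_set: bool           # True: plain set, no false positives
    error_rate: Optional[float]   # only meaningful for bloom filters


# Hypothetical priorities and error rates, for illustration only.
POLICIES = {
    "critical": DedupPolicy(use_exact_set=True, error_rate=None),
    "normal": DedupPolicy(use_exact_set=False, error_rate=1e-10),
    "best_effort": DedupPolicy(use_exact_set=False, error_rate=1e-4),
}


def policy_for(priority: str) -> DedupPolicy:
    # Unknown priorities fall back to the safest option.
    return POLICIES.get(priority, POLICIES["critical"])
```

Falling back to the exact set for unknown priorities trades memory for safety, so a mislabeled event is never dropped by a false positive.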
Originally published on cristian.io