Deduplication at Scale

Gurminder Singh
Amplitude Engineering
8 min readMay 23, 2019

--

At Amplitude, we try to ensure that customers see accurate analytics by maintaining data integrity. This aspect is explained with details in Analytics that doesn’t compromise on data integrity. As part of this commitment, we do deduplication of the ingested event data and this involves dropping any duplicate data.

We ingest events at high rates (this can be millions of events per second) and these events get published from various sources.

The requests to publish events to Amplitude can fail for various reason (network /communication/service issues) and can be retried. Also, customers may backfill and re-publish events. These retried requests may result in duplicate events.

Now, to address this issue we recommended sending a unique identifier viz. insert identifier with each ingested event. We do deduplication of any subsequent events sent (within the last 7 days) with the same insert identifier.

The ingested data goes through various transformation steps before it is available to the customers for analysis. We aspire to keep this lag to the minimum. Deduping is one of the transformation steps in our data ingestion pipeline.

The challenge with deduplication at scale involves tracking insert identifiers of events flowing at very high rate in a distributed environment and matching duplicates while keeping the solution:

  • performant (low latency)
  • scalable
  • reliable
  • cost-effective
  • operationally viable

In this post, I will walk you through a typical approach for deduplication of data and a radically different approach we implemented at Amplitude.

One traditional approach is to define a unique key in the database (the key is based on the fields identified for ensuring uniqueness).

Database Unique Key Approach

With the database unique key approach, you can have a table with a unique key based on the fields identified for deduping.

We check the database for the existence of an entry, if it is not found, we are processing a new event; so we process the event and then add a corresponding entry to the database. On the other hand, if the entry is found, we have a duplicate event and we can drop it.

Some databases offer a convenient mechanism of expiring the data to easily support moving 7 days window. We can take advantage of this by adding a TTL with the entry in the database.

We are using Cassandra as a database with this approach. There are performance, cost and operational challenges with maintaining a healthy cluster.

Also, we are storing the dedupe key but all that we are looking to answer is the presence or absence of the key.

Can we do better?

Alternate Approach — Bloom Filters

A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom. It is designed to tell you, rapidly and memory-efficiently, whether an element is a member of a set. It allows elements to be added to the set, but they can not be removed.

Since it is a probabilistic data structure — it tells us that the element either definitely is not in the set or may be in the set. In other words, false negatives are not possible but false positives are possible.

Bloom Filter — Overview

An empty Bloom filter is a bit array of m bits, all set to 0. It is associated with k different hash functions. Typically, k is a constant, much smaller than m. Also, m is proportional to the number of elements to be added to the bloom filter. The precise choice of k and the constant of proportionality of m are determined by the intended false positive rate of the filter.

To add an element, we feed it to each of the k hash functions to get k array positions. We set the bits at all these positions to 1.

To query for an element (test whether it is in the set), we feed it to each of the k hash functions to get k array positions. If any of the bits at these positions is 0, the element is definitely not in the set — if it were, then all the bits would have been set to 1 when it was inserted. If all are 1, then either the element is in the set, or the bits have by chance been set to 1 during the insertion of other elements, resulting in a false positive.

The following diagram illustrates a bloom filter with two hash functions (items A and B have been added; we query for items A, C and D):

While risking false positives, Bloom filters have a strong space advantage over other data structures for representing sets (we do not store the data items at all). But, we can manage the false positive rate of the Bloom filter. A larger Bloom filter will have less false positives, and a smaller one more.

So, depending on the use-case, we can appropriately choose the parameters for a bloom filter viz. number of items (N), probability of false positives (P), number of hash functions (K) and size of bloom filter (M). There is a Bloom Filter calculator which is really handy to play with these.

The time needed either to add items or to check whether an item is in the set is a fixed constant, O(k), completely independent of the number of items already in the set.

Also, the size of bloom filter (M) grows linearly as we increase the number of items (N) while we keep the probability of false positives (P) and number of hash functions (K) constant.

Fitting the Solution to the Problem

Bloom filter seems to fit the use-case very well, we want a quick way to get an answer on the presence or absence of specific data item (without even storing the item). We can pick the probability values such that the results remain accurate.

We know our traffic numbers (this gives N), pick reasonable number of hash functions say 20 (this gives K) and we can target for low probability of false positives say 1 in a billion (this gives P), this will give us the size of bloom filter (which is M).

Also, since the size of Bloom filter will grow linearly with the increase in traffic, we will have linear scalability of the solution.

We partition our ingested data into a reasonably big number of partitions. This helps us split the problem into smaller sub-problems.

Let us take an example to play with some numbers e.g.

  • with 200K events per second overall
  • 1024 partitions for processing of data
  • we need to process ~17 million events per day per partition.

Now, if we round this to 20 million events per day per partition, we will need a bloom filter of size ~108Mb per partition with false positive probability of 1 in billion.

This would mean, we will need ~108Gb (1024 partitions*108Mb) of memory per day.

So, for 7 days of dedupe data, we will need ~756Gg (7*108Gb) of memory.

This can be handled easily with just a few machines (e.g. AWS x1e.2xlarge offers 244Gb memory), with really good performance, good reliability (we picked low false probability numbers), with linear scalability, much lower cost and less operational overhead.

Also, to support rolling window of deduplication, we can create a new bloom filter each day and can drop it as we move out of the window.

Implementation

This comes down to implementing a distributed Bloom Filter as a service. We have a simple interface with two methods:

  • add, to add an entry to the bloom filter
  • exists, which returns a boolean

We have a service instance per partition and the client discovers the appropriate service instance. The service is hosted in Kubernetes using stateful sets.

The service creates a new Bloom Filter at the start of the day. Also, it is important to maintain the promised false positive probability (P) constrains. We keep track of the number of entries in the bloom filer and the corresponding false positive probability. If we exceed the thresholds representing the constraints (say, we got lots of traffic on that day), we create an additional bloom filter. This also serves to scale the service based on the changing traffic patterns.

The following graphic captures the creation of new Bloom filter at the start of the day and creation of the new one as it reaches the threshold in terms of number of entries for a sample dedupe instance.

Also, at the end of the day, we drop the Bloom filters which fall outside of the 7 day dedupe window.

The add API, adds entry to the current Bloom filter. The exists API, checks for the entry in the Bloom filters from the current 7 days dedupe window.

To make the service production ready we added some of the reliability aspects inherent in databases viz. snapshots, request log and crash recovery.

We used Kafka for storing request logs, this gives us highly available and efficient store for logs. The other option would be use local memory mapped files.

We take periodic snapshots of the current bloom filter and store these in remote storage say S3. We also store some metadata corresponding to the snapshot — a temporal value viz. Kafka offset, which we use to relate the snapshot with the offset.

In case of system crash, we restore the latest bloom filter from S3, replay the requests from the request log using the metadata corresponding to the snapshot. Also, we restore all previous bloom filter snapshots which fall within the dedupe window.

Conclusion

On moving to the Bloom filter based solution we saw ~20x improvement in the read operations and ~3x improvement for write operations. Also, this gave us ~40% savings in terms of infrastructure costs with enough extra capacity to grow.

A probabilistic approach which specifically addressed the problem made a big difference in terms of scalability, performance and operational cost.

--

--

Technical Evangelist with interests in Distributed systems, Cloud computing, Scalability.