Handle duplicate records
There are two primary reasons why records may be delivered more than one time to your Amazon Kinesis Data Streams application: producer retries and consumer retries. Your application must anticipate and appropriately handle processing individual records multiple times.
Producer retries
Consider a producer that experiences a network-related timeout after it calls PutRecord, but before it receives an acknowledgement from Amazon Kinesis Data Streams. The producer cannot be sure whether the record was delivered to Kinesis Data Streams. Assuming that every record is important to the application, the producer would have been written to retry the call with the same data. If both PutRecord calls on that same data were successfully committed to Kinesis Data Streams, then there will be two Kinesis Data Streams records. Although the two records have identical data, they have unique sequence numbers. Applications that need strict guarantees should embed a primary key within the record so that duplicates can be removed later during processing. Note that the number of duplicates due to producer retries is usually low compared to the number of duplicates due to consumer retries.
Note
If you use the Amazon SDK PutRecord, learn about SDK Retry behavior in the Amazon SDKs and Tools user guide.
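The primary-key approach for producer retries can be sketched as follows. This is a minimal illustration, not part of any Kinesis SDK: the envelope field record_id, the helper make_record, and the Deduplicator class are hypothetical names, and the in-memory set is an assumption that a real application would likely replace with a durable store.

```python
import json
import uuid


def make_record(payload: dict) -> bytes:
    """Producer side: attach a unique ID once, before the first send attempt."""
    envelope = {"record_id": str(uuid.uuid4()), "payload": payload}
    return json.dumps(envelope).encode("utf-8")


class Deduplicator:
    """Consumer side: remember IDs already processed and skip repeats.

    Assumption: an in-memory set is enough for illustration only.
    """

    def __init__(self) -> None:
        self._seen = set()

    def is_new(self, data: bytes) -> bool:
        record_id = json.loads(data)["record_id"]
        if record_id in self._seen:
            return False
        self._seen.add(record_id)
        return True


# A retried PutRecord call resends the same bytes, so both stream records
# carry the same record_id even though their sequence numbers differ.
data = make_record({"temperature": 21.5})
delivered = [data, data]  # original write plus one retry

dedup = Deduplicator()
processed = [json.loads(d)["payload"] for d in delivered if dedup.is_new(d)]
```

The key point is that the ID is generated before the first send attempt and reused on every retry. Generating a new ID for each attempt would make the duplicates indistinguishable from distinct records.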
Consumer retries
Consumer (data processing application) retries happen when record processors restart. Record processors for the same shard restart in the following cases:
- A worker terminates unexpectedly
- Worker instances are added or removed
- Shards are merged or split
- The application is deployed
In all these cases, the shards-to-worker-to-record-processor mapping is continuously updated to load balance processing. Shard processors that were migrated to other instances restart processing records from the last checkpoint. This results in duplicated record processing as shown in the example below. For more information about load-balancing, see Use resharding, scaling, and parallel processing to change the number of shards.
Example: Consumer retries resulting in redelivered records
In this example, you have an application that continuously reads records from a stream, aggregates records into a local file, and uploads the file to Amazon S3. For simplicity, assume there is only 1 shard and 1 worker processing the shard. Consider the following example sequence of events, assuming that the last checkpoint was at record number 10000:
1. A worker reads the next batch of records from the shard, records 10001 to 20000.
2. The worker then passes the batch of records to the associated record processor.
3. The record processor aggregates the data, creates an Amazon S3 file, and uploads the file to Amazon S3 successfully.
4. The worker terminates unexpectedly before a new checkpoint can occur.
5. The application, worker, and record processor restart.
6. The worker resumes from the last successful checkpoint (record 10000), so it begins reading again at record 10001.
Thus, records 10001-20000 are consumed more than one time.
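The sequence of events above can be simulated in a few lines. This is a simplified sketch under stated assumptions: run_worker is a hypothetical function, consecutive integers stand in for Kinesis sequence numbers, and copying the batch into a list stands in for the aggregate-and-upload step.

```python
from collections import Counter


def run_worker(stream, checkpoint, batch_size, crash_before_checkpoint):
    """Process one batch after `checkpoint`; return (processed, new_checkpoint)."""
    batch = [seq for seq in stream if seq > checkpoint][:batch_size]
    processed = list(batch)  # stands in for aggregating and uploading to Amazon S3
    if crash_before_checkpoint:
        return processed, checkpoint  # the checkpoint never advanced
    return processed, batch[-1]


stream = range(1, 20001)
checkpoint = 10000
times_processed = Counter()

# First attempt: the upload succeeds, then the worker dies before checkpointing.
processed, checkpoint = run_worker(stream, checkpoint, 10000, crash_before_checkpoint=True)
times_processed.update(processed)

# After the restart, the worker resumes from the unchanged checkpoint.
processed, checkpoint = run_worker(stream, checkpoint, 10000, crash_before_checkpoint=False)
times_processed.update(processed)
```

After both runs, every sequence number from 10001 to 20000 has a count of 2 in times_processed, which is the duplicated processing described in this example.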
Being resilient to consumer retries
Even though records may be processed more than one time, your application may want to
present the side effects as if records were processed only one time (idempotent
processing). Solutions to this problem vary in complexity and accuracy. If the destination
of the final data can handle duplicates well, we recommend relying on the final
destination to achieve idempotent processing. For example, with Opensearch
The example application in the previous section continuously reads records from a stream, aggregates records into a local file, and uploads the file to Amazon S3. As illustrated, records 10001-20000 are consumed more than one time, resulting in multiple Amazon S3 files with the same data. One way to mitigate duplicates in this example is to ensure that step 3 uses the following scheme:
- The record processor uses a fixed number of records per Amazon S3 file, such as 5000.
- The file name uses this schema: Amazon S3 prefix, shard-id, and First-Sequence-Num. In this case, it could be something like sample-shard000001-10001.
- After you upload the Amazon S3 file, checkpoint by specifying Last-Sequence-Num. In this case, you would checkpoint at record number 15000.
With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.
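A minimal sketch of this scheme follows. It is an illustration under assumptions, not a Kinesis Client Library implementation: object_key and process are hypothetical helpers, a plain dict stands in for the Amazon S3 bucket, and consecutive integers stand in for Kinesis sequence numbers.

```python
RECORDS_PER_FILE = 5000  # fixed number of records per file, as in the scheme above


def object_key(prefix: str, shard_id: str, first_sequence_num: int) -> str:
    """Build a file name from the prefix, shard ID, and first sequence number."""
    return f"{prefix}-{shard_id}-{first_sequence_num}"


def process(records, prefix, shard_id, bucket):
    """Write each full batch to `bucket`; return the last checkpointed sequence number.

    `records` is a list of (sequence_number, data) tuples.
    `bucket` is a dict standing in for an Amazon S3 bucket (an assumption).
    """
    checkpoint = None
    for start in range(0, len(records) - RECORDS_PER_FILE + 1, RECORDS_PER_FILE):
        batch = records[start:start + RECORDS_PER_FILE]
        key = object_key(prefix, shard_id, batch[0][0])
        bucket[key] = b"\n".join(data for _, data in batch)  # same key, same bytes
        checkpoint = batch[-1][0]  # checkpoint only after the upload
    return checkpoint


records = [(seq, f"record-{seq}".encode()) for seq in range(10001, 20001)]
bucket = {}

process(records, "sample", "shard000001", bucket)  # first attempt
snapshot = dict(bucket)
last = process(records, "sample", "shard000001", bucket)  # retry after a restart
```

Because each checkpoint falls on a file boundary, a restart always resumes at the first record of a file, so the retry regenerates the same keys with the same contents instead of creating new objects.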
In the case of a reshard operation, the number of records left in the shard may be less than the fixed number you chose. In this case, your shutdown() method has to flush the file to Amazon S3 and checkpoint on the last sequence number. The above scheme is compatible with reshard operations as well.
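The flush-on-shutdown behavior might look like the sketch below. FileBuffer is a hypothetical class, not the Kinesis Client Library record processor interface. It assumes a dict in place of an Amazon S3 bucket, small integers in place of sequence numbers, and a small records_per_file value to keep the example short.

```python
class FileBuffer:
    """Buffer records into fixed-size files and flush the remainder on shutdown."""

    def __init__(self, prefix, shard_id, bucket, records_per_file=5000):
        self.prefix = prefix
        self.shard_id = shard_id
        self.bucket = bucket  # dict standing in for Amazon S3 (an assumption)
        self.records_per_file = records_per_file
        self.pending = []  # (sequence_number, data) tuples not yet uploaded
        self.checkpoint = None

    def add(self, sequence_num, data):
        self.pending.append((sequence_num, data))
        if len(self.pending) == self.records_per_file:
            self.flush()

    def flush(self):
        if not self.pending:
            return
        first, last = self.pending[0][0], self.pending[-1][0]
        key = f"{self.prefix}-{self.shard_id}-{first}"
        self.bucket[key] = b"\n".join(data for _, data in self.pending)
        self.checkpoint = last  # checkpoint on the last sequence number written
        self.pending = []

    def shutdown(self):
        # The shard is ending, so upload the partial file as well.
        self.flush()


bucket = {}
buffer = FileBuffer("sample", "shard000001", bucket, records_per_file=5)
for seq in range(1, 13):  # 12 records: two full files plus 2 left over
    buffer.add(seq, f"record-{seq}".encode())
buffer.shutdown()
```

The partial file is still named after its first sequence number, so a retry of the same shutdown writes the same object again rather than a new one.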