Amazon Kinesis stream as a source
You can use EventBridge Pipes to receive records from a Kinesis data stream. You can then optionally filter or enhance these records before sending them to one of the available destinations for processing. There are settings specific to Kinesis that you can choose when setting up the pipe. EventBridge Pipes maintains the order of records from the data stream when sending that data to the destination.
A Kinesis data stream is a set of shards. Each shard contains a sequence of data records. A consumer is an application that processes the data from a Kinesis data stream. You can map an EventBridge Pipe to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out.
For standard iterators, EventBridge uses the HTTP protocol to poll each shard in your Kinesis stream for records. The pipe shares the read throughput with other consumers of the shard.
To minimize latency and maximize read throughput, you can create a data stream consumer with enhanced fan-out. Stream consumers get a dedicated connection to each shard that doesn't impact other applications reading from the stream. The dedicated throughput can help if you have many applications reading the same data, or if you're reprocessing a stream with large records. Kinesis pushes records to EventBridge over HTTP/2. For information about Kinesis data streams, see Reading Data from Amazon Kinesis Data Streams.
Example event
The following sample event shows the information that is received by the pipe. You can use this event to create and filter your event patterns, or to define input transformation. Not all of the fields can be filtered. For more information about which fields you can filter, see Amazon EventBridge Pipes filtering.
[ { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ]
Polling and batching streams
By default, EventBridge invokes your pipe as soon as records are available. If the batch that EventBridge reads from the source has only one record in it, only one event is processed. To avoid processing a small number of records, you can tell the pipe to buffer records for up to five minutes by configuring a batching window. Before processing the events, EventBridge continues to read records from the source until it has gathered a full batch, the batching window expires, or the batch reaches the payload limit of 6 MB.
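If you configure the pipe programmatically rather than through the console, the batch size and batching window are set in the pipe's Kinesis source parameters. The following is a minimal sketch using field names from the EventBridge Pipes CreatePipe API; the values shown are illustrative:

{
  "SourceParameters": {
    "KinesisStreamParameters": {
      "BatchSize": 500,
      "MaximumBatchingWindowInSeconds": 120
    }
  }
}

With these values, EventBridge invokes the pipe once it has gathered 500 records, 120 seconds have elapsed, or the batch reaches the 6 MB payload limit, whichever comes first.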
EventBridge polls shards in your Kinesis stream for records at a base rate of four times per second. When records are available, EventBridge processes the event and waits for the result. If processing succeeds, EventBridge resumes polling until it receives more records.
You can also increase concurrency by processing multiple batches from each shard in parallel. EventBridge can process up to 10 batches in each shard simultaneously. If you increase the number of concurrent batches per shard, EventBridge still ensures in-order processing at the partition key level.
Configure the ParallelizationFactor setting to process one shard of a Kinesis or DynamoDB data stream with more than one pipe execution simultaneously. You can specify the number of concurrent batches that EventBridge polls from a shard with a parallelization factor from 1 (the default) to 10. For example, when you set ParallelizationFactor to 2, up to 200 concurrent pipe executions can process 100 Kinesis data shards. This helps scale up processing throughput when the data volume is volatile and the IteratorAge is high. Note that the parallelization factor doesn't work if you're using Kinesis aggregation.
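For example, the following sketch (again using the CreatePipe field names; the value is illustrative) requests two concurrent batches per shard:

{
  "SourceParameters": {
    "KinesisStreamParameters": {
      "ParallelizationFactor": 2
    }
  }
}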
Execution role permissions
When setting up a pipe, you can use an existing execution role, or have EventBridge create one for you with the needed permissions. To manage the resources that are related to your Kinesis data stream, EventBridge Pipes requires permissions to read from and describe the stream. If you're setting up your own execution role, you must add these permissions yourself.
If you’re unsure of the exact well-scoped permissions required to access the source, use the EventBridge Pipes console to create a new role, then inspect the actions listed in the policy.
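For reference, a custom execution role for a Kinesis source would need a policy along these lines. This is a sketch based on the standard Kinesis consumer actions; the stream ARN is the placeholder from the sample event above, and you should verify the action list against a console-generated role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:SubscribeToShard"
      ],
      "Resource": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}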
Configuring a pipe with Kinesis as the source
Adding the Kinesis source
To add a Kinesis source by using the console
Open the Amazon EventBridge console at https://console.amazonaws.cn/events/.
In the navigation pane, choose Pipes.
Choose Create pipe.
Enter a name for the pipe.
(Optional) Add a description for the pipe.
On the Build pipe tab, for Source, choose Kinesis.
For Kinesis stream, choose the stream that you want to use as a source.
For Starting position, choose one of the following:
Latest – Start reading the stream with the most recent record in the shard.
Trim horizon – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.
At timestamp – Start reading the stream from a specified time. Under Timestamp, enter a date and time using YYYY/MM/DD and hh:mm:ss format.
(Optional) For Additional settings - optional, do the following:
For Batch size - optional, enter a maximum number of records for each batch. The default value is 100.
(Optional) For Batch window - optional, enter a maximum number of seconds to gather records before proceeding.
For Concurrent batches per shard - optional, enter the number of batches from the same shard that can be read at the same time.
For On partial batch item failure, choose the following:
AUTOMATIC_BISECT – Halve each batch and retry each half until all the records are processed or there is one failed message remaining in the batch.
Note If you don't choose AUTOMATIC_BISECT, you can return specific failed records and only those get retried.
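For reference, the starting position and partial-batch-failure choices above correspond to Kinesis source parameters in the CreatePipe API. The following is a sketch with illustrative values; the timestamp is epoch seconds, and the starting-position values are LATEST, TRIM_HORIZON, and AT_TIMESTAMP:

{
  "SourceParameters": {
    "KinesisStreamParameters": {
      "StartingPosition": "AT_TIMESTAMP",
      "StartingPositionTimestamp": 1545084650,
      "OnPartialBatchItemFailure": "AUTOMATIC_BISECT"
    }
  }
}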
Now that the source is configured, you can add optional filtering, optional enrichment, or a target for the pipe.
(Optional) Configuring filtering
You can add filtering to your pipe so you're sending only a subset of records from your Kinesis stream to the target.
To configure filtering by using the console
Choose Filtering.
Under Sample event - optional, you’ll see a sample Kinesis stream event that you can use to build your event pattern, or you can enter your own event by choosing Enter your own.
Under Event pattern, enter the event pattern that you want to use to filter the records. For more information about building event patterns, see Amazon EventBridge event patterns.
The following is an example event pattern that only sends events with the value Seattle in the City field.
{ "data": { "City": ["Seattle"] } }
Now that events are being filtered, you can add optional enrichment and a target for the pipe.
(Optional) Defining enrichment
You can send the event data for enrichment to a Lambda function, Amazon Step Functions state machine, Amazon API Gateway, or API destination.
To select enrichment
Choose Enrichment.
Under Details, for Service, select the service and related settings you want to use for enrichment.
You can also transform the data before sending it to be enhanced.
(Optional) To define the input transformer
Choose Enrichment Input Transformer - optional.
For Sample events/Event Payload, choose the sample event type.
For Transformer, enter the transformer syntax, such as "Event happened at <$.detail.field>.", where <$.detail.field> is a reference to a field from the sample event. You can also double-click a field from the sample event to add it to the transformer.
For Output, verify that the output looks the way you want it to.
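As a concrete sketch, a transformer for the Kinesis sample event earlier in this topic might look like the following. The output keys (record, key, and payload) are arbitrary names chosen for this example; the <$.eventID>, <$.partitionKey>, and <$.data> references point at fields of the sample record:

{
  "record": <$.eventID>,
  "key": <$.partitionKey>,
  "payload": <$.data>
}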
Now that the data has been filtered and enhanced, you must define a target to send the event data to.
Configuring a target
To configure a target
Choose Target.
Under Details, for Target service, choose the target. The fields that display vary depending on the target that you choose. Enter information specific to this target type, as needed.
You can also transform the data before sending it to the target.
(Optional) To define the input transformer
Choose Target Input Transformer - optional.
For Sample events/Event Payload, choose the sample event type.
For Transformer, enter the transformer syntax, such as "Event happened at <$.detail.field>.", where <$.detail.field> is a reference to a field from the sample event. You can also double-click a field from the sample event to add it to the transformer.
For Output, verify that the output looks the way you want it to.
Now that the pipe is configured, make sure that its settings are configured correctly.
Configuring the pipe settings
A pipe is active by default, but you can deactivate it. You can also specify the permissions of the pipe and add tags.
To configure the pipe settings
Choose the Pipe settings tab.
By default, newly created pipes are active as soon as they're created. If you want to create an inactive pipe, under Activation, for Activate pipe, turn off Active.
Under Permissions, for Execution role, do one of the following:
To have EventBridge create a new execution role for this pipe, choose Create a new role for this specific resource. Under Role name, you can optionally edit the role name.
To use an existing execution role, choose Use existing role. Under Role name, choose the role.
(Optional) For Retry policy and Dead-letter queue - optional, do the following:
Under Retry policy, do the following:
By default, newly created pipes don't have a retry policy turned on. If you want to turn on retry policies, turn on Retry.
For Maximum age of event, enter a value between one minute (00:01) and 24 hours (24:00).
For Retry attempts, enter a number between 0 and 185.
By default, newly created pipes don't use a dead-letter queue (DLQ). If you want to use a DLQ, turn on Dead-letter queue, choose the method of your choice, and choose the queue or topic you'd like to use. (The sketch after these steps shows the equivalent API settings.)
(Optional) Under Tags - optional, choose Add new tag and enter one or more tags for the rule. For more information, see Amazon EventBridge tags.
Choose Create pipe.
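The retry policy and dead-letter queue settings map to Kinesis source parameters as in the following sketch. The field names come from the CreatePipe API; the values are illustrative, and the queue ARN is a hypothetical placeholder:

{
  "SourceParameters": {
    "KinesisStreamParameters": {
      "MaximumRecordAgeInSeconds": 86400,
      "MaximumRetryAttempts": 10,
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:us-east-2:123456789012:my-pipe-dlq"
      }
    }
  }
}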
Reporting batch item failures
When EventBridge consumes and processes streaming data from a source, by default it checkpoints to the highest sequence number of a batch, but only when the batch is a complete success. EventBridge also allows partial successes and, by default, retries only specific failed records. This helps to reduce the number of retries for a record. However, it doesn't prevent the possibility of retries for a successful record.
Success and failure conditions
If you return any of the following, EventBridge treats a batch as a complete success:
An empty batchItemFailures list
A null batchItemFailures list
An empty EventResponse
A null EventResponse
If you return any of the following, EventBridge treats a batch as a complete failure:
An empty string itemIdentifier
A null itemIdentifier
An itemIdentifier with a bad key name
EventBridge retries failures based on your retry strategy.
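For example, a target such as a Lambda function can report a partial failure by returning a response in the following shape. This sketch reuses the second sequence number from the sample event above as the identifier; confirm the identifier format that your configuration expects before relying on it:

{
  "batchItemFailures": [
    {
      "itemIdentifier": "49590338271490256608559692540925702759324208523137515618"
    }
  ]
}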