Processing Amazon MSK messages with Lambda - Amazon Lambda
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Processing Amazon MSK messages with Lambda

Note

If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.

Adding Amazon MSK as an event source

To create an event source mapping, add Amazon MSK as a Lambda function trigger using the Lambda console, an Amazon SDK, or the Amazon Command Line Interface (Amazon CLI). Note that when you add Amazon MSK as a trigger, Lambda assumes the VPC settings of the Amazon MSK cluster, not the Lambda function's VPC settings.

This section describes how to create an event source mapping using the Lambda console and the Amazon CLI.

Prerequisites

  • An Amazon MSK cluster and a Kafka topic. For more information, see Getting Started Using Amazon MSK in the Amazon Managed Streaming for Apache Kafka Developer Guide.

  • An execution role with permission to access the Amazon resources that your MSK cluster uses.

Customizable consumer group ID

When setting up Kafka as an event source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Kafka consumer group that you want your Lambda function to join. You can use this feature to seamlessly migrate any ongoing Kafka record processing setups from other consumers to Lambda.

If you specify a consumer group ID and there are other active pollers within that consumer group, Kafka distributes messages across all consumers. In other words, Lambda doesn't receive all messages for the Kafka topic. If you want Lambda to handle all messages in the topic, turn off any other pollers in that consumer group.

Additionally, if you specify a consumer group ID, and Kafka finds a valid existing consumer group with the same ID, Lambda ignores the StartingPosition parameter for your event source mapping. Instead, Lambda begins processing records according to the committed offset of the consumer group. If you specify a consumer group ID, and Kafka cannot find an existing consumer group, then Lambda configures your event source with the specified StartingPosition.

The consumer group ID that you specify must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
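The offset-resolution rules above can be sketched as follows. This is a hypothetical model of the documented behavior, not Lambda's actual implementation; the function name and return shape are illustrative only.

```python
# Hypothetical sketch of how Lambda resolves where to start reading when a
# consumer group ID is specified (illustrative, not the real implementation).

def resolve_start(committed_offset, starting_position):
    """Return where polling begins for a partition.

    committed_offset: the consumer group's committed offset, or None if
    Kafka has no existing consumer group with the specified ID.
    starting_position: "LATEST", "TRIM_HORIZON", or "AT_TIMESTAMP".
    """
    if committed_offset is not None:
        # A valid existing group was found: StartingPosition is ignored and
        # processing resumes from the committed offset.
        return ("COMMITTED_OFFSET", committed_offset)
    # No existing group: the event source uses the specified StartingPosition.
    return ("STARTING_POSITION", starting_position)
```

For example, `resolve_start(42, "LATEST")` resumes from offset 42 and ignores `LATEST`, while `resolve_start(None, "TRIM_HORIZON")` falls back to the configured starting position.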

Adding an Amazon MSK trigger (console)

Follow these steps to add your Amazon MSK cluster and a Kafka topic as a trigger for your Lambda function.

To add an Amazon MSK trigger to your Lambda function (console)
  1. Open the Functions page of the Lambda console.

  2. Choose the name of your Lambda function.

  3. Under Function overview, choose Add trigger.

  4. Under Trigger configuration, do the following:

    1. Choose the MSK trigger type.

    2. For MSK cluster, select your cluster.

    3. For Batch size, enter the maximum number of messages to receive in a single batch.

    4. For Batch window, enter the maximum number of seconds that Lambda spends gathering records before invoking the function.

    5. For Topic name, enter the name of a Kafka topic.

    6. (Optional) For Consumer group ID, enter the ID of a Kafka consumer group to join.

    7. (Optional) For Starting position, choose Latest to start reading the stream from the latest record, Trim horizon to start at the earliest available record, or At timestamp to specify a timestamp to start reading from.

    8. (Optional) For Authentication, choose the secret key for authenticating with the brokers in your MSK cluster.

    9. To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.

  5. To create the trigger, choose Add.

Adding an Amazon MSK trigger (Amazon CLI)

Use the following example Amazon CLI commands to create and view an Amazon MSK trigger for your Lambda function.

Creating a trigger using the Amazon CLI

Example — Create event source mapping for cluster that uses IAM authentication

The following example uses the create-event-source-mapping Amazon CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic. The topic's starting position is set to LATEST. When the cluster uses IAM role-based authentication, you don't need a SourceAccessConfiguration object. Example:

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws-cn:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
Example — Create event source mapping for cluster that uses SASL/SCRAM authentication

If the cluster uses SASL/SCRAM authentication, you must include a SourceAccessConfiguration object that specifies SASL_SCRAM_512_AUTH and a Secrets Manager secret ARN.

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws-cn:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Example — Create event source mapping for cluster that uses mTLS authentication

If the cluster uses mTLS authentication, you must include a SourceAccessConfiguration object that specifies CLIENT_CERTIFICATE_TLS_AUTH and a Secrets Manager secret ARN.

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws-cn:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

For more information, see the CreateEventSourceMapping API reference documentation.
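The same request can be built with an Amazon SDK. The sketch below expresses the IAM-auth CLI example above as a request for the AWS SDK for Python (boto3); it assumes boto3 is installed and credentials are configured, and the ARN and function name are placeholders from the example.

```python
# Sketch: the IAM-auth create-event-source-mapping call expressed as a boto3
# request. The ARN and function name are the placeholder values used above.
params = {
    "EventSourceArn": (
        "arn:aws-cn:kafka:us-east-1:111122223333:cluster/my-cluster/"
        "fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2"
    ),
    "FunctionName": "my-kafka-function",
    "Topics": ["AWSKafkaTopic"],
    "StartingPosition": "LATEST",
}

# With boto3 available and credentials configured, the call would be:
#   import boto3
#   response = boto3.client("lambda").create_event_source_mapping(**params)
#   print(response["UUID"])  # identifier used later with get-event-source-mapping
```

For SASL/SCRAM or mTLS clusters, add a `SourceAccessConfigurations` entry to `params`, mirroring the `--source-access-configurations` option in the CLI examples.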

Viewing the status using the Amazon CLI

The following example uses the get-event-source-mapping Amazon CLI command to describe the status of the event source mapping that you created.

aws lambda get-event-source-mapping \
  --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK configuration parameters

All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Amazon MSK.

Parameter | Required | Default | Notes
--------- | -------- | ------- | -----
AmazonManagedKafkaEventSourceConfig | N | Contains the ConsumerGroupId field, which defaults to a unique value | Can set only on Create
BatchSize | N | 100 | Maximum: 10,000
Enabled | N | Enabled | none
EventSourceArn | Y | N/A | Can set only on Create
FunctionName | Y | N/A | none
FilterCriteria | N | N/A | Control which events Lambda sends to your function
MaximumBatchingWindowInSeconds | N | 500 ms | Batching behavior
SourceAccessConfigurations | N | No credentials | SASL/SCRAM or CLIENT_CERTIFICATE_TLS_AUTH (mutual TLS) authentication credentials for your event source
StartingPosition | Y | N/A | AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create
StartingPositionTimestamp | N | N/A | Required if StartingPosition is set to AT_TIMESTAMP
Topics | Y | N/A | Kafka topic name. Can set only on Create

Creating cross-account event source mappings

You can use multi-VPC private connectivity to connect a Lambda function to a provisioned MSK cluster in a different Amazon Web Services account. Multi-VPC connectivity uses Amazon PrivateLink, which keeps all traffic within the Amazon network.

Note

You can't create cross-account event source mappings for serverless MSK clusters.

To create a cross-account event source mapping, you must first configure multi-VPC connectivity for the MSK cluster. When you create the event source mapping, use the managed VPC connection ARN instead of the cluster ARN, as shown in the following examples. The CreateEventSourceMapping operation also differs depending on which authentication type the MSK cluster uses.

Example — Create cross-account event source mapping for cluster that uses IAM authentication

When the cluster uses IAM role-based authentication, you don't need a SourceAccessConfiguration object. Example:

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
Example — Create cross-account event source mapping for cluster that uses SASL/SCRAM authentication

If the cluster uses SASL/SCRAM authentication, you must include a SourceAccessConfiguration object that specifies SASL_SCRAM_512_AUTH and a Secrets Manager secret ARN.

For cross-account Amazon MSK event source mappings with SASL/SCRAM authentication, the secret can be stored in either the cluster's account or the Lambda function's account.

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Example — Create cross-account event source mapping for cluster that uses mTLS authentication

If the cluster uses mTLS authentication, you must include a SourceAccessConfiguration object that specifies CLIENT_CERTIFICATE_TLS_AUTH and a Secrets Manager secret ARN. The secret can be stored in the cluster account or the Lambda function account.

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Using an Amazon MSK cluster as an event source

When you add your Apache Kafka or Amazon MSK cluster as a trigger for your Lambda function, the cluster is used as an event source.

Lambda reads event data from the Kafka topics that you specify as Topics in a CreateEventSourceMapping request, based on the StartingPosition that you specify. After successful processing, Lambda commits your Kafka topic's offsets to your Kafka cluster.

If you specify the StartingPosition as LATEST, Lambda starts reading from the latest message in each partition belonging to the topic. Because there can be some delay after trigger configuration before Lambda starts reading the messages, Lambda doesn't read any messages produced during this window.

Lambda reads messages sequentially for each Kafka topic partition. A single Lambda payload can contain messages from multiple partitions. When more records are available, Lambda continues processing records in batches, based on the BatchSize value that you specify in a CreateEventSourceMapping request, until your function catches up with the topic.

After Lambda processes each batch, it commits the offsets of the messages in that batch. If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
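A handler that consumes these batches can be sketched as follows. In the event payload that Lambda delivers for Kafka sources, records are grouped per topic partition under a "records" key, and record keys and values arrive base64-encoded. Raising an exception from the handler makes Lambda retry the whole batch, as described above.

```python
import base64

# Minimal handler sketch for the Kafka event shape Lambda delivers: records
# are grouped per topic partition, and values arrive base64-encoded.
def handler(event, context):
    decoded = []
    for records in event["records"].values():
        for record in records:
            # Decode the base64-encoded message value into text.
            value = base64.b64decode(record["value"]).decode("utf-8")
            decoded.append(
                (record["topic"], record["partition"], record["offset"], value)
            )
    return decoded
```

Any per-record failure should be raised (to retry the batch) or captured for an on-failure destination, depending on your error-handling strategy.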

Note

While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.

Polling and stream starting positions

Be aware that stream polling during event source mapping creation and updates is eventually consistent.

  • During event source mapping creation, it may take several minutes to start polling events from the stream.

  • During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.

This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON or AT_TIMESTAMP.

Amazon CloudWatch metrics

Lambda emits the OffsetLag metric while your function processes records. The value of this metric is the difference in offset between the last record written to the Kafka event source topic and the last record that your function's consumer group processed. You can use OffsetLag to estimate the latency between when a record is added and when your consumer group processes it.

An increasing trend in OffsetLag can indicate issues with pollers in your function's consumer group. For more information, see View metrics for Lambda functions.
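The metric's meaning can be illustrated with simple arithmetic: per partition, the lag is the newest offset written to the topic minus the last offset the function's consumer group has processed. This is an illustrative sketch of the interpretation above, not the metric's exact emission logic.

```python
# Illustrative only: per-partition offset lag as the difference between the
# last offset written to the topic and the last offset processed by the
# function's consumer group.
def offset_lag(last_written, last_processed):
    return {p: last_written[p] - last_processed[p] for p in last_written}
```

For example, if partition 0 has offset 120 written but only 100 processed, its lag is 20; a partition that is fully caught up has a lag of 0.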

Auto scaling of the Amazon MSK event source

When you initially create an Amazon MSK event source, Lambda allocates one consumer to process all partitions in the Kafka topic. Each consumer has multiple processors running in parallel to handle increased workloads. Additionally, Lambda automatically scales up or down the number of consumers, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.

In one-minute intervals, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes consumers from the topic. The scaling process of adding or removing consumers occurs within three minutes of evaluation.

If your target Lambda function is throttled, Lambda reduces the number of consumers. This action reduces the workload on the function by reducing the number of messages that consumers can retrieve and send to the function.
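The scaling bounds described above can be modeled roughly as follows. This is a hypothetical sketch of the documented constraints (one consumer per partition at most, scale up on high lag, scale down on throttling), not Lambda's actual scaling algorithm.

```python
# Hypothetical model of the documented scaling bounds for an MSK event source.
def next_consumer_count(current, partitions, lag_high, throttled):
    if throttled:
        # Throttled target function: back off to reduce pressure.
        return max(1, current - 1)
    if lag_high:
        # High offset lag: add a consumer, but never exceed one per partition,
        # which preserves per-partition message ordering.
        return min(partitions, current + 1)
    return current
```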

To monitor the throughput of your Kafka topic, view the OffsetLag metric that Lambda emits while your function processes records.

To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function.