Processing self-managed Apache Kafka messages with Lambda

Note

If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.

Adding a Kafka cluster as an event source

To create an event source mapping, add your Kafka cluster as a Lambda function trigger using the Lambda console, an Amazon SDK, or the Amazon Command Line Interface (Amazon CLI).

This section describes how to create an event source mapping using the Lambda console and the Amazon CLI.

Prerequisites

  • A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.1.0 and later.

  • An execution role with permission to access the Amazon resources that your self-managed Kafka cluster uses.
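
    The exact permissions that the execution role needs depend on your authentication and networking setup. As a minimal sketch, assuming a cluster that requires Secrets Manager credentials and runs inside a VPC, an inline policy might look like the following (the role name, policy name, and secret ARN are placeholders):

    aws iam put-role-policy \
      --role-name my-kafka-function-role \
      --policy-name KafkaEventSourceAccess \
      --policy-document '{
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "arn:aws-cn:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName"
          },
          {
            "Effect": "Allow",
            "Action": [
              "ec2:CreateNetworkInterface",
              "ec2:DeleteNetworkInterface",
              "ec2:DescribeNetworkInterfaces",
              "ec2:DescribeSecurityGroups",
              "ec2:DescribeSubnets",
              "ec2:DescribeVpcs"
            ],
            "Resource": "*"
          }
        ]
      }'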

Customizable consumer group ID

When setting up Kafka as an event source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Kafka consumer group that you want your Lambda function to join. You can use this feature to seamlessly migrate any ongoing Kafka record processing setups from other consumers to Lambda.

If you specify a consumer group ID and there are other active pollers within that consumer group, Kafka distributes messages across all consumers. In other words, Lambda doesn't receive all messages for the Kafka topic. If you want Lambda to handle all messages in the topic, turn off any other pollers in that consumer group.

Additionally, if you specify a consumer group ID, and Kafka finds a valid existing consumer group with the same ID, Lambda ignores the StartingPosition parameter for your event source mapping. Instead, Lambda begins processing records according to the committed offset of the consumer group. If you specify a consumer group ID, and Kafka cannot find an existing consumer group, then Lambda configures your event source with the specified StartingPosition.

The consumer group ID that you specify must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
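
For example, the following create-event-source-mapping sketch joins an existing consumer group; the group ID, topic, and broker endpoint are placeholders. Because Kafka finds an existing group with this ID, Lambda resumes from the group's committed offsets and ignores the TRIM_HORIZON starting position:

aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --starting-position TRIM_HORIZON \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092"]}}' \
  --self-managed-kafka-event-source-config '{"ConsumerGroupId":"my-existing-consumer-group"}'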

Adding a self-managed Kafka cluster (console)

Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.

To add an Apache Kafka trigger to your Lambda function (console)
  1. Open the Functions page of the Lambda console.

  2. Choose the name of your Lambda function.

  3. Under Function overview, choose Add trigger.

  4. Under Trigger configuration, do the following:

    1. Choose the Apache Kafka trigger type.

    2. For Bootstrap servers, enter the host and port pair of a Kafka broker in your cluster, and then choose Add. Repeat for each Kafka broker in the cluster.

    3. For Topic name, enter the name of the Kafka topic used to store records in the cluster.

    4. (Optional) For Batch size, enter the maximum number of records to receive in a single batch.

    5. For Batch window, enter the maximum amount of time in seconds that Lambda spends gathering records before invoking the function.

    6. (Optional) For Consumer group ID, enter the ID of a Kafka consumer group to join.

    7. (Optional) For Starting position, choose Latest to start reading the stream from the latest record, Trim horizon to start at the earliest available record, or At timestamp to specify a timestamp to start reading from.

    8. (Optional) For VPC, choose the Amazon VPC for your Kafka cluster. Then, choose the VPC subnets and VPC security groups.

      This setting is required if only users within your VPC access your brokers.

    9. (Optional) For Authentication, choose Add, and then do the following:

      1. Choose the access or authentication protocol of the Kafka brokers in your cluster.

        • If your Kafka broker uses SASL/PLAIN authentication, choose BASIC_AUTH.

        • If your broker uses SASL/SCRAM authentication, choose one of the SASL_SCRAM protocols.

        • If you're configuring mTLS authentication, choose the CLIENT_CERTIFICATE_TLS_AUTH protocol.

      2. For SASL/SCRAM or mTLS authentication, choose the Secrets Manager secret key that contains the credentials for your Kafka cluster.

    10. (Optional) For Encryption, if your Kafka brokers use certificates signed by a private CA, choose the Secrets Manager secret containing the root CA certificate that your brokers use for TLS encryption.

      This setting applies to TLS encryption for SASL/SCRAM or SASL/PLAIN, and to mTLS authentication.

    11. To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.

  5. To create the trigger, choose Add.

Adding a self-managed Kafka cluster (Amazon CLI)

Use the following example Amazon CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.

Using SASL/SCRAM

If Kafka users access your Kafka brokers over the internet, specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the create-event-source-mapping Amazon CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configurations Type=SASL_SCRAM_512_AUTH,URI=arn:aws-cn:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws-cn:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
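
For SASL/SCRAM, the secret referenced by the URI stores the broker credentials as a JSON object with username and password keys. A minimal sketch of creating such a secret (the secret name and credential values are placeholders):

aws secretsmanager create-secret \
  --name MyBrokerSecretName \
  --secret-string '{"username":"your-kafka-username","password":"your-kafka-password"}'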

Using a VPC

If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the create-event-source-mapping Amazon CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configurations '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws-cn:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Viewing the status using the Amazon CLI

The following example uses the get-event-source-mapping Amazon CLI command to describe the status of the event source mapping that you created.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7
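
The response reports the mapping's configuration and its current State. An abbreviated, illustrative response (the field values shown are placeholders) might resemble:

{
    "UUID": "dh38738e-992b-343a-1077-3478934hjkfd7",
    "State": "Enabled",
    "BatchSize": 100,
    "FunctionArn": "arn:aws-cn:lambda:us-east-1:111122223333:function:my-kafka-function",
    "LastProcessingResult": "OK"
}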

Self-managed Apache Kafka configuration parameters

All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Apache Kafka.

Event source parameters that apply to self-managed Apache Kafka
Parameter                         | Required | Default        | Notes
BatchSize                         | N        | 100            | Maximum: 10,000
Enabled                           | N        | Enabled        |
FunctionName                      | Y        |                |
FilterCriteria                    | N        |                | Lambda event filtering
MaximumBatchingWindowInSeconds    | N        | 500 ms         | Batching behavior
SelfManagedEventSource            | Y        |                | List of Kafka brokers. Can set only on Create.
SelfManagedKafkaEventSourceConfig | N        |                | Contains the ConsumerGroupId field, which defaults to a unique value. Can set only on Create.
SourceAccessConfigurations        | N        | No credentials | VPC information or authentication credentials for the cluster. For SASL_PLAIN, set to BASIC_AUTH.
StartingPosition                  | Y        |                | AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create.
StartingPositionTimestamp         | N        |                | Required if StartingPosition is set to AT_TIMESTAMP.
Topics                            | Y        |                | Topic name. Can set only on Create.
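
Parameters that are not marked "Can set only on Create" can be changed later with the update-event-source-mapping Amazon CLI command. As a sketch, the following adjusts the batching behavior of an existing mapping (the UUID is a placeholder):

aws lambda update-event-source-mapping \
  --uuid dh38738e-992b-343a-1077-3478934hjkfd7 \
  --batch-size 500 \
  --maximum-batching-window-in-seconds 5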

Using a Kafka cluster as an event source

When you add your Apache Kafka cluster as a trigger for your Lambda function, the cluster is used as an event source.

Lambda reads event data from the Kafka topics that you specify as Topics in a CreateEventSourceMapping request, based on the StartingPosition that you specify. After successful processing, Lambda commits the offsets of the processed records to your Kafka cluster.

If you specify the StartingPosition as LATEST, Lambda starts reading from the latest message in each partition belonging to the topic. Because there can be some delay after trigger configuration before Lambda starts reading the messages, Lambda doesn't read any messages produced during this window.

Lambda processes records from one or more Kafka topic partitions that you specify and sends a JSON payload to your function. When more records are available, Lambda continues processing records in batches, based on the BatchSize value that you specify in a CreateEventSourceMapping request, until your function catches up with the topic.
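
The payload resembles the following abbreviated sketch, in which record keys and values are base64-encoded and records are grouped under topic-partition keys (the topic name, endpoints, and record contents here are illustrative):

{
    "eventSource": "SelfManagedKafka",
    "bootstrapServers": "abc3.xyz.com:9092,abc2.xyz.com:9092",
    "records": {
        "AWSKafkaTopic-0": [
            {
                "topic": "AWSKafkaTopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==",
                "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="
            }
        ]
    }
}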

If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
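
As a sketch, you can attach an on-failure destination to an existing mapping through the DestinationConfig parameter; the UUID and the SQS queue ARN below are placeholders:

aws lambda update-event-source-mapping \
  --uuid dh38738e-992b-343a-1077-3478934hjkfd7 \
  --destination-config '{"OnFailure":{"Destination":"arn:aws-cn:sqs:us-east-1:111122223333:my-kafka-dlq"}}'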

Note

While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.

Polling and stream starting positions

Be aware that stream polling during event source mapping creation and updates is eventually consistent.

  • During event source mapping creation, it may take several minutes to start polling events from the stream.

  • During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.

This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON or AT_TIMESTAMP.
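
For example, the following sketch creates a mapping that starts from a specific point in time rather than LATEST; the Unix timestamp, topic, and broker endpoint are placeholders:

aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --starting-position AT_TIMESTAMP \
  --starting-position-timestamp 1704067200 \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092"]}}'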

Auto scaling of the Kafka event source

When you initially create an Apache Kafka event source, Lambda allocates one consumer to process all partitions in the Kafka topic. Each consumer has multiple processors running in parallel to handle increased workloads. Additionally, Lambda automatically scales up or down the number of consumers, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.

In one-minute intervals, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes consumers from the topic. The scaling process of adding or removing consumers occurs within three minutes of evaluation.

If your target Lambda function is overloaded, Lambda reduces the number of consumers. This action reduces the workload on the function by reducing the number of messages that consumers can retrieve and send to the function.

To monitor the throughput of your Kafka topic, you can view the Apache Kafka consumer metrics, such as consumer_lag and consumer_offset. To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function.

Amazon CloudWatch metrics

Lambda emits the OffsetLag metric while your function processes records. The value of this metric is the difference in offset between the last record written to the Kafka event source topic and the last record that your function's consumer group processed. You can use OffsetLag to estimate the latency between when a record is added and when your consumer group processes it.

An increasing trend in OffsetLag can indicate issues with pollers in your function's consumer group. For more information, see Working with Lambda function metrics.
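
As a sketch, you can retrieve the recent maximum OffsetLag for a function with the CloudWatch CLI; the function name and time range below are placeholders:

aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name OffsetLag \
  --dimensions Name=FunctionName,Value=my-kafka-function \
  --start-time 2025-01-01T00:00:00Z \
  --end-time 2025-01-01T01:00:00Z \
  --period 60 \
  --statistics Maximum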