
Processing self-managed Apache Kafka messages with Lambda

Note

If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.

Adding a Kafka cluster as an event source

To create an event source mapping, add your Kafka cluster as a Lambda function trigger using the Lambda console, an Amazon SDK, or the Amazon Command Line Interface (Amazon CLI).

This section describes how to create an event source mapping using the Lambda console and the Amazon CLI.

Prerequisites

  • A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.1.0 and later.

  • An execution role with permission to access the Amazon resources that your self-managed Kafka cluster uses.

Customizable consumer group ID

When setting up Kafka as an event source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Kafka consumer group that you want your Lambda function to join. You can use this feature to seamlessly migrate any ongoing Kafka record processing setups from other consumers to Lambda.

If you specify a consumer group ID and there are other active pollers within that consumer group, Kafka distributes messages across all consumers. In other words, Lambda doesn't receive all messages for the Kafka topic. If you want Lambda to handle all messages in the topic, turn off any other pollers in that consumer group.

Additionally, if you specify a consumer group ID, and Kafka finds a valid existing consumer group with the same ID, Lambda ignores the StartingPosition parameter for your event source mapping. Instead, Lambda begins processing records according to the committed offset of the consumer group. If you specify a consumer group ID, and Kafka cannot find an existing consumer group, then Lambda configures your event source with the specified StartingPosition.

The consumer group ID that you specify must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
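For example, the following is a minimal sketch of creating an event source mapping that joins an existing consumer group. The consumer group ID, function name, and broker addresses are placeholder assumptions, and authentication and VPC options are omitted for brevity:

aws lambda create-event-source-mapping \
    --function-name my-kafka-function \
    --topics AWSKafkaTopic \
    --starting-position LATEST \
    --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092","abc2.xyz.com:9092"]}}' \
    --self-managed-kafka-event-source-config '{"ConsumerGroupId":"my-kafka-consumer-group"}'

If my-kafka-consumer-group already exists on the cluster, Lambda ignores --starting-position and resumes from the group's committed offset.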

Adding a self-managed Kafka cluster (console)

Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.

To add an Apache Kafka trigger to your Lambda function (console)
  1. Open the Functions page of the Lambda console.

  2. Choose the name of your Lambda function.

  3. Under Function overview, choose Add trigger.

  4. Under Trigger configuration, do the following:

    1. Choose the Apache Kafka trigger type.

    2. For Bootstrap servers, enter the host and port pair of a Kafka broker in your cluster, and then choose Add. Repeat for each Kafka broker in the cluster.

    3. For Topic name, enter the name of the Kafka topic used to store records in the cluster.

    4. (Optional) For Batch size, enter the maximum number of records to receive in a single batch.

    5. For Batch window, enter the maximum number of seconds that Lambda spends gathering records before invoking the function.

    6. (Optional) For Consumer group ID, enter the ID of a Kafka consumer group to join.

    7. (Optional) For Starting position, choose Latest to start reading the stream from the latest record, Trim horizon to start at the earliest available record, or At timestamp to specify a timestamp to start reading from.

    8. (Optional) For VPC, choose the Amazon VPC for your Kafka cluster. Then, choose the VPC subnets and VPC security groups.

      This setting is required if only users within your VPC access your brokers.

    9. (Optional) For Authentication, choose Add, and then do the following:

      1. Choose the access or authentication protocol of the Kafka brokers in your cluster.

        • If your Kafka broker uses SASL/PLAIN authentication, choose BASIC_AUTH.

        • If your broker uses SASL/SCRAM authentication, choose one of the SASL_SCRAM protocols.

        • If you're configuring mTLS authentication, choose the CLIENT_CERTIFICATE_TLS_AUTH protocol.

      2. For SASL/SCRAM or mTLS authentication, choose the Secrets Manager secret key that contains the credentials for your Kafka cluster. An example of the secret format follows this procedure.

    10. (Optional) For Encryption, if your Kafka brokers use certificates signed by a private CA, choose the Secrets Manager secret containing the root CA certificate that the brokers use for TLS encryption.

      This setting applies to TLS encryption for SASL/SCRAM or SASL/PLAIN, and to mTLS authentication.

    11. To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.

  5. To create the trigger, choose Add.
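
For SASL/SCRAM authentication, the Secrets Manager secret referenced in the Authentication step stores the broker credentials as a JSON object with username and password keys. The following is a minimal sketch of creating such a secret; the secret name and credential values are placeholders:

aws secretsmanager create-secret \
    --name MyBrokerSecretName \
    --secret-string '{"username":"kafka-user","password":"kafka-password"}'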

Adding a self-managed Kafka cluster (Amazon CLI)

Use the following example Amazon CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.

Using SASL/SCRAM

If Kafka users access your Kafka brokers over the internet, specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the create-event-source-mapping Amazon CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
    --topics AWSKafkaTopic \
    --starting-position TRIM_HORIZON \
    --source-access-configurations Type=SASL_SCRAM_512_AUTH,URI=arn:aws-cn:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
    --function-name arn:aws-cn:lambda:us-east-1:111122223333:function:my-kafka-function \
    --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Using a VPC

If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the create-event-source-mapping Amazon CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
    --topics AWSKafkaTopic \
    --starting-position TRIM_HORIZON \
    --source-access-configurations '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
    --function-name arn:aws-cn:lambda:us-east-1:111122223333:function:my-kafka-function \
    --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Viewing the status using the Amazon CLI

The following example uses the get-event-source-mapping Amazon CLI command to describe the status of the event source mapping that you created.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7
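
The response shows the configuration and current state of the mapping. The following abbreviated output is illustrative only; the fields and values vary by configuration:

{
    "UUID": "dh38738e-992b-343a-1077-3478934hjkfd7",
    "StartingPosition": "TRIM_HORIZON",
    "BatchSize": 100,
    "FunctionArn": "arn:aws-cn:lambda:us-east-1:111122223333:function:my-kafka-function",
    "Topics": [
        "AWSKafkaTopic"
    ],
    "State": "Enabled",
    "StateTransitionReason": "USER_INITIATED"
}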

Self-managed Apache Kafka configuration parameters

All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Apache Kafka.

Parameter | Required | Default | Notes
--------- | -------- | ------- | -----
BatchSize | N | 100 | Maximum: 10,000
DestinationConfig | N | N/A | Capturing discarded batches for a self-managed Apache Kafka event source
Enabled | N | True |
FilterCriteria | N | N/A | Control which events Lambda sends to your function
FunctionName | Y | N/A |
KMSKeyArn | N | N/A | Encryption of filter criteria
MaximumBatchingWindowInSeconds | N | 500 ms | Batching behavior
ProvisionedPollerConfig | N | MinimumPollers: 1 if not specified. MaximumPollers: 200 if not specified | Configuring provisioned mode
SelfManagedEventSource | Y | N/A | List of Kafka brokers. Can set only on Create
SelfManagedKafkaEventSourceConfig | N | Contains the ConsumerGroupId field, which defaults to a unique value | Can set only on Create
SourceAccessConfigurations | N | No credentials | VPC information or authentication credentials for the cluster. For SASL_PLAIN, set to BASIC_AUTH
StartingPosition | Y | N/A | AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create
StartingPositionTimestamp | N | N/A | Required if StartingPosition is set to AT_TIMESTAMP
Tags | N | N/A | Using tags on event source mappings
Topics | Y | N/A | Topic name. Can set only on Create
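
For example, the following sketch creates a mapping that starts reading from a specific point in time by combining StartingPosition and StartingPositionTimestamp. The timestamp (a Unix epoch value) and endpoints are placeholders, and authentication options are omitted for brevity:

aws lambda create-event-source-mapping \
    --function-name my-kafka-function \
    --topics AWSKafkaTopic \
    --starting-position AT_TIMESTAMP \
    --starting-position-timestamp 1735689600 \
    --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092","abc2.xyz.com:9092"]}}'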

Using a Kafka cluster as an event source

When you add your Apache Kafka or Amazon MSK cluster as a trigger for your Lambda function, the cluster is used as an event source.

Lambda reads event data from the Kafka topics that you specify as Topics in a CreateEventSourceMapping request, based on the StartingPosition that you specify. After successful processing, Lambda commits the offsets for your Kafka topic to your Kafka cluster.

If you specify the StartingPosition as LATEST, Lambda starts reading from the latest message in each partition belonging to the topic. Because there can be some delay after trigger configuration before Lambda starts reading the messages, Lambda doesn't read any messages produced during this window.

Lambda processes records from one or more Kafka topic partitions that you specify and sends a JSON payload to your function. A single Lambda payload can contain messages from multiple partitions. When more records are available, Lambda continues processing records in batches, based on the BatchSize value that you specify in a CreateEventSourceMapping request, until your function catches up with the topic.

If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
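
For example, the following sketch configures an on-failure destination on an existing event source mapping using DestinationConfig. The mapping UUID and the Amazon SQS queue ARN are hypothetical placeholders:

aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --destination-config '{"OnFailure":{"Destination":"arn:aws-cn:sqs:us-east-1:111122223333:my-failure-queue"}}'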

Note

While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.

Polling and stream starting positions

Be aware that stream polling during event source mapping creation and updates is eventually consistent.

  • During event source mapping creation, it may take several minutes to start polling events from the stream.

  • During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.

This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON or AT_TIMESTAMP.

Message throughput scaling behavior for self-managed Apache Kafka event source mappings

You can choose between two modes of message throughput scaling behavior for your self-managed Apache Kafka event source mapping:

Default (on-demand) mode

When you initially create a self-managed Apache Kafka event source, Lambda allocates a default number of event pollers to process all partitions in the Kafka topic. Lambda automatically scales the number of event pollers up or down based on message load.

In one-minute intervals, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the offset lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes event pollers from the topic. This autoscaling process of adding or removing event pollers occurs within three minutes of evaluation.

If your target Lambda function is throttled, Lambda reduces the number of event pollers. This action reduces the workload on the function by reducing the number of messages that event pollers can retrieve and send to the function.

To monitor the throughput of your Kafka topic, you can view the Apache Kafka consumer metrics, such as consumer_lag and consumer_offset.
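
For example, you can inspect those consumer metrics with the consumer groups tool that ships with Apache Kafka. This is a sketch; the broker address and consumer group ID are placeholders:

bin/kafka-consumer-groups.sh \
    --bootstrap-server abc3.xyz.com:9092 \
    --describe \
    --group my-kafka-consumer-group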

Configuring provisioned mode

For workloads where you need to fine-tune the throughput of your event source mapping, you can use provisioned mode. In provisioned mode, you define minimum and maximum limits for the number of provisioned event pollers. These provisioned event pollers are dedicated to your event source mapping, and can handle unexpected message spikes instantly when they occur. We recommend that you use provisioned mode for Kafka workloads that have strict performance requirements.

In Lambda, an event poller is a compute unit capable of handling up to 5 MBps of throughput. For reference, suppose your event source produces an average payload of 1 MB, and the average function duration is 1 second. If the payload doesn't undergo any transformation (such as filtering), a single poller can support 5 MBps of throughput and 5 concurrent Lambda invocations. Using provisioned mode incurs additional costs. For pricing estimates, see Amazon Lambda pricing.

In provisioned mode, the range of accepted values for the minimum number of event pollers (MinimumPollers) is between 1 and 200, inclusive. The range of accepted values for the maximum number of event pollers (MaximumPollers) is between 1 and 2,000, inclusive. MaximumPollers must be greater than or equal to MinimumPollers. In addition, to maintain ordered processing within partitions, Lambda caps the MaximumPollers to the number of partitions in the topic.

For more details about choosing appropriate values for minimum and maximum event pollers, see Best practices and considerations when using provisioned mode.

You can configure provisioned mode for your self-managed Apache Kafka event source mapping using the console or the Lambda API.

To configure provisioned mode for an existing self-managed Apache Kafka event source mapping (console)
  1. Open the Functions page of the Lambda console.

  2. Choose the function with the self-managed Apache Kafka event source mapping you want to configure provisioned mode for.

  3. Choose Configuration, then choose Triggers.

  4. Choose the self-managed Apache Kafka event source mapping that you want to configure provisioned mode for, then choose Edit.

  5. Under Event source mapping configuration, choose Configure provisioned mode.

    • For Minimum event pollers, enter a value between 1 and 200. If you don't specify a value, Lambda chooses a default value of 1.

    • For Maximum event pollers, enter a value between 1 and 2,000. This value must be greater than or equal to your value for Minimum event pollers. If you don't specify a value, Lambda chooses a default value of 200.

  6. Choose Save.

You can configure provisioned mode programmatically using the ProvisionedPollerConfig object in your EventSourceMappingConfiguration. For example, the following UpdateEventSourceMapping CLI command configures a MinimumPollers value of 5, and a MaximumPollers value of 100.

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'

After configuring provisioned mode, you can observe the usage of event pollers for your workload by monitoring the ProvisionedPollers metric. For more information, see Event source mapping metrics.

To disable provisioned mode and return to default (on-demand) mode, you can use the following UpdateEventSourceMapping CLI command:

aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --provisioned-poller-config '{}'

Best practices and considerations when using provisioned mode

The optimal configuration of minimum and maximum event pollers for your event source mapping depends on your application's performance requirements. We recommend that you start with the default minimum event pollers to baseline the performance profile. Adjust your configuration based on observed message processing patterns and your desired performance profile.

For workloads with spiky traffic and strict performance needs, increase the minimum event pollers to handle sudden surges in messages. To determine the minimum event pollers required, consider your workload's messages per second and average payload size, and use the throughput capacity of a single event poller (up to 5 MBps) as a reference.
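
As an illustrative calculation (the workload numbers are assumptions, not measurements): a topic that receives 2,000 messages per second with an average payload of 10 KB carries roughly 20 MBps of throughput. At up to 5 MBps per event poller, that workload needs at least 20 / 5 = 4 event pollers, so a minimum event poller setting of 4 or higher would be a reasonable starting point.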

To maintain ordered processing within a partition, Lambda limits the maximum event pollers to the number of partitions in the topic. Additionally, the maximum event pollers your event source mapping can scale to depends on the function's concurrency settings.

When activating provisioned mode, update your network settings to remove Amazon PrivateLink VPC endpoints and associated permissions.

Amazon CloudWatch metrics

Lambda emits the OffsetLag metric while your function processes records. The value of this metric is the difference in offset between the last record written to the Kafka event source topic and the last record that your function's consumer group processed. You can use OffsetLag to estimate the latency between when a record is added and when your consumer group processes it.
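
For example, the following sketch retrieves the maximum OffsetLag for a function over a one-hour window using the Amazon CLI; the function name and time range are placeholders:

aws cloudwatch get-metric-statistics \
    --namespace AWS/Lambda \
    --metric-name OffsetLag \
    --dimensions Name=FunctionName,Value=my-kafka-function \
    --start-time 2025-01-01T00:00:00Z \
    --end-time 2025-01-01T01:00:00Z \
    --period 300 \
    --statistics Maximum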

An increasing trend in OffsetLag can indicate issues with pollers in your function's consumer group. For more information, see Using CloudWatch metrics with Lambda.