MSK - Amazon Serverless Application Model

MSK

The object describing an MSK event source type. For more information, see Using Amazon Lambda with Amazon MSK in the Amazon Lambda Developer Guide.

Amazon Serverless Application Model (Amazon SAM) generates an AWS::Lambda::EventSourceMapping resource when this event type is set.

To use Schema Registry, you need to define specific IAM role permissions for your function. See Complete setup with IAM roles for an example of the required configuration.

Syntax

To declare this entity in your Amazon SAM template, use the following syntax.
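A sketch of the syntax, assembled from the property names and types listed under Properties below. Treat it as a summary of that list, not an authoritative listing.

```yaml
ConsumerGroupId: String
DestinationConfig: DestinationConfig
FilterCriteria: FilterCriteria
KmsKeyArn: String
MaximumBatchingWindowInSeconds: Integer
ProvisionedPollerConfig: ProvisionedPollerConfig
SchemaRegistryConfig: SchemaRegistryConfig
SourceAccessConfigurations: List
StartingPosition: String
StartingPositionTimestamp: Double
Stream: String
Topics: List
```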

Properties

ConsumerGroupId

A string that identifies the Kafka consumer group that Lambda joins, which configures how events are read from Kafka topics.

Type: String

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the AmazonManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
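As a minimal sketch, an event that sets a consumer group might look like the following. The group name my-consumer-group is a hypothetical value chosen for illustration; the cluster ARN and topic are reused from the examples later on this page.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
      ConsumerGroupId: my-consumer-group  # hypothetical consumer group name
```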

DestinationConfig

A configuration object that specifies the destination of an event after Lambda processes it.

Use this property to specify the destination of failed invocations from the Amazon MSK event source.

Type: DestinationConfig

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the DestinationConfig property of an AWS::Lambda::EventSourceMapping resource.
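A minimal sketch of a failure destination, assuming the OnFailure and Destination keys of the underlying AWS::Lambda::EventSourceMapping DestinationConfig property. The queue ARN is a hypothetical placeholder, and the function's execution role would also need permission to write to that destination.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
      DestinationConfig:
        OnFailure:
          # Hypothetical SQS queue that receives details of failed invocations
          Destination: arn:aws:sqs:us-east-1:012345678012:MyFailureQueue
```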

FilterCriteria

An object that defines the criteria that determine whether Lambda should process an event. For more information, see Amazon Lambda event filtering in the Amazon Lambda Developer Guide.

Type: FilterCriteria

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the FilterCriteria property of an AWS::Lambda::EventSourceMapping resource.
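A minimal sketch of a filter, assuming the Filters and Pattern keys of the underlying FilterCriteria property. The pattern assumes JSON record values with a status field, which is a hypothetical record shape used only for illustration.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
      FilterCriteria:
        Filters:
          # Process only records whose JSON value has "status": "active" (hypothetical field)
          - Pattern: '{"value": {"status": ["active"]}}'
```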

KmsKeyArn

The Amazon Resource Name (ARN) of the key to encrypt information related to this event.

Type: String

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the KmsKeyArn property of an AWS::Lambda::EventSourceMapping resource.

MaximumBatchingWindowInSeconds

The maximum amount of time to gather records before invoking the function, in seconds.

Type: Integer

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the MaximumBatchingWindowInSeconds property of an AWS::Lambda::EventSourceMapping resource.
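As a brief sketch, the following fragment asks Lambda to gather records for up to 5 seconds before invoking the function. The value 5 is an arbitrary choice for illustration.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
      MaximumBatchingWindowInSeconds: 5  # illustrative value, in seconds
```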

ProvisionedPollerConfig

Configuration to increase the number of pollers used to compute event source mappings. This configuration allows for a minimum of 1 poller and a maximum of 20 pollers. For an example, refer to ProvisionedPollerConfig example.

Type: ProvisionedPollerConfig

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the ProvisionedPollerConfig property of an AWS::Lambda::EventSourceMapping resource.

SchemaRegistryConfig

Configuration for using a schema registry with the Kafka event source.

Note

This feature requires ProvisionedPollerConfig to be configured.

Type: SchemaRegistryConfig

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the AmazonManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.

SourceAccessConfigurations

An array of the authentication protocol, VPC components, or virtual host to secure and define your event source.

Valid values: CLIENT_CERTIFICATE_TLS_AUTH

Type: List of SourceAccessConfiguration

Required: No

Amazon CloudFormation compatibility: This property is part of the AmazonManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
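A minimal sketch of an entry that uses the CLIENT_CERTIFICATE_TLS_AUTH value listed above, following the Type and URI shape used in the examples later on this page. The secret ARN is a hypothetical placeholder for a Secrets Manager secret that holds the client certificate.

```yaml
SourceAccessConfigurations:
  - Type: CLIENT_CERTIFICATE_TLS_AUTH
    # Hypothetical secret containing the client certificate and private key
    URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-client-cert-secret
```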

StartingPosition

The position in a stream from which to start reading.

  • AT_TIMESTAMP – Specify a time from which to start reading records.

  • LATEST – Read only new records.

  • TRIM_HORIZON – Process all available records.

Valid values: AT_TIMESTAMP | LATEST | TRIM_HORIZON

Type: String

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the StartingPosition property of an AWS::Lambda::EventSourceMapping resource.

StartingPositionTimestamp

The time from which to start reading, in Unix time seconds. Define StartingPositionTimestamp when StartingPosition is specified as AT_TIMESTAMP.

Type: Double

Required: No

Amazon CloudFormation compatibility: This property is passed directly to the StartingPositionTimestamp property of an AWS::Lambda::EventSourceMapping resource.
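The two starting-position properties work together. As a sketch, the following fragment starts reading from a fixed point in time; the timestamp is an arbitrary Unix time in seconds chosen for illustration.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: AT_TIMESTAMP
      StartingPositionTimestamp: 1700000000  # arbitrary Unix time, in seconds
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
```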

Stream

The Amazon Resource Name (ARN) of the Amazon MSK cluster.

Type: String

Required: Yes

Amazon CloudFormation compatibility: This property is passed directly to the EventSourceArn property of an AWS::Lambda::EventSourceMapping resource.

Topics

The name of the Kafka topic.

Type: List

Required: Yes

Amazon CloudFormation compatibility: This property is passed directly to the Topics property of an AWS::Lambda::EventSourceMapping resource.

Examples

Complete setup with IAM roles

The following example shows a complete setup including the required IAM role configuration for using Schema Registry:

Parameters:
  PreCreatedSubnetOne:
    Type: String
  PreCreatedSubnetTwo:
    Type: String
  MskClusterName4:
    Type: String
Resources:
  MyLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: [sts:AssumeRole]
            Effect: Allow
            Principal:
              Service: [lambda.amazonaws.com]
      Policies:
        - PolicyName: KafkaClusterPermissions
          PolicyDocument:
            Statement:
              - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers]
                Effect: Allow
                Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*'
        - PolicyName: KafkaAuthPolicy
          PolicyDocument:
            Statement:
              - Action: [secretsmanager:GetSecretValue, kms:Decrypt]
                Effect: "Allow"
                Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId']
        - PolicyName: ENIPolicy
          PolicyDocument:
            Statement:
              - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups]
                Effect: Allow
                Resource: '*'
        - PolicyName: SchemaRegistryPolicy
          PolicyDocument:
            Statement:
              - Action: [glue:GetRegistry]
                Effect: Allow
                Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}'
        - PolicyName: SchemaVersionsPolicy
          PolicyDocument:
            Statement:
              - Action: [glue:GetSchemaVersions]
                Effect: Allow
                Resource: '*'
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Tags:
        - {Value: SAM, Key: lambda:createdBy}
  MyMskCluster:
    Type: AWS::MSK::Cluster
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - Ref: PreCreatedSubnetOne
          - Ref: PreCreatedSubnetTwo
        InstanceType: kafka.t3.small
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 1
      ClusterName:
        Ref: MskClusterName4
      KafkaVersion: 3.8.x
      NumberOfBrokerNodes: 2
  MyMskStreamProcessor:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: nodejs18.x
      Handler: index.handler
      CodeUri: ${codeuri}
      Role:
        Fn::GetAtt: [MyLambdaExecutionRole, Arn]
      Events:
        MyMskEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream:
              Ref: MyMskCluster
            SourceAccessConfigurations:
              - Type: SASL_SCRAM_512_AUTH
                URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
            Topics:
              - SchemaRegistryTestTopic
            ProvisionedPollerConfig:
              MinimumPollers: 1
            SchemaRegistryConfig:
              AccessConfigs:
                - Type: BASIC_AUTH
                  URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
              SchemaValidationConfigs:
                - Attribute: KEY
              EventRecordFormat: JSON
              SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig example

ProvisionedPollerConfig:
  MinimumPollers: 1
  MaximumPollers: 20

Amazon MSK Example for Existing Cluster

The following is an example of an MSK event source type for an Amazon MSK cluster that already exists in an Amazon Web Services account.

YAML

Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic

Amazon MSK Example for Cluster Declared in Same Template

The following is an example of an MSK event source type for an Amazon MSK cluster that is declared in the same template file.

YAML

Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file
      Topics:
        - MyTopic

MSK Event Source with Schema Registry

The following is an example of an MSK event source type configured with a schema registry.

Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster
      Topics:
        - SchemaRegistryTestTopic
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE

MSK Event Source with Confluent Schema Registry

The following is an example of an MSK event source type configured with a Confluent Schema Registry.

Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster
      Topics:
        - SchemaRegistryTestTopic
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: https://my-schema-registry.confluent.cloud
        AccessConfigs:
          - Type: BASIC_AUTH
            URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE