MSK
The object describing an MSK
event source type. For more information, see
Using Amazon Lambda with Amazon MSK in the Amazon Lambda Developer Guide.
Amazon Serverless Application Model (Amazon SAM) generates an AWS::Lambda::EventSourceMapping resource when this event type is set.
To use Schema Registry, you need to define specific IAM role permissions for your function. See Complete setup with IAM roles for an example of the required configuration.
Syntax
To declare this entity in your Amazon SAM template, use the following syntax.
YAML
ConsumerGroupId:
String
DestinationConfig:DestinationConfig
FilterCriteria:FilterCriteria
KmsKeyArn:String
MaximumBatchingWindowInSeconds:Integer
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Stream:String
Topics:List
Properties
-
ConsumerGroupId
-
A string that configures how events will be read from Kafka topics.
Type: String
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
AmazonManagedKafkaConfiguration
property of anAWS::Lambda::EventSourceMapping
resource. -
DestinationConfig
-
A configuration object that specifies the destination of an event after Lambda processes it.
Use this property to specify the destination of failed invocations from the Amazon MSK event source.
Type: DestinationConfig
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
DestinationConfig
property of anAWS::Lambda::EventSourceMapping
resource. -
FilterCriteria
-
A object that defines the criteria that determines whether Lambda should process an event. For more information, see Amazon Lambda event filtering in the Amazon Lambda Developer Guide.
Type: FilterCriteria
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
FilterCriteria
property of anAWS::Lambda::EventSourceMapping
resource. -
KmsKeyArn
-
The Amazon Resource Name (ARN) of the key to encrypt information related to this event.
Type: String
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
KmsKeyArn
property of anAWS::Lambda::EventSourceMapping
resource. -
MaximumBatchingWindowInSeconds
-
The maximum amount of time to gather records before invoking the function, in seconds.
Type: Integer
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
MaximumBatchingWindowInSeconds
property of anAWS::Lambda::EventSourceMapping
resource. -
ProvisionedPollerConfig
-
Configuration to increase the amount of pollers used to compute event source mappings. This configuration allows for a minimum of 1 poller and a maximum of 20 pollers. For an example, refer to ProvisionedPollerConfig example.
Type: ProvisionedPollerConfig
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
ProvisionedPollerConfig
property of anAWS::Lambda::EventSourceMapping
resource. SchemaRegistryConfig
-
Configuration for using a schema registry with the Kafka event source.
Note
This feature requires
ProvisionedPollerConfig
to be configured.Type: SchemaRegistryConfig
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
AmazonManagedKafkaEventSourceConfig
property of anAWS::Lambda::EventSourceMapping
resource. -
SourceAccessConfigurations
-
An array of the authentication protocol, VPC components, or virtual host to secure and define your event source.
Valid values:
CLIENT_CERTIFICATE_TLS_AUTH
Type: List of SourceAccessConfiguration
Required: No
Amazon CloudFormation compatibility: This propertyrty is part of the AmazonManagedKafkaEventSourceConfig property of an
AWS::Lambda::EventSourceMapping
resource. -
StartingPosition
-
The position in a stream from which to start reading.
-
AT_TIMESTAMP
– Specify a time from which to start reading records. -
LATEST
– Read only new records. -
TRIM_HORIZON
– Process all available records.
Valid values:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
Type: String
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
StartingPosition
property of anAWS::Lambda::EventSourceMapping
resource. -
-
StartingPositionTimestamp
-
The time from which to start reading, in Unix time seconds. Define
StartingPositionTimestamp
whenStartingPosition
is specified asAT_TIMESTAMP
.Type: Double
Required: No
Amazon CloudFormation compatibility: This property is passed directly to the
StartingPositionTimestamp
property of anAWS::Lambda::EventSourceMapping
resource. -
Stream
-
The Amazon Resource Name (ARN) of the data stream or a stream consumer.
Type: String
Required: Yes
Amazon CloudFormation compatibility: This property is passed directly to the
EventSourceArn
property of anAWS::Lambda::EventSourceMapping
resource. -
Topics
-
The name of the Kafka topic.
Type: List
Required: Yes
Amazon CloudFormation compatibility: This property is passed directly to the
Topics
property of anAWS::Lambda::EventSourceMapping
resource.
Examples
Complete setup with IAM roles
The following example shows a complete setup including the required IAM role configuration for using Schema Registry:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig example
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
Amazon MSK Example for Existing Cluster
The following is an example of an MSK
event source type for an Amazon MSK
cluster that already exists in an Amazon Web Services account.
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic
Amazon MSK Example for Cluster Declared in Same Template
The following is an example of an MSK
event source type for an Amazon MSK
cluster that is declared in the same template file.
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic
MSK Event Source with Schema Registry
The following is an example of an MSK
event source type configured with a schema registry.
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE
MSK Event Source with Confluent Schema Registry
The following is an example of an MSK
event source type configured with a Confluent Schema Registry.
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE