本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
MSK
描述 MSK
事件源类型的对象。有关更多信息,请参阅Amazon Lambda 开发者指南中的Amazon Lambda 与 Amazon MSK 搭配使用。
Amazon Serverless Application Model (Amazon SAM) 在设置此事件类型时生成AWS::Lambda::EventSourceMapping资源。
要使用架构注册表,您需要为函数定义特定的 IAM 角色权限。有关所需配置的示例,请参阅使用 IAM 角色完成设置。
语法
要在 Amazon SAM 模板中声明此实体,请使用以下语法。
YAML
ConsumerGroupId:
String
DestinationConfig:DestinationConfig
FilterCriteria:FilterCriteria
KmsKeyArn:String
MaximumBatchingWindowInSeconds:Integer
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Stream:String
Topics:List
属性
-
ConsumerGroupId
-
用于配置如何从 Kafka 主题中读取事件的字符串。
类型:字符串
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的AmazonManagedKafkaConfiguration
属性。 -
DestinationConfig
-
一个配置对象,用于在 Lambda 处理事件后指定事件目的地。
使用此属性指定来自 Amazon MSK 事件源的失败调用的目的地。
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的DestinationConfig
属性。 -
FilterCriteria
-
定义用于确定 Lambda 是否应处理事件的条件的对象。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的 Amazon Lambda 事件筛选。
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的FilterCriteria
属性。 -
KmsKeyArn
-
用于加密与此事件相关信息的密钥的 Amazon 资源名称(ARN)。
类型:字符串
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的KmsKeyArn
属性。 -
MaximumBatchingWindowInSeconds
-
在调用函数之前收集记录的最长时间(以秒为单位)。
类型:整数
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的MaximumBatchingWindowInSeconds
属性。 -
ProvisionedPollerConfig
-
用于增加用于计算事件源映射的轮询器数量的配置。此配置允许最少 1 个轮询器和最多 20 个轮询器。有关示例,请参阅ProvisionedPollerConfig 示例。
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的ProvisionedPollerConfig
属性。 SchemaRegistryConfig
-
用于在 Kafka 事件源中使用架构注册表的配置。
注意
需要配置
ProvisionedPollerConfig
此功能。类型: SchemaRegistryConfig
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的AmazonManagedKafkaEventSourceConfig
属性。 -
SourceAccessConfigurations
-
用于保护与定义事件源的身份验证协议数组 VPC 组件或虚拟化主机。
有效值:
CLIENT_CERTIFICATE_TLS_AUTH
类型:SourceAccessConfiguration 列表
必需:否
Amazon CloudFormation 兼容性:此属性是
AWS::Lambda::EventSourceMapping
资源AmazonManagedKafkaEventSourceConfig属性的一部分。 -
StartingPosition
-
在流中开始读取数据的位置。
-
AT_TIMESTAMP
– 指定开始读取记录的时间。 -
LATEST
- 仅读取新记录。 -
TRIM_HORIZON
- 处理所有可用的记录。
有效值:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
类型:字符串
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的StartingPosition
属性。 -
-
StartingPositionTimestamp
-
开始读取的时间(以 Unix 时间秒为单位) 在
StartingPosition
被指定为AT_TIMESTAMP
的情况下定义StartingPositionTimestamp
。类型:双精度
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的StartingPositionTimestamp
属性。 -
Stream
-
数据流的 Amazon 资源名称(ARN)或流使用者。
类型:字符串
必需:是
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的EventSourceArn
属性。 -
Topics
-
Kafka 主题的名称。
类型:列表
必需:是
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的Topics
属性。
示例
使用 IAM 角色完成设置
以下示例显示了完整的设置,包括使用架构注册表所需的 IAM 角色配置:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig 示例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
现有集群的 Amazon MSK 示例
以下示例显示了 Amazon Web Services 账户中已存在的 Amazon MSK 集群的 MSK
事件源类型。
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic
在同一模板中声明的集群的 Amazon MSK 示例
以下是在同一模板文件中声明的 Amazon MSK 集群的 MSK
事件源类型的示例。
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic
带有架构注册表的 MSK 事件源
以下是使用架构注册表配置MSK
的事件源类型的示例。
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE
带有 Confluent 架构注册表的 MSK 事件源
以下是使用 Confluent 架构注册表配置MSK
的事件源类型的示例。
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE