MSK - Amazon Serverless Application Model
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

MSK

描述 MSK 事件源类型的对象。有关更多信息,请参阅Amazon Lambda 开发者指南中的Amazon Lambda 与 Amazon MSK 搭配使用

Amazon Serverless Application Model (Amazon SAM) 在设置此事件类型时生成AWS::Lambda::EventSourceMapping资源。

要使用架构注册表,您需要为函数定义特定的 IAM 角色权限。有关所需配置的示例,请参阅使用 IAM 角色完成设置

语法

要在 Amazon SAM 模板中声明此实体,请使用以下语法。

属性

ConsumerGroupId

用于配置如何从 Kafka 主题中读取事件的字符串。

类型:字符串

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的AmazonManagedKafkaConfiguration属性。

DestinationConfig

一个配置对象,用于在 Lambda 处理事件后指定事件目的地。

使用此属性指定来自 Amazon MSK 事件源的失败调用的目的地。

类型DestinationConfig

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的 DestinationConfig属性。

FilterCriteria

定义用于确定 Lambda 是否应处理事件的条件的对象。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的 Amazon Lambda 事件筛选

类型FilterCriteria

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的FilterCriteria属性。

KmsKeyArn

用于加密与此事件相关信息的密钥的 Amazon 资源名称(ARN)。

类型:字符串

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的KmsKeyArn属性。

MaximumBatchingWindowInSeconds

在调用函数之前收集记录的最长时间(以秒为单位)。

类型:整数

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的MaximumBatchingWindowInSeconds属性。

ProvisionedPollerConfig

用于增加用于计算事件源映射的轮询器数量的配置。此配置允许最少 1 个轮询器和最多 20 个轮询器。有关示例,请参阅ProvisionedPollerConfig 示例

类型ProvisionedPollerConfig

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的ProvisionedPollerConfig属性。

SchemaRegistryConfig

用于在 Kafka 事件源中使用架构注册表的配置。

注意

需要配置ProvisionedPollerConfig此功能。

类型: SchemaRegistryConfig

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的AmazonManagedKafkaEventSourceConfig属性。

SourceAccessConfigurations

用于保护与定义事件源的身份验证协议数组 VPC 组件或虚拟化主机。

有效值CLIENT_CERTIFICATE_TLS_AUTH

类型SourceAccessConfiguration 列表

必需:否

Amazon CloudFormation 兼容性:此属性是AWS::Lambda::EventSourceMapping资源AmazonManagedKafkaEventSourceConfig属性的一部分。

StartingPosition

在流中开始读取数据的位置。

  • AT_TIMESTAMP – 指定开始读取记录的时间。

  • LATEST - 仅读取新记录。

  • TRIM_HORIZON - 处理所有可用的记录。

有效值AT_TIMESTAMP | LATEST | TRIM_HORIZON

类型:字符串

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的StartingPosition属性。

StartingPositionTimestamp

开始读取的时间(以 Unix 时间秒为单位) 在 StartingPosition 被指定为 AT_TIMESTAMP 的情况下定义 StartingPositionTimestamp

类型:双精度

必需:否

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的StartingPositionTimestamp属性。

Stream

数据流的 Amazon 资源名称(ARN)或流使用者。

类型:字符串

必需:是

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的EventSourceArn属性。

Topics

Kafka 主题的名称。

类型:列表

必需:是

Amazon CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的Topics属性。

示例

使用 IAM 角色完成设置

以下示例显示了完整的设置,包括使用架构注册表所需的 IAM 角色配置:

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig 示例

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

现有集群的 Amazon MSK 示例

以下示例显示了 Amazon Web Services 账户中已存在的 Amazon MSK 集群的 MSK 事件源类型。

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic

在同一模板中声明的集群的 Amazon MSK 示例

以下是在同一模板文件中声明的 Amazon MSK 集群的 MSK 事件源类型的示例。

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic

带有架构注册表的 MSK 事件源

以下是使用架构注册表配置MSK的事件源类型的示例。

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE

带有 Confluent 架构注册表的 MSK 事件源

以下是使用 Confluent 架构注册表配置MSK的事件源类型的示例。

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE