本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
SelfManagedKafka
描述 SelfManagedKafka
事件源类型的对象。有关更多信息,请参阅《开发人员指南》中的Amazon Lambda 与自行管理的 Apache Kafka 配合使用。Amazon Lambda
Amazon Serverless Application Model (Amazon SAM) 在设置此事件类型时生成AWS::Lambda::EventSourceMapping资源。
要使用架构注册表,您需要为函数定义特定的 IAM 角色权限。有关所需配置的示例,请参阅使用 IAM 角色完成设置。
语法
要在 Amazon SAM 模板中声明此实体,请使用以下语法。
YAML
BatchSize:
Integer
ConsumerGroupId:String
DestinationConfig:DestinationConfig
Enabled:Boolean
FilterCriteria:FilterCriteria
KafkaBootstrapServers:List
KmsKeyArn:String
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Topics:List
属性
-
BatchSize
-
Lambda 从流中提取并发送到函数的每个批处理中的最大记录数。
类型:整数
必需:否
默认值:100
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的BatchSize
属性。最小值:
1
最大值:
10000
-
ConsumerGroupId
-
用于配置如何从 Kafka 主题中读取事件的字符串。
类型:字符串
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的SelfManagedKafkaConfiguration
属性。 -
DestinationConfig
-
一个配置对象,用于在 Lambda 处理事件后指定事件目的地。
使用此属性指定来自管理 Kafka 事件源的失败调用的目的地。
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的DestinationConfig
属性。 -
Enabled
-
禁用事件源映射以暂停轮询和调用。
类型:布尔值
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的Enabled
属性。 -
FilterCriteria
-
定义用于确定 Lambda 是否应处理事件的条件的对象。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的 Amazon Lambda 事件筛选。
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的FilterCriteria
属性。 -
KafkaBootstrapServers
-
Kafka 代理的引导服务器列表。包括端口,例如
broker.example.com:
xxxx
类型:列表
必需:否
Amazon CloudFormation 兼容性:此属性是独有的 Amazon SAM ,没有 Amazon CloudFormation 等效属性。
-
KmsKeyArn
-
用于加密与此事件相关信息的密钥的 Amazon 资源名称(ARN)。
类型:字符串
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的KmsKeyArn
属性。 -
ProvisionedPollerConfig
-
用于增加用于计算事件源映射的轮询器数量的配置。此配置允许最少 1 个轮询器和最多 20 个轮询器。有关示例,请参阅 ProvisionedPollerConfig 示例
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的ProvisionedPollerConfig
属性。 SchemaRegistryConfig
-
用于将架构注册表与自管理 Kafka 事件源配合使用的配置。
注意
需要配置
ProvisionedPollerConfig
此功能。类型: SchemaRegistryConfig
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的SelfManagedKafkaEventSourceConfig
属性。 -
SourceAccessConfigurations
-
用于保护与定义事件源的身份验证协议数组 VPC 组件或虚拟化主机。
有效值:
BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE
类型:SourceAccessConfiguration 列表
必需:是
Amazon CloudFormation 兼容性:此属性是
AWS::Lambda::EventSourceMapping
资源SelfManagedKafkaEventSourceConfig属性的一部分。 -
StartingPosition
-
在流中开始读取数据的位置。
-
AT_TIMESTAMP
– 指定开始读取记录的时间。 -
LATEST
- 仅读取新记录。 -
TRIM_HORIZON
- 处理所有可用的记录。
有效值:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
类型:字符串
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的StartingPosition
属性。 -
-
StartingPositionTimestamp
-
开始读取的时间(以 Unix 时间秒为单位) 在
StartingPosition
被指定为AT_TIMESTAMP
的情况下定义StartingPositionTimestamp
。类型:双精度
必需:否
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的StartingPositionTimestamp
属性。 -
Topics
-
Kafka 主题的名称。
类型:列表
必需:是
Amazon CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping
资源的Topics
属性。
示例
使用 IAM 角色完成设置
以下示例显示了完整的设置,包括使用架构注册表所需的 IAM 角色配置:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig 示例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
自行管理的 Kafka 事件源
以下是 SelfManagedKafka
事件源类型的示例。
YAML
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic
带有 Amazon Glue 架构注册表的自管理 Kafka 事件源
以下是使用 Amazon Glue 架构注册表配置SelfManagedKafka
的事件源类型的示例。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678
带有 Confluent 架构注册表的自管理 Kafka 事件源
以下是使用 Confluent 架构注册表配置SelfManagedKafka
的事件源类型的示例。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret