教程:使用 EventBridge 将事件中继到 Amazon Kinesis 流 - Amazon EventBridge
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

教程:使用 EventBridge 将事件中继到 Amazon Kinesis 流

您可以将 EventBridge 中的 AWS API 调用事件中继到 Amazon Kinesis 中的流。

先决条件

安装 AWS CLI。有关更多信息,请参见 AWS Command Line Interface 用户指南

步骤 1:创建 Amazon Kinesis 流

要创建流,请使用下面的 create-stream 命令。

aws kinesis create-stream --stream-name test --shard-count 1

当流状态为 ACTIVE 时,表示流已就绪。要检查流状态,请使用下面的 describe-stream 命令:

aws kinesis describe-stream --stream-name test

步骤 2:创建规则

例如,创建一条规则,以便在您停止 Amazon EC2 实例时将事件发送到流。

创建规则

  1. 通过以下网址打开 Amazon EventBridge 控制台:https://console.amazonaws.cn/events/

  2. 在导航窗格中,选择 Rules (规则)

  3. 选择 Create rule

  4. 为规则输入名称和描述。

  5. 对于 Define pattern (定义模式),请执行以下操作:

    1. 选择 Event pattern

    2. 选择 Pre-defined pattern by service (服务预定义的模式)

    3. 对于 Service provider (服务提供商),选择 AWS

    4. 对于 Service Name (服务名称),请选择 EMR。​

    5. 对于 Event type (事件类型),请选择 Instance State-change Notification (实例状态更改通知)

    6. 依次选择 Specific state(s) (特定状态)running (正在运行)

  6. 对于 Select event bus (选择事件总线),选择 AWS default event bus (AWS 默认事件总线)。当您账户中的某个 AWS 服务发出一个事件时,它始终会发送到您账户的默认事件总线。

  7. 对于 Targets (目标),选择 Kinesis stream (Kinesis 流)

  8. 对于 Stream,选择您创建的流。

  9. 选择 Create a new role for this specific resource

  10. 选择 Create

步骤 3:测试规则

为了测试规则,停止一个 Amazon EC2 实例。等待几分钟,在该实例停止之后,检查 CloudWatch 指标,以验证您的函数是否已调用。

通过停止一个实例来测试您的规则

  1. 打开 Amazon EC2 控制台 https://console.amazonaws.cn/ec2/

  2. 启动一个实例。有关更多信息,请参阅 Amazon EC2 用户指南(适用于 Linux 实例) 中的启动实例

  3. 通过以下网址打开 Amazon EventBridge 控制台:https://console.amazonaws.cn/events/

  4. 在导航窗格中,选择 Rules (规则)

    选择所创建规则的名称,然后选择 Metrics for the rule (规则的指标)

  5. (可选)当您完成后,终止该实例。想要了解更多信息,请参阅 Amazon EC2 用户指南(适用于 Linux 实例) 中的终止您的实例

步骤 4:验证事件是否已中继

您可以从流中获取记录,以验证事件是否已中继。

获取记录

  1. 要开始从 Kinesis 流中读取数据,请使用以下 get-shard-iterator 命令。

    aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test

    下面是示例输出。

    { "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }
  2. 要获取记录,请使用以下 get-records 命令。分区迭代器是您在上一步获取的。

    aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

    如果命令成功,它将从指定分片的流中请求记录。您可能会收到零个或多个记录。返回的任何记录都不能表示流中的所有记录。如果您未收到预期的数据,请继续调用 get-records

    Kinesis 中的记录是经过 Base64 编码的。但是,AWS CLI 中的流支持不提供 Base64 解码。如果您使用 Base64 解码程序手动解码数据,您会发现它是以 JSON 格式中继到流的事件。