创建 Amazon EventBridge 管道 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

创建 Amazon EventBridge 管道

EventBridge Pipes 支持您在源和目标之间创建点对点集成,包括高级事件转换和富集。要创建 EventBridge 管道,请执行以下步骤:

有关如何使用 Amazon CLI 创建管理的信息,请参阅《Amazon CLI 命令参考》中的 create-pipe

指定源

首先,请指定您希望管道接收事件的源。

使用控制台指定管道源
  1. 访问 https://console.aws.amazon.com/events/,打开 Amazon EventBridge 控制台。

  2. 在导航窗格中,选择管道

  3. 选择创建管道

  4. 输入管道的名称。

  5. (可选)添加管道的描述。

  6. 构建管道选项卡上,对于,选择要为此管道指定的源类型,然后配置此源。

    配置属性因您所选的源类型而异:

    Confluent
    使用控制台将 Confluent 云流配置为源
    1. 对于,选择 Confluent 云

    2. 对于引导服务器,输入代理的 host:port 配对地址。

    3. 对于主题名称,输入管道将要读取的主题名称。

    4. (可选)对于 VPC,选择所需的 VPC。然后,对于 VPC 子网,选择所需的子网。对于 VPC 安全组,选择安全组。

    5. 对于身份验证 - 可选,启用使用身份验证并执行以下操作:

      1. 对于身份验证方法,选择身份验证类型。

      2. 密钥中选择密钥。

      有关更多信息,请参阅 Confluent 文档中的 Authenticate to Confluent Cloud resources

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于起始位置,请选择以下选项之一:

        • 最新 - 开始读取分片中包含最新记录的流。

        • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      2. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      3. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

    DynamoDB
    1. 选择 DynamoDB

    2. 对于 DynamoDB 流,选择要用作源的流。

    3. 对于起始位置,请选择以下选项之一:

      • 最新 - 开始读取分片中包含最新记录的流。

      • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

    4. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

      3. 对于每个分片的并发批次 - 可选,输入同一分片中可以同时读取的批次数。

      4. 对于部分批次项目失败时,请选择以下选项:

        • AUTOMATIC_BISECT - 将每个批次一分为二,并分别重试,直到所有记录都处理完毕或批次中还剩下一条失败消息。

        注意

        如果不选择 AUTOMATIC_BISECT,则可以返回特定的失败记录,只会重试这些记录。

    Kinesis
    使用控制台配置 Kinesis 源
    1. 对于,选择 Kinesis

    2. 对于 Kinesis 流,选择要用作源的流。

    3. 对于起始位置,请选择以下选项之一:

      • 最新 - 开始读取分片中包含最新记录的流。

      • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      • 在时间戳处 - 从指定时间开始读取流。在时间戳下,使用 YYYY/MM/DD 和 hh:mm:ss 格式输入日期和时间。

    4. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. (可选)对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

      3. 对于每个分片的并发批次 - 可选,输入同一分片中可以同时读取的批次数。

      4. 对于部分批次项目失败时,请选择以下选项:

        • AUTOMATIC_BISECT - 将每个批次一分为二,并分别重试,直到所有记录都处理完毕或批次中还剩下一条失败消息。

        注意

        如果不选择 AUTOMATIC_BISECT,则可以返回特定的失败记录,只会重试这些记录。

    Amazon MQ
    使用控制台配置 Amazon MQ 源
    1. 对于,选择 Amazon MQ

    2. 对于 Amazon MQ 代理,选择要用作源的流。

    3. 对于队列名称,输入管道将要读取的队列名称。

    4. 对于身份验证方法,选择 BASIC_AUTH

    5. 密钥中选择密钥。

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大消息数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

    Amazon MSK
    使用控制台配置 Amazon MSK 源
    1. 对于,选择 Amazon MSK

    2. 对于 Amazon MSK 集群,选择要使用的集群。

    3. 对于主题名称,输入管道将要读取的主题名称。

    4. (可选)对于使用者组 ID,输入希望管道加入的使用者组的 ID。

    5. (可选)对于身份验证 - 可选,启用使用身份验证并执行以下操作:

      1. 对于身份验证方法,选择所需的类型。

      2. 密钥中选择密钥。

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

      3. 对于起始位置,请选择以下选项之一:

        • 最新 - 开始读取分片中包含最新记录的主题。

        • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的和题。这是分片中最早的记录。

          注意

          修剪水平线与 Apache Kafka 中的最早相同。

    Self managed Apache Kafka
    使用控制台配置自托管 Apache Kafka 源
    1. 对于,选择自我托管式 Apache Kafka

    2. 对于引导服务器,输入代理的 host:port 配对地址。

    3. 对于主题名称,输入管道将要读取的主题名称。

    4. (可选)对于 VPC,选择所需的 VPC。然后,对于 VPC 子网,选择所需的子网。对于 VPC 安全组,选择安全组。

    5. (可选)对于身份验证 - 可选,启用使用身份验证并执行以下操作:

      1. 对于身份验证方法,选择身份验证类型。

      2. 密钥中选择密钥。

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于起始位置,请选择以下选项之一:

        • 最新 - 开始读取分片中包含最新记录的流。

        • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      2. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      3. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

    Amazon SQS
    使用控制台配置 Amazon SQS 源
    1. 对于,选择 SQS

    2. 对于 SQS 队列,请选择要使用的队列。

    3. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

配置事件筛选(可选)

您可以向管道添加筛选功能,这样就可以只将部分事件从源发送到目标。

使用控制台配置筛选
  1. 选择筛选

  2. 示例事件 - 可选下,您将看到一个示例事件,可以使用它来构建您自己的事件模式,您也可以选择输入您自己的,输入您自己的事件。

  3. 事件模式下,输入要用于筛选事件的事件模式。有关构造筛选条件的更多信息,请参阅 Amazon EventBridge 管道中的事件筛选

    以下是一个事件模式示例,仅发送城市字段中的值为西雅图的事件。

    { "data": { "City": ["Seattle"] } }

现在已对事件进行筛选,您可以为管道添加可选的富集和目标。

定义事件富集(可选)

您可以将用于富集的事件数据发送到 Lambda 函数、Amazon Step Functions 状态机、Amazon API Gateway 或 API 目标。

选择富集
  1. 选择富集

  2. 详细信息下,为服务选择要用于富集的服务和相关设置。

您也可以先转换数据,再发送以进行增强。

(可选)定义输入转换器
  1. 选择富集输入转换器 - 可选

  2. 对于示例事件/事件有效负载,请选择示例事件类型。

  3. 对于转换器,请输入转换器语法,例如 "Event happened at <$.detail.field>.",其中 <$.detail.field> 是对示例事件中某个字段的引用。您也可以双击示例事件中的某个字段,将其添加到转换器中。

  4. 对于输出,请确认输出符合您的要求。

现在,数据已经过筛选和增强,您必须定义要将事件数据发送到的目标。

配置目标

配置目标
  1. 选择目标

  2. 详细信息下,为目标服务选择目标。显示的字段因您选择的目标而异。根据需要输入此目标类型的特定信息。

您也可以先转换数据,再发送到目标。

(可选)定义输入转换器
  1. 选择目标输入变压器 - 可选

  2. 对于示例事件/事件有效负载,请选择示例事件类型。

  3. 对于转换器,请输入转换器语法,例如 "Event happened at <$.detail.field>.",其中 <$.detail.field> 是对示例事件中某个字段的引用。您也可以双击示例事件中的某个字段,将其添加到转换器中。

  4. 对于输出,请确认输出符合您的要求。

现在,管道已配置完毕,请确保其设置配置正确。

配置管道设置

默认情况下管道处于活动状态,但您可以将其停用。您还可以指定管道的权限、设置管道日志记录和添加标签。

配置管道设置
  1. 选择管道设置选项卡。

  2. 默认情况下,新创建的管道创建好就处于活动状态。如果要创建非活动管道,请在激活下,为激活管道关闭活动

  3. 权限下,对于执行角色,执行以下操作之一:

    1. 要让 EventBridge 为此管道创建新的执行角色,请选择为此特定资源创建新角色。在角色名称下,您可以选择编辑角色名称。

    2. 要使用现有的执行角色,请选择使用现有角色。在角色名称下,选择角色。

  4. (可选)如果您已指定 Kinesis 或 DynamoDB 流作为管道源,可以配置重试策略和死信队列(DLQ)。

    对于重试策略和死信队列 - 可选,请执行以下操作:

    重试策略下,执行以下操作:

    1. 如果要启用重试策略,请打开重试。默认情况下,新创建的管道未启用重试策略。

    2. 对于 Maximum age of event(事件的最大时长),输入一分钟(00:01)与 24 小时(24:00)之间的值。

    3. 对于重试尝试,输入 0 到 185 之间的数字。

    4. 如果要使用死信队列 (DLQ),请打开死信队列,选择您的方法,然后选择要使用的队列或主题。默认情况下,新创建的管道不使用 DLQ。

  5. 选择加密管道数据时 EventBridge 要使用的 KMS key。

    有关 EventBridge 如何使用 KMS keys的更多信息,请参阅静态加密

    • 为 EventBridge 选择使用 Amazon 拥有的密钥以使用 Amazon 拥有的密钥加密数据。

      这种 Amazon 拥有的密钥是 EventBridge 拥有并管理的 KMS key,可在多个 Amazon 账户中使用。通常,Amazon 拥有的密钥 是一个不错的选择,除非您需要审计或控制保护资源的加密密钥。

      这是默认模式。

    • 为 EventBridge 选择使用客户托管式密钥,以使用您指定或创建的客户托管式密钥对数据进行加密。

      客户自主管理型密钥是您的 Amazon 账户中由您创建、拥有和管理的 KMS keys。您对这些 KMS keys拥有完全控制权。

      1. 指定现有客户托管式密钥 或选择创建新的 KMS key

        EventBridge 显示密钥状态以及与指定客户托管式密钥关联的所有密钥别名。

  6. (可选)在日志 - 可选下,您可以设置 EventBridge Pipes 如何向支持的服务发送日志信息,包括如何配置这些日志。

    有关管道记录日志的更多信息,请参阅 记录 Amazon EventBridge 管道的性能

    默认情况下,CloudWatch 日志被选为日志目标,ERROR 日志级别也是如此。因此,默认情况下,EventBridge Pipes 会创建一个新的 CloudWatch 日志组,向该组发送包含 ERROR 级别详情的日志记录。

    要让 EventBridge Pipes 将日志记录发送到任何支持的日志目的地,请执行以下操作:

    1. 日志-可选下,选择要将日志记录传送到的目的地。

    2. 对于日志级别,选择 EventBridge 要包含在日志记录中的信息级别。默认情况下选中 ERROR 日志级别。

      有关更多信息,请参阅 指定 EventBridge Pipes 日志级别

    3. 如果希望 EventBridge 在日志记录中包含事件有效负载信息以及服务请求和响应信息,请选择包括执行数据

      有关更多信息,请参阅 在 EventBridge Pipes 日志中包含执行数据

    4. 配置您选择的每个日志目标:

      对于 CloudWatch Logs 日志,请在 CloudWatch 日志下执行以下操作:

      • 对于 CloudWatch 日志组,请选择 EventBridge 是否创建新的日志组,您也可以选择现有日志组,或指定现有日志组的 ARN。

      • 对于新的日志组,请根据需要编辑日志组名称。

      CloudWatch 日志默认处于选中状态。

      对于 Firehose 流日志,在 Firehose 流日志下,选择 Firehose 流。

      对于 Amazon S3 日志,请在 S3 日志下执行以下操作:

      • 输入要用作日志目标的桶的名称。

      • 输入桶拥有者的 Amazon 账户 ID。

      • 输入 EventBridge 创建 S3 对象时要使用的任何前缀文本。

        有关更多信息,请参阅《Amazon Simple Storage Service 用户指南》中的使用前缀组织对象

      • 选择您希望 EventBridge 如何格式化 S3 日志记录:

  7. (可选)在标签 - 可选下,选择添加新标签,然后为规则输入一个或多个标签。有关更多信息,请参阅 在 Amazon EventBridge 中为资源添加标签

  8. 选择创建管道

验证配置参数

创建管道后,EventBridge 会验证以下配置参数:

  • IAM 角色 - 由于创建管道后无法更改管道的源,因此 EventBridge 会验证所提供的 IAM 角色是否可以访问该源。

    注意

    EventBridge 不会对富集或目标执行相同的验证,因为它们可以在创建管道后进行更新。

  • 批处理 - EventBridge 会验证源的批次大小是否超过目标的最大批次大小。如果超过,EventBridge 会要求设置较小的批次大小。此外,如果目标不支持批处理,则无法在 EventBridge 中为源配置批处理。

  • 富集 - EventBridge 会验证 API Gateway 和 API 目标富集的批次大小是否为 1,因为仅支持大小为 1 的批次。