创建亚马逊 EventBridge 管道 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建亚马逊 EventBridge 管道

EventBridge Pipes 使您能够在源和目标之间创建 point-to-point 集成,包括高级事件转换和扩展。要创建 EventBridge 管道,请执行以下步骤:

有关如何使用 CLI 创建管道的信息,请参阅 Amazon CL Amazon I 命令参考中的 create- pipe。

指定源

首先,请指定您希望管道接收事件的源。

使用控制台指定管道源
  1. 打开亚马逊 EventBridge 控制台,网址为 https://console.aws.amazon.com/events/

  2. 在导航窗格中,选择管道

  3. 选择创建管道

  4. 输入管道的名称。

  5. (可选)添加管道的描述。

  6. 构建管道选项卡上,对于,选择要为此管道指定的源类型,然后配置此源。

    配置属性因您所选的源类型而异:

    Confluent
    要将 Confluent Cloud 直播配置为来源,请使用控制台
    1. 对于,选择 Confluent Cloud。

    2. 对于引导服务器,输入代理的 host:port 配对地址。

    3. 对于主题名称,输入管道将要读取的主题名称。

    4. (可选)对于 VPC,选择所需的 VPC。然后,对于 VPC 子网,选择所需的子网。对于 VPC 安全组,选择安全组。

    5. 对于 “身份验证-可选”,打开 “使用身份验证” 并执行以下操作:

      1. 对于身份验证方法,选择身份验证类型。

      2. 密钥中选择密钥。

      有关更多信息,请参阅 Confluent 文档中的向 Confluent 云资源进行身份验证

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于起始位置,请选择以下选项之一:

        • 最新 - 开始读取分片中包含最新记录的流。

        • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      2. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      3. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

    DynamoDB
    1. 选择 DynamoDB

    2. 对于 DynamoDB 流,选择要用作源的流。

    3. 对于起始位置,请选择以下选项之一:

      • 最新 - 开始读取分片中包含最新记录的流。

      • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

    4. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

      3. 对于每个分片的并发批次 - 可选,输入同一分片中可以同时读取的批次数。

      4. 对于部分批次项目失败时,请选择以下选项:

        • AUTOMATIC_BISECT - 将每个批次一分为二,并分别重试,直到所有记录都处理完毕或批次中还剩下一条失败消息。

        注意

        如果不选择 AUTOMATIC_BISECT,则可以返回特定的失败记录,只会重试这些记录。

    Kinesis
    使用控制台配置 Kinesis 源
    1. 对于,选择 Kinesis

    2. 对于 Kinesis 流,选择要用作源的流。

    3. 对于起始位置,请选择以下选项之一:

      • 最新 - 开始读取分片中包含最新记录的流。

      • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      • 在时间戳处 - 从指定时间开始读取流。在时间戳下,使用 YYYY/MM/DD 和 hh:mm:ss 格式输入日期和时间。

    4. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. (可选)对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

      3. 对于每个分片的并发批次 - 可选,输入同一分片中可以同时读取的批次数。

      4. 对于部分批次项目失败时,请选择以下选项:

        • AUTOMATIC_BISECT - 将每个批次一分为二,并分别重试,直到所有记录都处理完毕或批次中还剩下一条失败消息。

        注意

        如果不选择 AUTOMATIC_BISECT,则可以返回特定的失败记录,只会重试这些记录。

    Amazon MQ
    使用控制台配置 Amazon MQ 源
    1. 对于,选择 Amazon MQ

    2. 对于 Amazon MQ 代理,选择要用作源的流。

    3. 对于队列名称,输入管道将要读取的队列名称。

    4. 对于身份验证方法,选择 BASIC_AUTH

    5. 密钥中选择密钥。

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大消息数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

    Amazon MSK
    使用控制台配置 Amazon MSK 源
    1. 对于,选择 Amazon MSK

    2. 对于 Amazon MSK 集群,选择要使用的集群。

    3. 对于主题名称,输入管道将要读取的主题名称。

    4. (可选)对于使用者组 ID,输入希望管道加入的使用者组的 ID。

    5. (可选)对于身份验证 - 可选,启用使用身份验证并执行以下操作:

      1. 对于身份验证方法,选择所需的类型。

      2. 密钥中选择密钥。

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

      3. 对于起始位置,请选择以下选项之一:

        • 最新 - 开始读取分片中包含最新记录的主题。

        • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的和题。这是分片中最早的记录。

          注意

          修剪水平线与 Apache Kafka 中的最早相同。

    Self managed Apache Kafka
    使用控制台配置自托管 Apache Kafka 源
    1. 对于,选择自我托管式 Apache Kafka

    2. 对于引导服务器,输入代理的 host:port 配对地址。

    3. 对于主题名称,输入管道将要读取的主题名称。

    4. (可选)对于 VPC,选择所需的 VPC。然后,对于 VPC 子网,选择所需的子网。对于 VPC 安全组,选择安全组。

    5. (可选)对于身份验证 - 可选,启用使用身份验证并执行以下操作:

      1. 对于身份验证方法,选择身份验证类型。

      2. 密钥中选择密钥。

    6. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于起始位置,请选择以下选项之一:

        • 最新 - 开始读取分片中包含最新记录的流。

        • 修剪水平线 - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      2. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      3. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

    Amazon SQS
    使用控制台配置 Amazon SQS 源
    1. 对于,选择 SQS

    2. 对于 SQS 队列,请选择要使用的队列。

    3. (可选)对于其他设置 - 可选,执行以下操作:

      1. 对于批次大小 - 可选,输入每个批次的最大记录数。默认值是 100。

      2. 对于批次时段 - 可选,输入在继续操作之前收集记录的最大秒数。

配置事件筛选(可选)

您可以向管道添加筛选功能,这样就可以只将部分事件从源发送到目标。

使用控制台配置筛选
  1. 选择筛选

  2. 示例事件 - 可选下,您将看到一个示例事件,可以使用它来构建您自己的事件模式,您也可以选择输入您自己的,输入您自己的事件。

  3. 事件模式下,输入要用于筛选事件的事件模式。有关构造过滤器的更多信息,请参阅亚马逊 EventBridge 管道过滤

    以下是一个事件模式示例,仅发送城市字段中的值为西雅图的事件。

    { "data": { "City": ["Seattle"] } }

现在已对事件进行筛选,您可以为管道添加可选的富集和目标。

定义事件富集(可选)

您可以将用于丰富的事件数据发送到 Lambda 函数、 Amazon Step Functions 状态机、Amazon API Gateway 或 API 目标。

选择富集
  1. 选择富集

  2. 详细信息下,为服务选择要用于富集的服务和相关设置。

您也可以先转换数据,再发送以进行增强。

(可选)定义输入转换器
  1. 选择富集输入转换器 - 可选

  2. 对于示例事件/事件有效负载,请选择示例事件类型。

  3. 对于转换器,请输入转换器语法,例如 "Event happened at <$.detail.field>.",其中 <$.detail.field> 是对示例事件中某个字段的引用。您也可以双击示例事件中的某个字段,将其添加到转换器中。

  4. 对于输出,请确认输出符合您的要求。

现在,数据已经过筛选和增强,您必须定义要将事件数据发送到的目标。

配置目标

配置目标
  1. 选择目标

  2. 详细信息下,为目标服务选择目标。显示的字段因您选择的目标而异。根据需要输入此目标类型的特定信息。

您也可以先转换数据,再发送到目标。

(可选)定义输入转换器
  1. 选择目标输入变压器 - 可选

  2. 对于示例事件/事件有效负载,请选择示例事件类型。

  3. 对于转换器,请输入转换器语法,例如 "Event happened at <$.detail.field>.",其中 <$.detail.field> 是对示例事件中某个字段的引用。您也可以双击示例事件中的某个字段,将其添加到转换器中。

  4. 对于输出,请确认输出符合您的要求。

现在,管道已配置完毕,请确保其设置配置正确。

配置管道设置

默认情况下管道处于活动状态,但您可以将其停用。您还可以指定管道的权限、设置管道日志记录和添加标签。

配置管道设置
  1. 选择管道设置选项卡。

  2. 默认情况下,新创建的管道创建好就处于活动状态。如果要创建非活动管道,请在激活下,为激活管道关闭活动

  3. 权限下,对于执行角色,执行以下操作之一:

    1. 要为此管道 EventBridge 创建新的执行角色,请选择为此特定资源创建新角色。角色名称下,您可以选择编辑角色名称。

    2. 要使用现有的执行角色,请选择使用现有角色。在角色名称下,选择角色。

  4. (可选)如果您已将 Kinesis 或 DynamoDB 流指定为管道源,则可以配置重试策略和死信队列 (DLQ)。

    对于重试策略和死信队列 - 可选,请执行以下操作:

    重试策略下,执行以下操作:

    1. 如果要启用重试策略,请打开重试。默认情况下,新创建的管道未启用重试策略。

    2. 对于 Maximum age of event(事件的最大时长),输入一分钟(00:01)与 24 小时(24:00)之间的值。

    3. 对于重试尝试,输入 0 到 185 之间的数字。

    4. 如果要使用死信队列 (DLQ),请打开死信队列,选择您的方法,然后选择要使用的队列或主题。默认情况下,新创建的管道不使用 DLQ。

  5. (可选)在日志 - 可选下,您可以设置 EventBridge Pipes 如何向支持的服务发送日志信息,包括如何配置这些日志。

    有关管道记录日志的更多信息,请参阅 记录 Amazon Pi EventBridge pes

    CloudWatch 默认情况下,日志被选为日志目标,ERROR日志级别也是如此。因此,默认情况下,Pip EventBridge es 会创建一个新的 CloudWatch 日志组,向该组发送包含详细ERROR级别的日志记录。

    要让 Pip EventBridge es 将日志记录发送到任何支持的日志目的地,请执行以下操作:

    1. 日志-可选下,选择要将日志记录传送到的目的地。

    2. 对于日志级别,选择 EventBridge 要包含在日志记录中的信息级别。默认情况下选中 ERROR 日志级别。

      有关更多信息,请参阅 指定 EventBridge 管道日志级别

    3. 如果 EventBridge 要在日志记录中包含事件负载信息以及服务请求和响应信息,请选择 “包括执行数据”。

      有关更多信息,请参阅 在 Pip EventBridge es 日志中包含执行数据

    4. 配置您选择的每个日志目标:

      对于 CloudWatch Logs 日志,请在CloudWatch 日志下执行以下操作:

      • 对于CloudWatch 日志组,请选择是否 EventBridge 创建新的日志组,或者您可以选择现有日志组或指定现有日志组的 ARN。

      • 对于新的日志组,请根据需要编辑日志组名称。

      CloudWatch 默认情况下,日志处于选中状态。

      对于 Firehose 流日志,在Firehose 流日志下,选择该 Firehose 流。

      对于 Amazon S3 日志,在 S3 日志下执行以下操作:

      • 输入要用作日志目标的桶的名称。

      • 输入存储桶所有者的 Amazon 账户 ID。

      • 输入 EventBridge 创建 S3 对象时要使用的任何前缀文本。

        有关更多信息,请参阅《Amazon Simple Storage Service 用户指南》中的使用前缀组织对象

      • 选择 EventBridge 要如何格式化 S3 日志记录:

  6. (可选)在标签 - 可选下,选择添加新标签,然后为规则输入一个或多个标签。有关更多信息,请参阅 亚马逊 EventBridge 标签

  7. 选择创建管道

验证配置参数

创建管道后, EventBridge 验证以下配置参数:

  • IAM 角色 — 由于创建管道后无法更改管道的来源,因此需要 EventBridge 验证提供的 IAM 角色是否可以访问该源。

    注意

    EventBridge 不会对浓缩或目标执行相同的验证,因为它们可以在创建管道后进行更新。

  • 批处理- EventBridge 验证源的批量大小是否超过目标的最大批次大小。如果是,则 EventBridge 需要较小的批次大小。此外,如果目标不支持批处理,则无法 EventBridge 为源配置批处理。

  • 扩充 — EventBridge 验证 API Gateway 和 API 目标丰富版的批量大小是否为 1,因为仅支持批量大小为 1。