

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 亚马逊 Kinesis 直播作为 Pipes 的来源 EventBridge
<a name="eb-pipes-kinesis"></a>

你可以使用 Pi EventBridge pes 接收 Kinesis 数据流中的记录。然后，您可以选择筛选或增强这些记录，然后再将它们发送到可用的目标之一进行处理。在设置管道时，可以选择特定于 Kinesis 的设置。 EventBridge 当将数据发送到目标时，管道会保持数据流中记录的顺序。

Kinesis 数据流是一组[分区](https://docs.amazonaws.cn/kinesis/latest/dev/key-concepts.html#shard)。每个分片包含一系列数据记录。**使用者** 是一种处理 Kinesis 数据流中的数据的应用程序。[您可以将 Pi EventBridge pe 映射到共享吞吐量使用者（标准迭代器），或者映射到具有增强扇出功能的专用吞吐量使用器。](https://docs.amazonaws.cn/kinesis/latest/dev/enhanced-consumers.html)

对于标准迭代器， EventBridge 使用 HTTP 协议轮询 Kinesis 流中的每个分片以获取记录。管道与分片的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量，您可以创建具有增强扇出功能的数据流使用者。流使用者将获得与每个分片的专用连接，这不会影响从流中读取信息的其他应用程序。如果您有许多应用程序读取相同的数据，或者您正在重新处理具有大记录的流，则专用吞吐量可以提供帮助。Kinesis 通过 HTTP/2 将记录推送到 EventBridge 。有关 Kinesis 数据流的信息，请参阅[读取 Amazon Kinesis Data Streams 中的数据](https://docs.amazonaws.cn/kinesis/latest/dev/building-consumers.html)。

**示例事件**

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式，或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息，请参阅 [Amazon P EventBridge ipes 中的事件筛选](eb-pipes-event-filtering.md)。

```
[
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "approximateArrivalTimestamp": 1545084650.987,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  },
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
    "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
    "approximateArrivalTimestamp": 1545084711.166,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  }
]
```

## 轮询和批处理流
<a name="pipes-ak-polling"></a>

EventBridge 以每秒一次的基本速率对你的 Kinesis 直播中的碎片进行轮询以寻找记录。当有记录可用时， EventBridge 处理事件并等待结果。如果处理成功，则 EventBridge 恢复轮询直到收到更多记录。

默认情况下，只要有记录，就会 EventBridge 调用你的管道。如果从源 EventBridge 读取的批次中只有一条记录，则只处理一个事件。为避免处理数量较少的记录，您可以配置批处理时段，让管道缓冲最多五分钟的记录。在处理事件之前，会 EventBridge 继续从源读取记录，直到它收集了完整批次、批处理窗口到期或批处理达到 6 MB 的有效载荷限制。

您还可以通过并行处理每个分片中的多个批次来提高并发性。 EventBridge 每个分片中最多可以同时处理 10 个批次。如果增加每个分片的并发批次数， EventBridge 仍可确保分区键级别的按顺序处理。

配置 `ParallelizationFactor` 设置，同时处理 Kinesis 或 DynamoDB 数据流一个分片中的多个管道执行。您可以通过从 1（默认）到 10 的并行化系数指定从分片 EventBridge 轮询的并发批次数。例如，当您设置为 `ParallelizationFactor` 2 时，最多可以有 200 个并发 EventBridge Pipe 执行来处理 100 个 Kinesis 数据分片。这有助于在数据量不稳定并且 `IteratorAge` 较高时纵向扩展处理吞吐量。请注意，如果使用 Kinesis 聚合，并行化因子将不起作用。

## 轮询和流的起始位置
<a name="pipes-ak-stream-start-position"></a>

请注意，管道创建和更新期间的流源轮询最终将是一致的。
+ 在管道创建期间，可能需要几分钟才能开始轮询来自流的事件。
+ 在管道更新源轮询配置期间，可能需要几分钟才能停止和重新开始轮询来自流的事件。

这意味着，如果您指定 `LATEST` 作为流的起始位置，在创建或更新管道期间，管道可能会错过发送的事件。为确保不会错过任何事件，请将流的起始位置指定为 `TRIM_HORIZON` 或 `AT_TIMESTAMP`。

## 报告批处理项目失败
<a name="pipes-ak-batch-failures"></a>

在 EventBridge 使用和处理来自源的流数据时，默认情况下，它会检查批次的最高序号，但前提是批处理完全成功时。为避免重新处理失败批次中已成功处理的消息，您可以配置富集或目标，返回对象来指示哪些消息处理成功、哪些失败。这称为部分批处理响应。

有关更多信息，请参阅 [部分批处理故障](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure)。

### 成功和失败的条件
<a name="pipes-ak-batch-failures-conditions"></a>

如果您返回以下任意一项，则将批次 EventBridge 视为完全成功：
+ 空的 `batchItemFailure` 列表
+ Null `batchItemFailure` 列表
+ 空的 `EventResponse`
+ Null `EventResponse`

如果您返回以下任何内容，则会将批次 EventBridge 视为完全失败：
+ 空字符串 `itemIdentifier`
+ Null `itemIdentifier`
+ 包含错误密钥名的 `itemIdentifier`

EventBridge 根据您的重试策略重试失败。