

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon Kinesis Producer Library（KPL）开发产生器
使用 Amazon Kinesis Producer Library（KPL）开发产生器

Amazon Kinesis Data Streams 创建器是指将用户数据记录放入 Kinesis 数据流中（也称为*数据摄取*）的应用程序。Amazon Kinesis Producer Library（KPL）简化了创建器应用程序的开发，从而允许开发人员实现到 Kinesis 数据流的高写入吞吐量。

您可以通过 Amazon CloudWatch 监控 KPL。有关更多信息，请参阅 [使用亚马逊监控 Kinesis 制作人库 CloudWatch](monitoring-with-kpl.md)。

**Topics**
+ [

## 查看 KPL 的作用
](#developing-producers-with-kpl-role)
+ [

## 了解使用 KPL 的优势
](#developing-producers-with-kpl-advantage)
+ [

## 了解何时不使用 KPL
](#developing-producers-with-kpl-when)
+ [

# 安装 KPL
](kinesis-kpl-dl-install.md)
+ [

# 从 KPL 0.x 迁移至 KPL 1.x
](kpl-migration-1x.md)
+ [

# 转换为适用于 KPL 的 Amazon Trust Services（ATS）证书
](kinesis-kpl-upgrades.md)
+ [

# KPL 受支持平台
](kinesis-kpl-supported-plats.md)
+ [

# KPL 的重要概念
](kinesis-kpl-concepts.md)
+ [

# 将 KPL 与产生器代码集成
](kinesis-kpl-integration.md)
+ [

# 使用 KPL 写入 Kinesis Data Streams
](kinesis-kpl-writing.md)
+ [

# 配置 Amazon Kinesis Producer Library
](kinesis-kpl-config.md)
+ [

# 实现消费端取消聚合
](kinesis-kpl-consumer-deaggregation.md)
+ [

# 将 KPL 与 Amazon Data Firehose 搭配使用
](kpl-with-firehose.md)
+ [

# 将 KPL 与 Amazon Glue 架构注册表一起使用
](kpl-with-schemaregistry.md)
+ [

# 配置 KPL 代理配置
](kpl-proxy-configuration.md)
+ [

# KPL 版本生命周期策略
](kpl-version-lifecycle-policy.md)

**注意**  
建议您升级到最新 KPL 版本。KPL 会定期更新至最新版本，包括最新依赖项和安全补丁、错误修复以及向后兼容的新功能。有关更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/)。

## 查看 KPL 的作用


KPL 是一个高度可配置的库 easy-to-use，可帮助您写入 Kinesis 数据流。它在您的创建器应用程序代码和 Kinesis Data Streams API 操作之间充当中介。KPL 执行以下主要任务：
+ 利用可配置的自动重试机制对一个或多个 Kinesis 数据流进行写入
+ 收集记录并使用 `PutRecords` 根据请求将多条记录写入多个分片
+ 聚合用户记录以增加负载大小并提高吞吐量
+ 与 [Kinesis Client Library](https://docs.amazonaws.cn/kinesis/latest/dev/developing-consumers-with-kcl.html)（KCL）无缝集成以在消费端上取消聚合批记录
+ 代表您提交 Amazon CloudWatch 指标，以提供对制作人绩效的可见性

请注意，KPL 与中提供的 Kinesis Data Streams API 不同。[Amazon SDKs](https://www.amazonaws.cn/tools/)Kinesis Data Streams API 可帮助您管理 Kinesis Data Streams 的许多方面（包括创建流、重新分片以及放置并获取记录），而 KPL 提供专用于摄取数据的提取层。有关 Kinesis Data Streams API 的信息，请参阅 [Amazon Kinesis API Reference](https://docs.amazonaws.cn/kinesis/latest/APIReference/)。

## 了解使用 KPL 的优势


以下列表说明了使用 KPL 开发 Kinesis Data Streams 创建器的部分主要优势。

KPL 可在同步或异步使用案例中使用。除非存在使用同步操作的具体原因，否则建议您使用异步接口的较高性能。有关这两种使用案例和示例代码的更多信息，请参阅 [使用 KPL 写入 Kinesis Data Streams](kinesis-kpl-writing.md)。

 **性能优势**   
KPL 可帮助构建高性能创建器。考虑以下情况：您的 Amazon EC2 实例充当代理，从数以百计或数以千计的低功率设备中收集 100 个字节的事件并将记录写入 Kinesis 数据流中。这些 EC2 实例均须将每秒数以千计的事件写入您的数据流。要实现所需的吞吐量，创建器必须实施复杂逻辑（例如，批处理或多线程处理）及重试逻辑并在消费端端取消记录聚合。KPL 为您执行所有此类任务。

 **消费端端易于使用**   
对于使用采用 Java 的 KCL 的消费端端开发人员而言，KPL 无需额外工作即可集成。当 KCL 检索包含多个 KPL 用户记录的已聚合 Kinesis Data Streams 记录时，它将自动调用 KPL，以在将单个用户记录返还到用户之前提取此类记录。  
对于未使用 KCL 而是直接使用 API 操作 `GetRecords` 的消费端端开发人员而言，KPL Java 库可用于在将用户记录返还给用户之前提取此类记录。

 **创建器监控**   
您可以使用 CloudWatch 亚马逊和 KPL 收集、监控和分析您的 Kinesis Data Streams 制作者。KPL 代表您向 CloudWatch 发送吞吐量、错误和其他指标，并且可以配置为在流、分片或生产者级别进行监控。

 **异步架构**   
由于 KPL 可在将记录发送到 Kinesis Data Streams 之前对其进行缓冲处理，因此它在继续运行时之前不会强制调用方应用程序阻止和等待确认记录已到达服务器。用于将记录放入 KPL 中的调用应始终立即返回，并且不等待发送记录或接收来自服务器的响应。相反，将创建一个 `Future` 对象，该对象稍后将接收向 Kinesis Data Streams 发送记录的结果。这与 Amazon SDK 中的异步客户端行为相同。

## 了解何时不使用 KPL


当 KPL 会导致库（用户可配置）中产生高达 `RecordMaxBufferedTime` 的额外处理延迟时。`RecordMaxBufferedTime` 值越大，产生的包装效率和性能就越高。无法容忍此额外延迟的应用程序可能需要直接使用 Amazon 开发工具包。有关将软件开发工具包与 Kinesis Amazon Data Streams 配合使用的更多信息，[使用 Amazon Kinesis Data Streams API 开发制作人 适用于 Java 的 Amazon SDK](developing-producers-with-sdk.md)请参阅。有关 `RecordMaxBufferedTime` 和 KPL 其他用户可配置属性的更多信息，请参阅 [配置 Amazon Kinesis Producer Library](kinesis-kpl-config.md)。

# 安装 KPL
安装 KPL

Amazon 提供了针对 macOS、Windows 和最新 Linux 发行版（有关支持的平台的详细信息，请参阅下一部分）的 C\$1\$1 Amazon Kinesis Producer Library（KPL）的预先构建的二进制文件。这些二进制文件打包在 Java .jar 文件中，如果您使用 Maven 安装程序包，将自动调用和使用这些二进制文件。要查找最新版的 KPL 和 KCL，请使用以下 Maven 搜索链接：
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

Linux 二进制文件已采用 GNU 编译器集合 (GCC) 进行编译并已静态链接到 Linux 上的 libstdc\$1\$1。这些二进制文件应适用于包含 glibc 2.5 版或更高版本的任何 64 位 Linux 发行版。

早期的 Linux 发行版的用户可以使用随源代码一起提供的编译说明来构建 KPL。 GitHub要从中下载 KPL GitHub，请参阅 [Amazon Kinesis 制作](https://github.com/awslabs/amazon-kinesis-producer)人库。

**重要**  
亚马逊 Kinesis Producer Library (KPL) 0.x 将于 2026 年 1 月 30 日 end-of-support上线。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 0.x 的 KPL 应用程序迁移到最新的 KPL 版本。要查找最新的 KPL 版本，请参阅 [Github 上的 KPL 页面](https://github.com/awslabs/amazon-kinesis-producer)。有关从 KPL 0.x 迁移至 KPL 1.x 的信息，请参阅[从 KPL 0.x 迁移至 KPL 1.x](kpl-migration-1x.md)。

# 从 KPL 0.x 迁移至 KPL 1.x
迁移至 KPL 1.x

本主题提供了将您的消费者从 KPL 0.x 迁移到 KPL 1.x 的 step-by-step说明。KPL 1.x 引入了对 适用于 Java 的 Amazon SDK 2.x 的支持，同时保持了与先前版本的接口兼容性。无需更新核心数据处理逻辑即可迁移至 KPL 1.x。

1. **确保有以下先决条件：**
   + Java Development Kit（JDK）8 或更高版本
   + 适用于 Java 的 Amazon SDK 2.x
   + 用于依赖项管理的 Maven 或 Gradle

1. **添加依赖关系**

   如果您使用的是 Maven，请将以下依赖项添加到您的 pom.xml 文件中。务必将 GroupID 从 `com.amazonaws` 更新到 `software.amazon.kinesis`，并将版本 `1.x.x` 更新到最新的 KPL 版本。

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   如果使用 Gradle，请在 `build.gradle` 文件中添加以下信息。请务必将 `1.x.x` 替换为最新 KPL 版本。

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   可以在 [Maven Central 存储库](https://central.sonatype.com/search?q=amazon-kinesis-producer)中查看最新版本的 KPL。

1. **更新 KPL 的导入语句**

   KPL 1.x 使用 适用于 Java 的 Amazon SDK 2.x 并使用以开头的更新后的软件包名称`software.amazon.kinesis`，而之前的 KPL 中的软件包名称以开头。`com.amazonaws.services.kinesis`

   将 `com.amazonaws.services.kinesis` 的导入替换为 `software.amazon.kinesis`。下表列出了必须替换的导入。  
**导入替换**    
[\[See the AWS documentation website for more details\]](http://docs.amazonaws.cn/streams/latest/dev/kpl-migration-1x.html)

1. **更新 Amazon 凭证提供程序类的导入语句**

   迁移到 KPL 1.x 时，必须将 KPL 应用程序代码中基于 适用于 Java 的 Amazon SDK 1.x 的导入包和类更新为基于 2.x 的相应包和类。 适用于 Java 的 Amazon SDK KPL 应用程序中通常导入的是凭证提供程序类。有关[凭证提供程序变更](https://docs.amazonaws.cn/sdk-for-java/latest/developer-guide/migration-client-credentials.html)的完整列表，请参阅 适用于 Java 的 Amazon SDK 2.x 迁移指南文档中的凭证提供程序更改。下面是可能需要在 KPL 应用程序中进行的常见导入更改。

   **KPL 0.x 中的导入**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **KPL 1.x 中的导入**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

   如果您导入任何其他基于 适用于 Java 的 Amazon SDK 1.x 的凭证提供程序，则必须将其更新为 适用于 Java 的 Amazon SDK 2.x 的等效凭证提供程序。如果您没有 classes/packages 从 适用于 Java 的 Amazon SDK 1.x 中导入任何内容，则可以忽略此步骤。

1. **更新 KPL 配置中的凭证提供程序配置**

   KPL 1.x 中的凭证提供程序配置需要 适用于 Java 的 Amazon SDK 2.x 凭证提供程序。如果您`KinesisProducerConfiguration`通过覆盖默认凭证提供程序来传递中 适用于 Java 的 Amazon SDK 1.x 的凭证提供程序，则必须使用 适用于 Java 的 Amazon SDK 2.x 凭据提供程序对其进行更新。有关[凭证提供程序变更](https://docs.amazonaws.cn/sdk-for-java/latest/developer-guide/migration-client-credentials.html)的完整列表，请参阅 适用于 Java 的 Amazon SDK 2.x 迁移指南文档中的凭证提供程序更改。如果未覆盖 KPL 配置中的默认凭证提供程序，可忽略此步骤。

   例如，如果要使用以下代码覆盖 KPL 的默认凭证提供程序：

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

   必须使用以下代码进行更新才能使用 适用于 Java 的 Amazon SDK 2.x 凭证提供程序：

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# 转换为适用于 KPL 的 Amazon Trust Services（ATS）证书


在 2018 年 2 月 9 日上午 9:00（太平洋标准时间），Amazon Kinesis Data Streams 安装了 ATS 证书。要能继续使用 Amazon Kinesis Producer Library（KPL）向 Kinesis Data Streams 写入记录，您必须将安装的 KPL 升级到[版本 0.12.6](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar) 或更高版本。此更改会影响所有 Amazon 区域。

有关迁移到 ATS 的信息，请参阅[如何为 Amazon迁移到自己的证书颁发机构做好准备](https://www.amazonaws.cn/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/)。

如果您遇到问题，需要技术支持，请通过 Amazon Support 中心[创建案例](https://console.amazonaws.cn/support/v1#/case/create)。

# KPL 受支持平台


Amazon Kinesis Producer Library（KPL）采用 C\$1\$1 编写，作为主用户进程的子进程运行。预编译的 64 位本机二进制文件与 Java 版本捆绑在一起并由 Java 包装程序管理。

无需在以下操作系统上安装任何其他库即可运行 Java 程序包：
+ 内核为 2.6.18（2006 年 9 月）及更高版本的 Linux 发行版
+ Apple iOS X 10.9 及更高版本
+ Windows Server 2008 及更高版本
**重要**  
0.14.0 之前的所有 KPL 版本支持 Windows Server 2008 及更高版本。  
KPL 0.14.0 及更高版本将不再支持该 Windows 平台。

请注意，KPL 仅为 64 位。

## 源代码


如果 KPL 安装中提供的二进制文件无法满足您的环境，则 KPL 的内核将编写为 C\$1\$1 模块。C\$1\$1 模块和 Java 接口的源代码根据亚马逊公共许可发布，可在亚马逊 [Kinesis Producer Library GitHub 上找到。](https://github.com/awslabs/amazon-kinesis-producer)虽然 KPL 可在安装了最新的符合标准的 C\$1\$1 编译器和 JRE 的任何平台上使用，但 Amazon 仍未正式支持不在受支持的平台列表中的任何平台。

# KPL 的重要概念


以下部分包含了解 Amazon Kinesis Producer Library（KPL）并从中获益所需的概念和术语。

**Topics**
+ [

## 记录
](#kinesis-kpl-concepts-records)
+ [

## 批处理
](#kinesis-kpl-concepts-batching)
+ [

## 聚合
](#kinesis-kpl-concepts-aggretation)
+ [

## 集合
](#kinesis-kpl-concepts-collection)

## 记录


在本指南中，我们区分了 *KPL 用户记录*和 *Kinesis Data Streams 记录*。当我们使用没有限定词的术语*记录*时，我们指 *KPL 用户记录*。当我们提及 Kinesis Data Streams 记录时，我们明确指 *Kinesis Data Streams* 记录。

KPL 用户记录是对用户有特别含义的数据 Blob。示例包括表示网站上的 UI 事件的 JSON Blob 或 Web 服务器中的日志条目。

Kinesis Data Streams 记录是 Kinesis Data Streams 服务 API 所定义 `Record` 数据结构的实例。它包含一个分区键、序列号和数据 Blob。

## 批处理


*批处理* 指对多个项执行单个操作而不是对每个单独的项重复执行操作。

在此背景下，“项”是一条记录，操作是将项发送到 Kinesis Data Streams。在非批处理情况下，您会将每条记录放置在单独的 Kinesis Data Streams 记录中，并发出一条 HTTP 请求以将其发送到 Kinesis Data Streams。利用批处理，每个 HTTP 请求可携带多条记录而不仅仅是一条记录。

KPL 支持两种批处理：
+ *聚合* – 在单条 Kinesis Data Streams 记录中存储多条记录。
+ *集合* – 使用 API 操作 `PutRecords` 将多条 Kinesis Data Streams 记录发送到 Kinesis 数据流中的一个或多个分片。

这两种类型的 KPL 批处理旨在共存并可彼此独立启用或禁用。默认情况下，两种批处理将同时启用。

## 聚合


*聚合*指在一条 Kinesis Data Streams 记录中存储多条记录。聚合允许客户增加每个 API 调用发送的记录的数目，这将有效增加创建器吞吐量。

Kinesis Data Streams 分片每秒支持最多 1,000 条 Kinesis Data Streams 记录或 1 MB 吞吐量。每秒 Kinesis Data Streams 记录数限制将绑定具有 1 KB 以下记录的客户。通过记录聚合，客户可以将多条记录合并为一条 Kinesis Data Streams 记录。这使客户能够提高其每分片吞吐量。

考虑以下情况：区域 us-east-1 中的一个分片当前正在以每秒 1000 条记录的恒速运行，其中每条记录的大小为 512 个字节。利用 KPL 聚合，您可将 1000 条记录打包到 10 条 Kinesis Data Streams 记录中，从而将 RPS 降低到 10（每条记录 50 KB）。

## 集合


*集合*指批处理多条 Kinesis Data Streams 记录，并通过调用 API 操作 `PutRecords` 在单个 HTTP 请求中发送此类记录，而无需在其自己的 HTTP 请求中发送每条 Kinesis Data Streams 记录。

与不使用集合相比，这将增加吞吐量，因为它减少了发出多个单独 HTTP 请求的开销。实际上，`PutRecords` 本身专用于实现此目的。

集合与聚合的不同之处在于，前者处理的是 Kinesis Data Streams 记录组。正在收集的 Kinesis Data Streams 记录仍可包含来自该用户的多条记录。可通过如下方式来可视化此关系：

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# 将 KPL 与产生器代码集成


Amazon Kinesis Producer Library（KPL）在单独进程中运行，并使用 IPC 与您的父用户进程通信。此架构有时称为[微服务](http://en.wikipedia.org/wiki/Microservices)，出于两个主要原因选择该架构：

**1) 即使 KPL 发生崩溃，也不会妨碍您的用户进程**  
您的进程可具有与 Kinesis Data Streams 不相关的任务，并且在 KPL 发生崩溃时仍能继续运行。对于您的父用户进程来说，也可重新启动 KPL 并恢复到完全运行状态（此功能位于正式包装程序中）。

例如，将指标发送到 Kinesis Data Streams 的 Web 服务器；即使 Kinesis Data Streams 部分已停止运行，该服务器仍能继续使用页面。由于 KPL 中存在错误，导致整个服务器发生崩溃，从而造成不必要的中断。

**2) 可支持任意客户端**  
始终存在使用官方支持的语言之外的语言的客户。这些客户也应能轻松使用 KPL。

## 推荐的使用矩阵


以下使用矩阵列出了不同用户的推荐设置，并向您提供了有关是否以及如何使用 KPL 的建议。请记住，如果启用聚合，则还必须使用取消聚合来提取消费端端的记录。


| 创建器端语言 | 消费端端语言 | KCL 版本 | 检查点逻辑 | 您是否可以使用 KPL？ | 警告 | 
| --- | --- | --- | --- | --- | --- | 
| 除 Java 之外的任何语言 | \$1 | \$1 | \$1 | 否 | 不适用 | 
| Java | Java | 直接使用 Java 软件开发工具包 | 不适用 | 是 | 如果使用聚合，您必须在 GetRecords 调用后使用提供的取消聚合库。 | 
| Java | 除 Java 之外的任何语言 | 直接使用软件开发工具包 | 不适用 | 是 | 必须禁用聚合。 | 
| Java | Java | 1.3.x | 不适用 | 是 | 必须禁用聚合。 | 
| Java | Java  | 1.4.x | 调用不带任何参数的检查点 | 是 | 无 | 
| Java | Java | 1.4.x | 调用带明确序列号的检查点 | 是 | 禁用聚合或更改代码以使用扩展的序列号进行检查点操作。 | 
| Java | 除 Java 之外的任何语言  | 1.3.x \$1 多语言守护进程 \$1 特定于语言的包装程序 | 不适用 | 是 | 必须禁用聚合。 | 

# 使用 KPL 写入 Kinesis Data Streams
使用 KPL 写入 Kinesis Data Streams

以下部分说明了正在运行的示例代码，从最基本的产生器到完全异步的代码。

## Barebones 产生器代码


以下代码是写入最小工作创建器所需的全部代码。Amazon Kinesis Producer Library（KPL）用户记录在后台处理。

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 同步响应结果


在上一个示例中，该代码未检查 KPL 用户记录是否已成功。KPL 执行为说明失败原因所需的任何重试。但如果您要检查结果，可使用从 `addUserRecord` 返回的 `Future` 对象检查结果，如以下示例中所示（显示上一示例是为了提供上下文信息）：

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 异步响应结果


上一个示例对 `Future` 对象调用 `get()`，这会阻止运行时。如果您不想阻止运行时，则可使用异步回调，如以下示例所示：

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```

# 配置 Amazon Kinesis Producer Library
配置 KPL

虽然默认设置应适用于大多数使用案例，但您可能想更改部分默认设置以定制 `KinesisProducer` 的行为来满足您的需求。为此，可将 `KinesisProducerConfiguration` 类的实例传递给 `KinesisProducer` 构造函数，例如：

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

您也可从属性文件中加载配置：

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

您可替换用户进程可访问的任何路径和文件名。此外，您可在通过此方式创建的 `KinesisProducerConfiguration` 实例上调用 set 方法来自定义 config。

属性文件应使用中的名称来指定参数 PascalCase。这些名称将与 `KinesisProducerConfiguration` 类中的设置方法中使用的名称匹配。例如：

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

有关配置参数使用规则和值限制的更多信息，请参阅[上的配置属性文件示例 GitHub](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties)。

请注意，初始化 `KinesisProducer` 后，更改已使用的 `KinesisProducerConfiguration` 实例不会产生进一步的影响。`KinesisProducer` 当前不支持动态重新配置。

# 实现消费端取消聚合


从版本 1.4.0 开始，KCL 支持自动取消聚合 KPL 用户记录。在更新 KCL 后，将编译利用 KCL 早期版本编写的消费端应用程序代码，而无需进行任何修改。不过，如果正在创建器端使用 KPL 聚合，则有一个与检查点操作相关的细微之处：已聚合记录中的所有子记录都具有相同的序列号，因此如果您需要区分子记录，则必须利用检查点存储额外数据。此额外数据称作*子序列号*。

**Topics**
+ [

## 从以前的 KCL 版本迁移
](#kinesis-kpl-consumer-deaggregation-migration)
+ [

## 将 KCL 扩展用于 KPL 取消聚合
](#kinesis-kpl-consumer-deaggregation-extensions)
+ [

## GetRecords 直接使用
](#kinesis-kpl-consumer-deaggregation-getrecords)

## 从以前的 KCL 版本迁移


您无需更改现有调用来执行检查点操作与聚合。仍确保您能够成功检索 Kinesis Data Streams 中存储的所有记录。KCL 现在提供两个新检查点操作来支持特定的使用案例，如下所述。

如果您的现有代码先于 KPL 支持之前已为 KCL 写入，并且调用了没有参数的检查点操作，则它等同于对批处理中的上个 KPL 用户记录的序列号进行检查点操作。如果调用带序列号字符串的检查点操作，则它等同于对批处理的指定序列号以及隐式子序列号 0（零）进行检查点操作。

调用没有任何参数的新 KCL 检查点 `checkpoint()` 操作，在语义上等同于对批处理中的上个 `Record` 调用的序列号以及隐式子序列号 0（零）进行检查点操作。

调用新 KCL 检查点操作 `checkpoint(Record record)`，在语义上等同于对指定 `Record` 的序列号以及隐式子序列号 0（零）进行检查点操作。如果 `Record` 调用实际为 `UserRecord`，则对 `UserRecord` 序列号和子序列号进行检查点操作。

调用新 KCL 检查点操作 `checkpoint(String sequenceNumber, long subSequenceNumber)` 会对给定的序列号以及子序列号进行显式检查点操作。

在上述任意情况下，在将检查点存储在 Amazon DynamoDB 检查点表中后，KCL 可正确地恢复检索记录，即使是在应用程序发生崩溃并重新启动时也是如此。如果在序列中包含多条记录，则检索会从具有已进行检查点操作的最新序列号的记录中的下个子序列号记录开始。如果最新检查点包括上一条序列号记录的最新子序列号，则检索会从具有下个序列号的记录开始。

以下部分将讨论必须避免跳过和重复记录的消费端的序列和子序列检查点操作的详细信息。如果在停止并重新启动消费端的记录处理时跳过（或重复）记录并不重要，则可运行您的现有代码而无需进行修改。

## 将 KCL 扩展用于 KPL 取消聚合


KPL 取消聚合会涉及子序列检查点操作。为了帮助使用子序列检查点操作，已将 `UserRecord` 类添加到 KCL：

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

现在使用的是此类而不是 `Record`。这不会破坏现有代码，因为它是 `Record` 的子类。`UserRecord` 类同时表示实际子记录和标准非聚合记录。非聚合记录可被视为刚好具有一条子记录的聚合记录。

此外，将两个新的操作添加到 `IRecordProcessorCheckpointer`：

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

要开始使用子序列号检查点操作，您可执行以下转换。更改以下形式代码：

```
checkpointer.checkpoint(record.getSequenceNumber());
```

新的形式代码：

```
checkpointer.checkpoint(record);
```

建议您使用 `checkpoint(Record record)` 形式来进行子序列检查点操作。不过，如果您已以字符串形式存储 `sequenceNumbers` 以用于检查点操作，则您现在也应存储 `subSequenceNumber`，如以下示例所示：

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

从 `Record` 到 `UserRecord` 的转换始终会成功，因为其实现始终使用 `UserRecord`。除非需要对序列号进行算术运算，否则建议不要采用此方法。

在处理 KPL 用户记录时，KCL 会将子序列号作为每行的额外字段写入 Amazon DynamoDB。在恢复检查点操作时，早期版本的 KCL 使用 `AFTER_SEQUENCE_NUMBER` 提取记录。而具有 KPL 支持的当前 KCL 则使用 `AT_SEQUENCE_NUMBER`。在检索带已进行检查点操作的序列号的记录时，会检查已进行检查点操作的子序列号，而且会根据需要删除子记录（如果上一条子记录为已进行检查点操作的记录，则可能删除所有子记录）。同样，非聚合记录可被视为具有单条子记录的聚合记录，因此相同的算法同时适用于聚合和非聚合记录。

## GetRecords 直接使用


您也可以选择不使用 KCL 而是直接调用 API 操作 `GetRecords` 来检索 Kinesis Data Streams 记录。要将已检索到的记录提取到原始 KPL 用户记录中，可在 `UserRecord.java` 中调用下列静态操作之一：

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

第一个操作使用 `0` 的默认值 `startingHashKey`（零）和 `2^128 -1` 的默认值 `endingHashKey`。

每个操作都会取消将 Kinesis Data Streams 记录的给定列表聚合到 KPL 用户记录的列表中。将从返回的记录列表中删除其显式哈希键或分区键位于 `startingHashKey`（包含）和 `endingHashKey`（包含）的范围之外的任何 KPL 用户记录。

# 将 KPL 与 Amazon Data Firehose 搭配使用


如果使用 Kinesis Producer Library（KPL）将数据写入 Kinesis 数据流，则可以使用聚合来合并写入该 Kinesis 数据流的记录。如果随后将该数据流用作 Firehose 传输流的来源，Firehose 将在记录传送到目标之前取消聚合。如果您配置传输流来转换数据，Firehose 在将记录传输到 Amazon Lambda之前，会先取消聚合记录。有关更多信息，请参阅 [Writing to Amazon Firehose Using Kinesis Data Streams](https://docs.amazonaws.cn/firehose/latest/dev/writing-with-kinesis-streams.html)。

# 将 KPL 与 Amazon Glue 架构注册表一起使用


您可以将 Kinesis 数据流与 Amazon Glue 架构注册表集成。 Amazon Glue 架构注册表能帮助您集中发现、控制和演变架构，同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。 Amazon Glue Schema Registry 使您能够改善流媒体应用程序中的 end-to-end数据质量和数据治理。有关更多信息，请参阅 [Amazon Glue Schema Registry](https://docs.amazonaws.cn/glue/latest/dg/schema-registry.html)。设置此集成的方法之一是通过 Java 中的 KPL 和 Kinesis Client Library（KCL）进行。

**重要**  
目前，只有使用用 Java 实现的 KPL 生成器的 Kinesis 数据流支持 Kinesis 数据流 Amazon Glue 和架构注册表集成。不提供多语言支持。

有关如何使用 KPL 设置 Kinesis Data Streams 与架构注册表集成的详细说明，请参阅 “用[例：将 Amazon Kinesis Data Streams 与 Glu KPL/KCL e 架构注册表集成” 中的 “使用库与 Amazon](https://docs.amazonaws.cn/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)数据交互” 部分。

# 配置 KPL 代理配置


对于无法直接连接到互联网的应用程序，所有 Amazon SDK 客户端都支持使用 HTTP 或 HTTPS 代理。在一般企业环境中，所有出站网络流量都必须通过代理服务器。如果您的应用程序使用 Kinesis Producer 库 (KPL) 在使用代理服务器的环境 Amazon 中收集和发送数据，则您的应用程序将需要配置 KPL 代理。KPL 是一个基于 Kinesi Amazon s 软件开发工具包构建的高级库。它分为原生进程和包装器。原生进程执行处理和发送记录的所有工作，而包装器则管理原生进程并与之通信。有关更多信息，请参阅 [Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library](https://www.amazonaws.cn/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/)。

包装器在 Java 中编写，而原生进程使用 Kinesis 开发工具包在 C\$1\$1 中编写。KPL 0.14.7 及更高版本当前支持 Java 包装器中的代理配置，可以将所有代理配置传递至原生进程。有关更多信息，请参见 [https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7)。

您可以使用以下代码向 KPL 应用程序添加代理配置。

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# KPL 版本生命周期策略


本主题概述了 Amazon Kinesis 生产者库 (KPL) 的版本生命周期政策。 Amazon 定期为 KPL 版本提供新版本，以支持新功能和增强功能、错误修复、安全补丁和依赖项更新。我们建议您继续 up-to-date使用 KPL 版本，以了解最新功能、安全更新和底层依赖关系。我们**不**建议继续使用不受支持的 KPL 版本。

主要 KPL 版本的生命周期包括以下三个阶段：
+ **正式发布 (GA)**-在此阶段，将完全支持主要版本。 Amazon 提供常规的次要版本和补丁版本，其中包括对 Kinesis Data Streams 的新功能或 API 更新的支持，以及错误和安全修复。
+ **维护模式**-将补丁版本的发布 Amazon 限制为仅解决关键错误修复和安全问题。主要版本不会收到有关新功能或 Kinesis Data APIs Streams 的更新。
+ **E nd-of-support** — 主版本将不再接收更新或发布。之前发布的版本将继续通过公共包管理器提供，并且代码将保持不变 GitHub。用户可以自行决定 end-of-support是否使用已达到的版本。建议您升级到最新的主要版本。


| 主要版本 | 当前阶段 | 发行日期 | 维护模式日期 | End-of-support 日期 | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | 维护模式 | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | 正式发布 | 2024-12-15 | -- | -- | 