第 5 步:实施使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 5 步:实施使用者

教程:使用 KPL 和 KCL 1.x 处理实时股票数据 中的使用者应用程序持续处理您在 第 4 步:实施创建器 中创建的股票交易流。然后,它输出每分钟买入和卖出最多的股票。该应用程序基于 Kinesis Client Library(KCL)构建,后者需要完成对使用器应用程序常见的大量繁重工作。有关更多信息,请参阅开发 KCL 1.x 消费端

请参阅源代码并查看以下信息。

StockTradesProcessor 类

为您提供的使用者的主类,它将执行以下任务:

  • 读取作为参数传递的应用程序名称、流名称和区域名称。

  • ~/.aws/credentials 读取凭证。

  • 创建一个 RecordProcessorFactory 实例,该实例提供由 RecordProcessor 实例实施的 StockTradeRecordProcessor 的实例。

  • 利用 RecordProcessorFactory 实例和标准配置(包括流名称、凭证和应用程序名称)创建 KCL 工作程序。

  • 此工作程序为每个分片(已分配给此使用器实例)创建一个线程,以持续循环读取 Kinesis Data Streams 中的记录。之后,它调用 RecordProcessor 实例以处理收到的每批记录。

StockTradeRecordProcessor 类

RecordProcessor 实例的实施,该实例反过来将实施三个必需方法:initializeprocessRecordsshutdown

正如其名称所示,initializeshutdown 由 Kinesis Client Library 使用,旨在让记录处理器了解何时应准备好开始接收记录以及何时应停止接收记录,因此该方法可以执行任何特定于应用程序的设置和终止任务。将为您提供这些方法的代码。processRecords 方法中进行的主要处理,该处理反过来对每条记录使用 processRecord。后一个方法主要作为空框架代码提供给您,以便您在下一步骤中实施,届时将进一步对其进行说明。

另外要注意的是 processRecord 的支持方法 reportStatsresetStats 的实施,二者在初始源代码中为空。

已为您实施 processRecords 方法,并执行了以下步骤:

  • 对于传入的每条记录,对其调用 processRecord

  • 如果自上一次报告以来已过去至少 1 分钟,请调用 reportStats()(它将打印出最新统计数据),然后调用 resetStats()(它将清除统计数据以便下一个间隔仅包含新记录)。

  • 设置下一次报告时间。

  • 如果自上一检查点以来已过去至少 1 分钟,请调用 checkpoint()

  • 设置下一次检查点操作时间。

此方法使用 60 秒间隔作为报告和检查点操作比率。有关检查点操作的更多信息,请参阅 有关使用者的附加信息

StockStats 类

此类提供一段时间内针对最热门股票的数据保留和统计数据跟踪。此代码已提供给您并包含以下方法:

  • addStockTrade(StockTrade):将给定的 StockTrade 注入正在使用的统计数据。

  • toString():以格式化字符串形式返回统计数据。

此类跟踪最热门股票的方式是,保留每只股票的总交易数的连续计数和最大计数。每当股票交易达成时,它都会更新这些计数。

将代码添加到 StockTradeRecordProcessor 类的方法,如以下步骤中所示。

实施使用者
  1. 通过实例化大小正确的 processRecord 对象并将记录数据添加到该对象来实施 StockTrade 方法,并在出现问题时记录警告。

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. 实施简单的 reportStats 方法。可随时将输出格式修改为您的首选格式。

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 最后,实施 resetStats 方法,这将创建新的 stockStats 实例。

    stockStats = new StockStats();
运行使用者
  1. 运行您在 第 4 步:实施创建器 中编写的创建者以将模拟股票交易记录引入流中。

  2. 验证之前(在创建 IAM 用户时)检索到的访问密钥和私有密钥对是否保存到文件 ~/.aws/credentials 中。

  3. 使用以下参数运行 StockTradesProcessor 类:

    StockTradesProcessor StockTradeStream us-west-2

    请注意,如果您在 us-west-2 之外的区域中创建流,则必须改为在此处指定该区域。

1 分钟后,您应看到类似以下内容的输出,并且输出在此后每分钟刷新一次:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

有关使用者的附加信息

如果熟悉 Kinesis Client Library 的好处(在 开发 KCL 1.x 消费端 中和其他位置已讨论),您可能想知道为何应在此处使用它。虽然您只使用单个分片流和单个使用器实例来处理它,但使用 KCL 实施使用器仍会更轻松。将创建器部分中的代码实施步骤与使用者部分中的进行对比,您会发现实施使用者相对来说容易一些。这主要是因为 KCL 提供的服务。

在此应用程序中,您专注于实施可处理单条记录的记录处理器类。您无需担心如何从 Kinesis Data Streams 提取记录;当有可用的新记录时,KCL 就会提取这些记录并调用记录处理器。此外,不必担心分片和使用者实例的数量。如果已扩展流,则不必重写应用程序以处理多个分片或多个使用者实例。

术语检查点操作是指记录到流中目前已使用和处理的数据记录所在的点,这样一来,当应用程序发生崩溃时,系统将从该点读取流,而不是从头开始读取流。检查点操作主题、各种设计模式及其最佳实践不在本章讨论范围之内。但是,生产环境中可能会涉及上述内容。

正如您在 第 4 步:实施创建器 中了解到的,Kinesis Data Streams API 中的 put 操作将采用分区键作为输入。Kinesis Data Streams 使用分区键作为跨多个分片拆分记录的机制(当流中有多个分片时)。相同的分区键将始终路由到同一个分片。这使得能够基于以下假设来设计用于处理特定分片的使用者:具有相同分区键的记录只会发送给该使用者,具有相同分区键的任何记录都不会在任何其他使用者处结束。因此,使用者的工作程序可聚合具有相同分区键的所有记录而不用担心丢失所需的数据。

在此应用程序中,使用器对记录的处理并不集中,因此您可以使用一个分片并在与 KCL 线程相同的线程中执行处理。但在实际应用中,请先考虑增加分片数量。在某些情况下,您可能需要将处理切换到其他线程或需要使用线程池(如果您的记录处理应是集中的)。这样一来,KCL 可以更快地提取新记录,而其他线程可并行处理记录。多线程设计并不是无关紧要的,应使用先进技术来实现,因此增加分片计数通常是最有效、最轻松的扩展方法。

后续步骤

第 6 步:(可选) 扩展使用者