Amazon Kinesis Data Streams
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

第 5 步:实施使用者

您正在开发的使用者应用程序将持续处理您在 第 4 步:实施创建器 中创建的股票交易流,并输出每分钟买入和卖出最多的股票。该应用程序基于 KCL 构建,后者需要完成对使用者应用程序常见的大量繁重工作。有关更多信息,请参阅 使用 Kinesis Client Library 开发 Amazon Kinesis Data Streams 使用者

请参阅源代码并查看以下信息。

StockTradesProcessor 类

为您提供的使用者的主类,它将执行以下任务:

  • 读取作为参数传入的应用程序名称、流名称和区域名称。

  • ~/.aws/credentials 读取凭证。

  • 创建一个 RecordProcessorFactory 实例,该实例提供由 StockTradeRecordProcessor 实例实施的 RecordProcessor 的实例。

  • 利用 RecordProcessorFactory 实例和标准配置(包括流名称、凭证和应用程序名称)创建 KCL 工作程序。

  • 工作程序为分配给此使用者实例的每个分片创建一个新线程,该线程将持续循环以从 Kinesis Data Streams 读取记录,然后调用 RecordProcessor 实例以处理收到的每批记录。

StockTradeRecordProcessor 类

RecordProcessor 实例的实施,该实例反过来将实施三个必需方法:initializeprocessRecordsshutdown

正如其名称所示,shutdowninitialize 由 KCL 使用,旨在让记录处理器了解何时应准备好开始接收记录以及何时应停止接收记录,因此该方法可以执行任何特定于应用程序的设置和终止任务。将为您提供这些方法的代码。processRecords 方法中进行的主要处理,该处理反过来对每条记录使用 processRecord。后一个方法主要作为空框架代码提供给您,以便您在下一步骤中实施,届时将进一步对其进行说明。

另外要注意的是 processRecord 的支持方法 reportStatsresetStats 的实施,二者在初始源代码中为空。

已为您实施 processsRecords 方法,并执行了以下步骤:

  • 对于每条传入的记录,对其调用 processRecord

  • 如果自上一次报告以来已过去至少 1 分钟,请调用 reportStats()(它将打印出最新统计数据),然后调用 resetStats()(它将清除统计数据以便下一个间隔仅包含新记录)。

  • 设置下一次报告时间。

  • 如果自上一次检查点操作以来已过去至少 1 分钟,请调用 checkpoint()

  • 设置下一次检查点操作时间。

此方法使用 60 秒间隔作为报告和检查点操作比率。有关检查点操作的更多信息,请参阅 有关使用者的附加信息

StockStats 类

此类提供一段时间内针对最热门股票的数据保留和统计数据跟踪。此代码已提供给您并包含以下方法:

  • addStockTrade(StockTrade):将给定的 StockTrade 注入正在使用的统计数据。

  • toString():以格式化字符串形式返回统计数据。

此类跟踪最热门股票的方式是,保留每只股票的总交易数的连续计数并保留最大计数。它确保每当股票交易达成时更新这些计数。

将代码添加到 StockTradeRecordProcessor 类的方法,如以下步骤中所示。

实施使用者

  1. 通过实例化大小正确的 StockTrade 对象并将记录数据添加到该对象来实施 processRecord 方法,并在出现问题时记录警告。

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. 实施简单的 reportStats 方法。可随时将输出格式修改为您的首选格式。

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 最后,实施 resetStats 方法,这将创建新的 stockStats 实例。

    stockStats = new StockStats();

运行使用者

  1. 运行您在上一模块中编写的创建器以将模拟股票交易记录注入流。

  2. 验证之前(在创建 IAM 用户时)检索到的访问密钥和私有密钥对是否保存在文件 ~/.aws/credentials 中。

  3. 使用以下参数运行 StockTradesProcessor 类:

    StockTradesProcessor StockTradeStream us-west-2

    请注意,如果您在 us-west-2 之外的区域中创建流,则必须改为在此处指定该区域。

1 分钟后,您应看到类似以下内容的输出,并且输出在此后每分钟刷新一次:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

有关使用者的附加信息

如果您熟悉 KCL 的好处(在 使用 Kinesis Client Library 开发 Amazon Kinesis Data Streams 使用者 中和其他位置已讨论),您可能想知道为何您应在此处使用它。虽然您只使用单个分片流和单个使用者实例来处理它,但使用 KCL 实施使用者仍会更轻松。将创建器部分中的代码实施步骤与使用者部分中的进行对比,您会发现实施使用者相对来说容易一些。这很大程度上是因为 KCL 提供的服务。

在此应用程序中,您专注于实施可处理单条记录的记录处理器类。您无需担心如何从 Kinesis Data Streams 提取记录;当有可用的新记录时,KCL 就会提取这些记录并调用记录处理器。此外,您无需担心有多少个分片和使用者实例;如果已扩展流,则您无需重写应用程序以处理多个分片或多个使用者实例。

术语检查点操作是指记录到流中目前已使用和处理的数据记录所在的点,这样一来,当应用程序发生崩溃时,系统将从该点读取流,而不是从头开始读取流。检查点操作主题、各种设计模式以及相关最佳实践不在本章讨论范围内,但您在生产环境中可能会遇到。

正如您在第 4 步:实施创建器中了解到的,Kinesis Data Streams API 中的 put 操作将采用分区键 作为输入。分区键被 Kinesis Data Streams 用作将记录拆分到多个分片中的机制(当流中有多个分片时)。相同的分区键将始终路由到同一个分片。这使得能够基于以下假设来设计用于处理特定分片的使用者:具有相同分区键的记录只会发送给该使用者,具有相同分区键的任何记录都不会在任何其他使用者处结束。因此,使用者的工作程序可聚合具有相同分区键的所有记录而不用担心丢失所需的数据。

在此应用程序中,使用者对记录的处理并不集中,因此您可以使用一个分片并在与 KCL 线程相同的线程中执行处理。但在实际应用中,请先考虑增大分片数,本学习系列中的下一个模块将对此进行演示。在某些情况下,您可能需要将处理切换到其他线程或需要使用线程池(如果您的记录处理应是集中的)。这样一来,KCL 可以更快地提取新记录,而其他线程可并行处理记录。请注意,多线程设计并不是无关紧要的,应使用先进技术来实现,因此增加分片计数通常是最有效、最轻松的扩展方法。