本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
实现消费端
教程:使用 KPL 和 KCL 1.x 处理实时股票数据 中的消费端应用程序持续处理您在 实现产生器 中创建的股票交易流。然后,它输出每分钟买入和卖出最多的股票。该应用程序基于 Kinesis Client Library(KCL)构建,后者需要完成对消费端应用程序常见的大量繁重工作。有关更多信息,请参阅 培养 KCL 1.x 消费者。
请参阅源代码并查看以下信息。
- StockTradesProcessor 类
-
为您提供的消费端的主类,它将执行以下任务:
-
读取作为参数传递的应用程序名称、流名称和区域名称。
-
从
~/.aws/credentials
读取凭证。 -
创建一个
RecordProcessorFactory
实例,该实例提供由RecordProcessor
实例实施的StockTradeRecordProcessor
的实例。 -
利用
RecordProcessorFactory
实例和标准配置(包括流名称、凭证和应用程序名称)创建 KCL 工作程序。 -
此工作程序为每个分片(已分配给此消费端实例)创建一个线程,以持续循环读取 Kinesis Data Streams 中的记录。之后,它调用
RecordProcessor
实例以处理收到的每批记录。
-
- StockTradeRecordProcessor 类
-
RecordProcessor
实例的实施,该实例反过来将实施三个必需方法:initialize
、processRecords
和shutdown
。正如其名称所示,
initialize
和shutdown
由 Kinesis Client Library 使用,旨在让记录处理器了解何时应准备好开始接收记录以及何时应停止接收记录,因此该方法可以执行任何特定于应用程序的设置和终止任务。将为您提供这些方法的代码。processRecords
方法中进行的主要处理,该处理反过来对每条记录使用processRecord
。后一个方法主要作为空框架代码提供给您,以便您在下一步骤中实施,届时将进一步对其进行说明。另外要注意的是
processRecord
的支持方法reportStats
和resetStats
的实施,二者在初始源代码中为空。已为您实施
processRecords
方法,并执行了以下步骤:-
对于传入的每条记录,对其调用
processRecord
。 -
如果自上一次报告以来已过去至少 1 分钟,请调用
reportStats()
(它将打印出最新统计数据),然后调用resetStats()
(它将清除统计数据以便下一个间隔仅包含新记录)。 -
设置下一次报告时间。
-
如果自上一检查点以来已过去至少 1 分钟,请调用
checkpoint()
。 -
设置下一次检查点操作时间。
此方法使用 60 秒间隔作为报告和检查点操作比率。有关检查点操作的更多信息,请参阅 有关消费端的其他信息。
-
- StockStats 类
-
此类提供一段时间内针对最热门股票的数据保留和统计数据跟踪。此代码已提供给您并包含以下方法:
-
addStockTrade(StockTrade)
:将给定的StockTrade
注入正在使用的统计数据。 -
toString()
:以格式化字符串形式返回统计数据。
此类跟踪最热门股票的方式是,保留每只股票的总交易数的连续计数和最大计数。每当股票交易达成时,它都会更新这些计数。
-
将代码添加到 StockTradeRecordProcessor
类的方法,如以下步骤中所示。
实施消费端
-
通过实例化大小正确的
processRecord
对象并将记录数据添加到该对象来实施StockTrade
方法,并在出现问题时记录警告。StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
-
实施简单的
reportStats
方法。可随时将输出格式修改为您的首选格式。System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
最后,实施
resetStats
方法,这将创建新的stockStats
实例。stockStats = new StockStats();
运行消费端
-
运行您在 实现产生器 中编写的创建者以将模拟股票交易记录引入流中。
-
验证之前(在创建 IAM 用户时)检索到的访问密钥和私有密钥对是否保存到文件
~/.aws/credentials
中。 -
使用以下参数运行
StockTradesProcessor
类:StockTradesProcessor StockTradeStream us-west-2
请注意,如果您在
us-west-2
之外的区域中创建流,则必须改为在此处指定该区域。
1 分钟后,您应看到类似以下内容的输出,并且输出在此后每分钟刷新一次:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
有关消费端的其他信息
如果熟悉 Kinesis Client Library 的好处(在 培养 KCL 1.x 消费者 中和其他位置已讨论),您可能想知道为何应在此处使用它。虽然您只使用单个分片流和单个消费端实例来处理它,但使用 KCL 实施消费端仍会更轻松。将创建器部分中的代码实施步骤与消费端部分中的进行对比,您会发现实施消费端相对来说容易一些。这主要是因为 KCL 提供的服务。
在此应用程序中,您专注于实施可处理单条记录的记录处理器类。您无需担心如何从 Kinesis Data Streams 提取记录;当有可用的新记录时,KCL 就会提取这些记录并调用记录处理器。此外,不必担心分片和消费端实例的数量。如果已扩展流,则不必重写应用程序以处理多个分片或多个消费端实例。
检查点一词是指记录流中的点,其中直到迄今为止已经使用和处理的数据记录。如果应用程序崩溃,则从该点读取流,而不是从流的开头进行读取。检查点操作主题、各种设计模式及其最佳实践不在本章讨论范围之内。但是,生产环境中可能会涉及上述内容。
正如您在 实现产生器 中了解到的,Kinesis Data Streams API 中的 put
操作将采用分区键作为输入。Kinesis Data Streams 使用分区键作为跨多个分片拆分记录的机制(当流中有多个分片时)。相同的分区键将始终路由到同一个分片。这使得能够基于以下假设来设计用于处理特定分片的消费端:具有相同分区键的记录只会发送给该消费端,具有相同分区键的任何记录都不会在任何其他消费端处结束。因此,消费端的工作程序可聚合具有相同分区键的所有记录而不用担心丢失所需的数据。
在此应用程序中,消费端对记录的处理并不集中,因此您可以使用一个分片并在与 KCL 线程相同的线程中执行处理。但在实际应用中,请先考虑增加分片数量。在某些情况下,您可能需要将处理切换到其他线程或需要使用线程池(如果您的记录处理应是集中的)。这样一来,KCL 可以更快地提取新记录,而其他线程可并行处理记录。多线程设计并不是无关紧要的,应使用先进技术来实现,因此增加分片计数通常是最有效的纵向扩展方法。