第 5 步:实施使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 5 步:实施使用者

本教程中的使用者应用程序持续处理您数据流中的股票交易。然后,它输出每分钟买入和卖出最多的股票。该应用程序基于 Kinesis Client Library(KCL)构建,后者需要完成对使用器应用程序常见的大量繁重工作。有关更多信息,请参阅使用 Kinesis Client Library

请参阅源代码并查看以下信息。

StockTradesProcessor 类

为您提供的使用者的主类,它将执行以下任务:

  • 读取作为参数传递的应用程序名称、数据流名称和区域名称。

  • 使用区域名称创建 KinesisAsyncClient 实例。

  • 创建一个 StockTradeRecordProcessorFactory 实例,该实例提供由 ShardRecordProcessor 实例实施的 StockTradeRecordProcessor 的实例。

  • 使用 KinesisAsyncClientStreamNameApplicationNameStockTradeRecordProcessorFactory 实例创建 ConfigsBuilder 实例。这对于创建具有默认值的所有配置非常有用。

  • 使用 ConfigsBuilder 实例创建一个 KCL 计划程序(以前在 KCL 版本 1.x 中称为 KCL 工作线程)。

  • 此计划程序为每个分片(已分配给此使用者实例)创建一个线程,以持续循环从数据量读取记录。之后,它调用 StockTradeRecordProcessor 实例以处理收到的每批记录。

StockTradeRecordProcessor 类

StockTradeRecordProcessor 实例的实施,该实例反过来将实施五个必需方法:initializeprocessRecordsleaseLostshardEndedshutdownRequested

initializeshutdownRequested 方法由 KCL 使用,旨在让记录处理器分别了解何时应准备好开始接收记录,以及何时应停止接收记录,因此该方法可以执行任何特定于应用程序的设置和终止任务。leaseLostshardEnded 用于实施当租约丢失或处理达到分片末尾时需执行的操作的任何逻辑。在此示例中,我们只记录指示这些事件的消息。

将为您提供这些方法的代码。processRecords 方法中进行的主要处理,该处理反过来对每条记录使用 processRecord。后一个方法作为大体为空的框架代码提供给您,以便您在下一步骤中实施,届时将更详细地对其进行说明。

另外要注意的是对 processRecord 的支持方法 reportStatsresetStats 的实施,二者在初始源代码中为空。

已为您实施 processRecords 方法,并执行了以下步骤:

  • 对于传入的每条记录,它会对其调用 processRecord

  • 如果自上一次报告以来已过去至少 1 分钟,请调用 reportStats()(它将打印出最新统计数据),然后调用 resetStats()(它将清除统计数据以便下一个间隔仅包含新记录)。

  • 设置下一次报告时间。

  • 如果自上一检查点以来已过去至少 1 分钟,请调用 checkpoint()

  • 设置下一次检查点操作时间。

此方法使用 60 秒间隔作为报告和检查点操作比率。有关检查点操作的更多信息,请参阅 Using the Kinesis Client Library

StockStats 类

此类提供一段时间内针对最热门股票的数据保留和统计数据跟踪。此代码已提供给您并包含以下方法:

  • addStockTrade(StockTrade):将给定的 StockTrade 注入正在使用的统计数据。

  • toString():以格式化字符串形式返回统计数据。

此类跟踪最热门股票的方式是,保留每只股票的总交易数的连续计数和最大计数。每当股票交易达成时,它都会更新这些计数。

将代码添加到 StockTradeRecordProcessor 类的方法,如以下步骤中所示。

实施使用者
  1. 通过实例化大小正确的 processRecord 对象并将记录数据添加到该对象来实施 StockTrade 方法,并在出现问题时记录警告。

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. 实施简单的 reportStats 方法。可随时将输出格式修改为适合您的首选格式。

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 实施 resetStats 方法,这将创建新的 stockStats 实例。

    stockStats = new StockStats();
  4. 实施 ShardRecordProcessor 接口所需的以下方法

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
运行使用者
  1. 运行您在 第 4 步:实施创建器 中编写的创建者以将模拟股票交易记录引入流中。

  2. 验证之前(在创建 IAM 用户时)检索到的访问密钥和私有密钥对是否保存到文件 ~/.aws/credentials 中。

  3. 使用以下参数运行 StockTradesProcessor 类:

    StockTradesProcessor StockTradeStream us-west-2

    请注意,如果您在 us-west-2 之外的区域中创建流,则必须改为在此处指定该区域。

1 分钟后,您应看到类似以下内容的输出,并且输出在此后每分钟刷新一次:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

后续步骤

第 6 步:(可选) 扩展使用者