第 4 步:实施创建器 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 4 步:实施创建器

此教程使用股票市场交易监控的实际场景。以下准则简要说明了此场景如何映射到创建器及其支持的代码结构。

请参阅源代码并查看以下信息。

StockTrade 类

单次股票交易由 StockTrade 类的一个实例表示。此实例包含一些属性,如股票代号、价格、股份数、交易类型(买入或卖出)以及唯一标识交易的 ID。将为您实现此类。

流记录

流是一个记录序列。记录是 JSON 格式的 StockTrade 实例序列化。例如:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator 类

StockTradeGenerator 包含一个名为 getRandomTrade() 的方法,当调用此方法时,它将返回一个随机生成的新股票交易。将为您实现此类。

StockTradesWriter 类

创建器的 main 方法 StockTradesWriter 将持续检索随机交易,然后通过执行以下任务将该交易发送到 Kinesis Data Streams:

  1. 将数据流名称和区域名称作为输入读取。

  2. 使用 KinesisAsyncClientBuilder 来设置区域、凭证和客户端配置。

  3. 检查流是否存在且处于活动状态 (如果不是这样,它将退出并显示错误)。

  4. 在连续循环中,会依次调用 StockTradeGenerator.getRandomTrade() 方法和 sendStockTrade 方法以便每 100 毫秒将交易发送到流一次。

sendStockTrade 类的 StockTradesWriter 方法具有以下代码:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

请参阅以下代码细分:

  • PutRecord API 需要一个字节数组,并且您需要将交易转换为 JSON 格式。此行代码将执行该操作:

    byte[] bytes = trade.toJsonAsBytes();
  • 您需要先创建新的 PutRecordRequest 实例(此示例中称为请求),然后才能发送交易。每个 request 均需要流名称、分区键和数据 Blob。

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    该示例使用股票行情自动收录器作为将记录映射到特定分片的分区键。实际上,每个分片应具有数百或数千个分区键,以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息,请参阅 将数据写入 Amazon Kinesis Data Streams

    现在 request 已准备好发送到客户端(put 操作):

    kinesisClient.putRecord(request).get();
  • 错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    围绕 put 操作添加 try/catch 块:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    这是因为 Kinesis 数据流 put 操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。建议您仔细考虑针对 put 操作的重试策略以避免数据丢失(如用作简单重试)。

  • 状态日志记录很有用,但它是可选的:

    LOG.info("Putting trade: " + trade.toString());

此处显示的创建器使用 Kinesis Data Streams API 单记录功能 PutRecord。实际上,如果单个创建者生成许多记录,则使用 PutRecords 的多记录功能并一次性发送批量记录通常会更有效。有关更多信息,请参阅将数据写入 Amazon Kinesis Data Streams

运行创建器
  1. 验证在步骤 2:创建 IAM policy 和用户中检索到的访问密钥和私有密钥对是否保存到文件 ~/.aws/credentials 中。

  2. 使用以下参数运行 StockTradeWriter 类:

    StockTradeStream us-west-2

    如果您在 us-west-2 之外的区域中创建流,则必须改为在此处指定该区域。

您应该可以看到类似于如下所示的输出内容:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

您的股票交易现在正由 Kinesis Data Streams 摄取。

后续步骤

第 5 步:实施使用者