步骤 1:准备数据 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:准备数据

在此部分中,您创建一个 Kinesis 数据流,然后在该流上填充订单和交易记录。此式源将用于下一步创建的应用程序。

步骤 1.1:创建流式传输源

可以使用控制台或 Amazon CLI创建一个 Kinesis 数据流。本示例采用 OrdersAndTradesStream 作为流名称。

步骤 1.2:填充流式传输源

运行以下 Python 脚本以便在 OrdersAndTradesStream 中填充示例记录。如果您使用其他名称创建了流,请相应更新 Python 代码。

  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。代码中的 put-record 命令将 JSON 记录写入到流。

    import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

下一个步骤

步骤 2:创建应用程序