第 1 步:准备数据 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 1 步:准备数据

在本部分中,您将创建 Kinesis 数据流,并在其中填充订单和交易记录。此式源将用于下一步创建的应用程序。

步骤 1.1:创建流式传输源

您可以使用控制台或创建 Kinesis Data 流的Amazon CLI。本示例采用 OrdersAndTradesStream 作为流名称。

  • 使用控制台— 登录到Amazon Web Services Management Console,然后打开 Kinesis 控制台https://console.aws.amazon.com/kinesis。选择 Data Streams,然后创建带有一个分片的流。有关更多信息,请参阅 。创建流中的Amazon Kinesis Data Streams 开发者指南

  • 使用Amazon CLI— 使用以下 Kinesiscreate-stream Amazon CLI命令创建流:

    $ aws kinesis create-stream \ --stream-name OrdersAndTradesStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

步骤 1.2:填充流式传输源

运行以下 Python 脚本以便在 OrdersAndTradesStream 中填充示例记录。如果您使用其他名称创建了流,请相应更新 Python 代码。

  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。代码中的 put-record 命令将 JSON 记录写入到流。

    import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { 'RecordType': 'Order', 'Oid': order_id, 'Oticker': ticker, 'Oprice': random.randint(500, 10000), 'Otype': 'Sell'} def get_trade(order_id, trade_id, ticker): return { 'RecordType': "Trade", 'Tid': trade_id, 'Toid': order_id, 'Tticker': ticker, 'Tprice': random.randint(0, 3000)} def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(['AAAA', 'BBBB', 'CCCC']) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY) order_id += 1 if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

下一步

第 2 步:创建 应用程序