本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
第 1 步:准备数据
在本部分中,您将创建 Kinesis Data Streams,然后在该流上填充订单和交易记录。此式源将用于下一步创建的应用程序。
步骤 1.1:创建流式传输源
可以使用控制台或创建 Kinesis Data StreamsAmazon CLI. 本示例采用 OrdersAndTradesStream
作为流名称。
-
使用控制台— 登录到Amazon Web Services Management Console在处打开 Kinesis 控制台https://console.aws.amazon.com/kinesis
. 选择 Data Streams,然后创建带有一个分片的流。有关更多信息,请参阅 。创建流中的Amazon Kinesis Data Streams 开发人员指南. -
使用Amazon CLI— 使用以下 Kinesis
create-stream
Amazon CLI创建流的命令:$ aws kinesis create-stream \ --stream-name
OrdersAndTradesStream
\ --shard-count 1 \ --region us-east-1 \ --profile adminuser
步骤 1.2:填充流式传输源
运行以下 Python 脚本以便在 OrdersAndTradesStream
中填充示例记录。如果您使用其他名称创建了流,请相应更新 Python 代码。
-
安装 Python 和
pip
。有关安装 Python 的信息,请访问 Python
网站。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装
。 -
运行以下 Python 代码。代码中的
put-record
命令将 JSON 记录写入到流。import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { 'RecordType': 'Order', 'Oid': order_id, 'Oticker': ticker, 'Oprice': random.randint(500, 10000), 'Otype': 'Sell'} def get_trade(order_id, trade_id, ticker): return { 'RecordType': "Trade", 'Tid': trade_id, 'Toid': order_id, 'Tticker': ticker, 'Tprice': random.randint(0, 3000)} def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(['AAAA', 'BBBB', 'CCCC']) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY) order_id += 1 if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
下一个步骤