对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 1:准备数据
在本节中,您将创建 Kinesis 数据流,然后在该数据流上填充订单和交易记录。此式源将用于下一步创建的应用程序。
步骤 1.1:创建流式传输源
您可以使用控制台或创建 Kinesis 数据流Amazon CLI。本示例采用 OrdersAndTradesStream
作为流名称。
-
使用控制台:https://console.aws.amazon.com/kinesis 登录Amazon Web Services Management Console,打开 Kinesis 控制台:
。选择 Data Streams,然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Dat a Streams 开发人员指南中的创建流Configuram s。 -
使用Amazon CLI — 使用以下 Kinesis
create-stream
Amazon CLI 命令创建直播:$ aws kinesis create-stream \ --stream-name
OrdersAndTradesStream
\ --shard-count 1 \ --region us-east-1 \ --profile adminuser
步骤 1.2:填充流式传输源
运行以下 Python 脚本以便在 OrdersAndTradesStream
中填充示例记录。如果您使用其他名称创建了流,请相应更新 Python 代码。
-
安装 Python 和
pip
。有关安装 Python 的信息,请访问 Python
网站。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装
。 -
运行以下 Python 代码。代码中的
put-record
命令将 JSON 记录写入到流。import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { 'RecordType': 'Order', 'Oid': order_id, 'Oticker': ticker, 'Oprice': random.randint(500, 10000), 'Otype': 'Sell'} def get_trade(order_id, trade_id, ticker): return { 'RecordType': "Trade", 'Tid': trade_id, 'Toid': order_id, 'Tticker': ticker, 'Tprice': random.randint(0, 3000)} def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(['AAAA', 'BBBB', 'CCCC']) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY) order_id += 1 if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))