步骤 1:准备数据 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:准备数据

在本节中,您将创建 Kinesis 数据流,然后在该数据流上填充订单和交易记录。此式源将用于下一步创建的应用程序。

步骤 1.1:创建流式传输源

您可以使用控制台或创建 Kinesis 数据流Amazon CLI。本示例采用 OrdersAndTradesStream 作为流名称。

  • 使用控制台:https://console.aws.amazon.com/kinesis 登录Amazon Web Services Management Console,打开 Kinesis 控制台。选择 Data Streams,然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Dat a Streams 开发人员指南中的创建流Configuram s。

  • 使用Amazon CLI — 使用以下 Kinesiscreate-streamAmazon CLI 命令创建直播:

    $ aws kinesis create-stream \ --stream-name OrdersAndTradesStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

步骤 1.2:填充流式传输源

运行以下 Python 脚本以便在 OrdersAndTradesStream 中填充示例记录。如果您使用其他名称创建了流,请相应更新 Python 代码。

  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。代码中的 put-record 命令将 JSON 记录写入到流。

    import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { 'RecordType': 'Order', 'Oid': order_id, 'Oticker': ticker, 'Oprice': random.randint(500, 10000), 'Otype': 'Sell'} def get_trade(order_id, trade_id, ticker): return { 'RecordType': "Trade", 'Tid': trade_id, 'Toid': order_id, 'Tticker': ticker, 'Tprice': random.randint(0, 3000)} def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(['AAAA', 'BBBB', 'CCCC']) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY) order_id += 1 if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

下一个步骤

步骤 2:创建 应用程序