第 1 步:准备 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 1 步:准备

对于本练习,在创建 Amazon Kinesis Data Analytics Kinesis 将一个流配置为应用程序的流式传输源,并将另一个流配置为 Kinesis Data Analytics 将应用程序输出永久保存到的目标。

步骤 1.1:创建输入和输出数据流

在本部分中,您创建两个 Kinesis sis sis sis sis sis sis sis sis sis sis sis sisExampleInputStreamExampleOutputStream。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建这些流。

  • 要使用 控制台

    1. 登录到Amazon Web Services Management Console,然后打开 Kinesis 控制台https://console.aws.amazon.com/kinesis

    2. 选择创建数据流。创建带有一个名为 ExampleInputStream 的分片的流。有关更多信息,请参阅 。创建流中的Amazon Kinesis Data Streams 开发人员指南

    3. 重复上一步骤以创建带有一个名为 ExampleOutputStream 的分片的流。

  • 使用 Amazon CLI

    1. 使用以下 Kinesiscreate-stream Amazon CLI命令创建第一个流 ()ExampleInputStream)。

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser
    2. 运行同一命令,同时将流名称更改为 ExampleOutputStream。此命令创建应用程序用来写入输出的第二个流。

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以持续生成示例记录,并将这些记录写入 ExampleInputStream 流。

{"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}
  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。代码中的 put-record 命令将 JSON 记录写入到流。

    from enum import Enum import json import random import boto3 STREAM_NAME = 'ExampleInputStream' class RateType(Enum): normal = 'NORMAL' high = 'HIGH' def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {'heartRate': rate, 'rateType': rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

下一步

第 2 步:创建应用程序