步骤 1:准备 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:准备

在为本练习创建 Amazon Kinesis Data Analytics 应用程序之前,您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源,并将另一个流配置为目标(Kinesis Data Analytics 在其中永久保存应用程序输出)。

步骤 1.1:创建输入和输出数据流

在此部分中,您创建两个 Kinesis 流:ExampleInputStreamExampleOutputStream。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建这些流。

  • 要使用 控制台
    1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

    2. 选择创建数据流。创建带有一个名为 ExampleInputStream 的分片的流。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建流

    3. 重复上一步骤以创建带有一个名为 ExampleOutputStream 的分片的流。

  • 使用 Amazon CLI
    1. 使用下面的 Kinesis create-stream Amazon CLI 命令创建第一个流 (ExampleInputStream)。

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser
    2. 运行同一命令,同时将流名称更改为 ExampleOutputStream。此命令创建应用程序用来写入输出的第二个流。

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以持续生成示例记录,并将这些记录写入 ExampleInputStream 流。

{"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}
  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。代码中的 put-record 命令将 JSON 记录写入到流。

    from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

下一个步骤

步骤 2:创建应用程序