对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 1:准备
在为本练习创建 Amazon Kinesis Data Analytics 应用程序之前,您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源,并将另一个流配置为目标(Kinesis Data Analytics 在其中永久保存应用程序输出)。
步骤 1.1:创建输入和输出数据流
在此部分中,您创建两个 Kinesis 流:ExampleInputStream
和 ExampleOutputStream
。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建这些流。
-
要使用 控制台
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
选择创建数据流。创建带有一个名为
ExampleInputStream
的分片的流。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建流。 -
重复上一步骤以创建带有一个名为
ExampleOutputStream
的分片的流。
-
使用 Amazon CLI
-
使用下面的 Kinesis
create-stream
Amazon CLI 命令创建第一个流 (ExampleInputStream
)。$ aws kinesis create-stream \ --stream-name
ExampleInputStream
\ --shard-count 1 \ --region us-east-1 \ --profile adminuser -
运行同一命令,同时将流名称更改为
ExampleOutputStream
。此命令创建应用程序用来写入输出的第二个流。
-
步骤 1.2:将示例记录写入输入流
在此步骤中,您运行 Python 代码以持续生成示例记录,并将这些记录写入 ExampleInputStream
流。
{"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}
-
安装 Python 和
pip
。有关安装 Python 的信息,请访问 Python
网站。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装
。 -
运行以下 Python 代码。代码中的
put-record
命令将 JSON 记录写入到流。from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))