步骤 1:准备数据 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:准备数据

在为此示例创建 Amazon Kinesis Data Analytics 应用程序前,需要先创建一个 Kinesis 数据流,以作为应用程序的流式传输源。另外,您还需运行 Python 代码来将模拟血压数据写入流中。

步骤 1.1:创建 Kinesis 数据流

在此部分中,您创建一个名为 ExampleInputStream 的 Kinesis 数据流。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建该数据流。

  • 使用控制台:

    1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

    2. 在导航窗格中,选择 数据流。然后选择创建 Kinesis 流

    3. 对于名称,请键入 ExampleInputStream。对于分片数,请键入 1

  • 或者,要使用 Amazon CLI 创建数据流,请运行以下命令:

    $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以不断生成示例记录并将其写入您创建的数据流中。

  1. 安装 Python 和 pip。

    有关安装 Python 的信息,请参阅 Python

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 文档中的安装

  2. 运行以下 Python 代码。可以将区域改为您要用于此示例的区域。代码中的 put-record 命令将 JSON 记录写入到流。

    from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = "LOW" normal = "NORMAL" high = "HIGH" def get_blood_pressure(pressure_type): pressure = {"BloodPressureLevel": pressure_type.value} if pressure_type == PressureType.low: pressure["Systolic"] = random.randint(50, 80) pressure["Diastolic"] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure["Systolic"] = random.randint(90, 120) pressure["Diastolic"] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure["Systolic"] = random.randint(130, 200) pressure["Diastolic"] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal ) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
下一个步骤

步骤 2:创建分析应用程序