第 1 步:准备数据 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 1 步:准备数据

在为此创建 Amazon Kinesis Data Analytics 应用程序示例中,您创建 Kinesis sis sis sis sis Data 流,以作为应用程序的流式传输源。另外,您还需运行 Python 代码来将模拟血压数据写入流中。

步骤 1.1:创建 Kinesis Data Streams

在本部分中,您创建一个 Kinesis sis sis sis sis Data 流,名为ExampleInputStream。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建该数据流。

  • 使用控制台:

    1. 登录到Amazon Web Services Management Console,然后打开 Kinesis 控制台https://console.aws.amazon.com/kinesis

    2. 在导航窗格中,选择 Data Streams (数据流)。然后选择创建 Kinesis 流

    3. 对于名称,请键入 ExampleInputStream。对于分片数,请键入 1

  • 或者,要使用 Amazon CLI 创建数据流,请运行以下命令:

    $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以不断生成示例记录并将其写入您创建的数据流中。

  1. 安装 Python 和 pip。

    有关安装 Python 的信息,请参阅 Python

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 文档中的安装

  2. 运行以下 Python 代码。可以将区域改为您要用于此示例的区域。代码中的 put-record 命令将 JSON 记录写入到流。

    from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = 'LOW' normal = 'NORMAL' high = 'HIGH' def get_blood_pressure(pressure_type): pressure = {'BloodPressureLevel': pressure_type.value} if pressure_type == PressureType.low: pressure['Systolic'] = random.randint(50, 80) pressure['Diastolic'] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure['Systolic'] = random.randint(90, 120) pressure['Diastolic'] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure['Systolic'] = random.randint(130, 200) pressure['Diastolic'] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

下一步

第 2 步:创建分析应用程序