第 1 步:创建输入和输出流 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

第 1 步:创建输入和输出流

在您创建 Amazon Kinesis Data Analytics 应用程序前热点示例中,则创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源,并将另一个流配置为 Kinesis Data Analytics 将应用程序输出永久保存到的目标。

步骤 1.1:创建 Kinesis Data Streams

在本部分中,您创建两个 Kinesis sis sis sis sis sis Data 流:ExampleInputStreamExampleOutputStream

使用控制台或 Amazon CLI 创建这些数据流。

  • 使用控制台创建数据流:

    1. 登录到Amazon Web Services Management Console,然后打开 Kinesis 控制台https://console.aws.amazon.com/kinesis

    2. 在导航窗格中,选择 Data Streams (数据流)

    3. 选择创建 Kinesis 流,然后创建带有一个名为 ExampleInputStream 的分片的流。

    4. 重复上一步骤以创建带有一个名为 ExampleOutputStream 的分片的流。

  • 要使用 Amazon CLI 创建数据流,请执行以下操作:

    • 创建流 ()ExampleInputStreamExampleOutputStream) 使用以下 Kinesiscreate-stream Amazon CLI命令。要创建另一个流 (应用程序将用于写入输出),请运行同一命令以将流名称更改为 ExampleOutputStream

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以持续生成示例记录并将其写入 ExampleInputStream 流。

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。此代码将执行以下操作:

    • 在 (X, Y) 平面上的某个位置生成潜在热点。

    • 为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。

    • put-record 命令将 JSON 记录写入到流。

    重要

    请勿将此文件上传到 Web 服务器,因为它包含您的Amazon凭证。

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { 'left': field['left'] + random.random() * (field['width'] - spot_size), 'width': spot_size, 'top': field['top'] + random.random() * (field['height'] - spot_size), 'height': spot_size } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { 'x': rectangle['left'] + random.random() * rectangle['width'], 'y': rectangle['top'] + random.random() * rectangle['height'], 'is_hot': 'Y' if rectangle is hotspot else 'N' } return {'Data': json.dumps(point), 'PartitionKey': 'partition_key'} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={'left': 0, 'width': 10, 'top': 0, 'height': 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client('kinesis'))

下一步

第 2 步:创建 Kinesis Data Analytics 应用程序