步骤 1. 创建输入和输出流 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

步骤 1. 创建输入和输出流

在为热点示例创建 Amazon Kinesis Data Analytics 应用程序之前,您创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源,并将另一个流配置为目标(Kinesis Data Analytics 在其中永久保存应用程序输出)。

步骤1.1: 创建 Kinesis 数据流

在本节中,您创建两个 Kinesis 数据流: ExampleInputStreamExampleOutputStream.

使用控制台或 AWS CLI 创建这些数据流。

  • 使用控制台创建数据流:

    1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

    2. 在导航窗格中,选择 Data Streams (数据流)

    3. 选择创建 Kinesis 流,然后创建带有一个名为 ExampleInputStream 的分片的流。

    4. 重复上一步骤以创建带有一个名为 ExampleOutputStream 的分片的流。

  • 要使用 AWS CLI 创建数据流,请执行以下操作:

    • 使用以下 Kinesis create-stream AWS CLI 命令创建流(ExampleInputStreamExampleOutputStream)。要创建另一个流 (应用程序将用于写入输出),请运行同一命令以将流名称更改为 ExampleOutputStream

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

步骤1.2: 将示例记录写入输入流

在此步骤中,您运行 Python 代码以持续生成示例记录并将其写入 ExampleInputStream 流。

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。此代码将执行以下操作:

    • 在 (X, Y) 平面上的某个位置生成潜在热点。

    • 为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。

    • put-record 命令将 JSON 记录写入到流。

    重要

    请勿将此文件上传到 Web 服务器,因为它包含您的 AWS 凭证。

    import boto3 import json import time from random import random # Modify this section to reflect your AWS configuration. awsRegion = "" # The AWS region where your Kinesis Analytics application is configured. accessKeyId = "" # Your AWS Access Key ID secretAccessKey = "" # Your AWS Secret Access Key inputStream = "ExampleInputStream" # The name of the stream being used as input into the Kinesis Analytics hotspots application # Variables that control properties of the generated data. xRange = [0, 10] # The range of values taken by the x-coordinate yRange = [0, 10] # The range of values taken by the y-coordinate hotspotSideLength = 1 # The side length of the hotspot hotspotWeight = 0.2 # The fraction ofpoints that are draw from the hotspots def generate_point_in_rectangle(x_min, width, y_min, height): """Generate points uniformly in the given rectangle.""" return { 'x': x_min + random() * width, 'y': y_min + random() * height } class RecordGenerator(object): """A class used to generate points used as input to the hotspot detection algorithm. With probability hotspotWeight, a point is drawn from a hotspot, otherwise it is drawn from the base distribution. The location of the hotspot changes after every 1000 points generated.""" def __init__(self): self.x_min = xRange[0] self.width = xRange[1] - xRange[0] self.y_min = yRange[0] self.height = yRange[1] - yRange[0] self.points_generated = 0 self.hotspot_x_min = None self.hotspot_y_min = None def get_record(self): if self.points_generated % 1000 == 0: self.update_hotspot() if random() < hotspotWeight: record = generate_point_in_rectangle(self.hotspot_x_min, hotspotSideLength, self.hotspot_y_min, hotspotSideLength) record['is_hot'] = 'Y' else: record = generate_point_in_rectangle(self.x_min, self.width, self.y_min, self.height) record['is_hot'] = 'N' self.points_generated += 1 data = json.dumps(record) return {'Data': bytes(data, 'utf-8'), 'PartitionKey': 'partition_key'} def get_records(self, n): return [self.get_record() for _ in range(n)] def update_hotspot(self): self.hotspot_x_min = self.x_min + random() * (self.width - hotspotSideLength) self.hotspot_y_min = self.y_min + random() * (self.height - hotspotSideLength) def main(): kinesis = boto3.client('kinesis') generator = RecordGenerator() batch_size = 10 while True: records = generator.get_records(batch_size) print(records) kinesis.put_records(StreamName="ExampleInputStream", Records=records) time.sleep(0.1) if __name__ == "__main__": main()

下一步

步骤 2. 创建 Kinesis Data Analytics 应用程序