示例:在 应用程序中添加引用数据 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:在 应用程序中添加引用数据

在本练习中,您在现有 数据分析应用程序中添加引用数据。有关引用数据的信息,请参阅以下主题:

在本练习中,您将引用数据添加到在 Kinesis Data Analytics 入门练习中创建的应用程序。引用数据提供每个股票代号对应的公司名称;例如:

Ticker, Company AMZN,Amazon ASD, SomeCompanyA MMB, SomeCompanyB WAS, SomeCompanyC

首先,请完成入门练习中的步骤,以创建一个启动应用程序。然后,执行以下步骤进行设置,并将引用数据添加到应用程序:

  1. 准备数据

    • 将上述参考数据作为对象存储在 Amazon Simple Storage Service (Amazon S3) 中

    • 创建 IAM 角色,Kinesis Data Analytics 可以担任该角色来代表您读取 Amazon S3 对象。

  2. 将引用数据源添加到您的应用程序。

    读取 对象,并创建应用程序内部引用表,您可以在应用程序代码中查询该表。

  3. 测试代码。

    在应用程序代码中,将编写一个联接查询来联接应用程序内部流和应用程序内部引用表,从而获取每个股票代码的对应公司名称。

步骤 1:准备

在此部分中,您将示例引用数据作为对象存储在 存储桶中。您还将创建 IAM 角色, 可担任该角色来代表您读取对象。

将引用数据存储为 Amazon S3 对象。

在这一步中,将示例引用数据存储为 Amazon S3 对象。

  1. 打开文本编辑器,添加以下数据,然后将文件另存为 TickerReference.csv

    Ticker, Company AMZN,Amazon ASD, SomeCompanyA MMB, SomeCompanyB WAS, SomeCompanyC

  2. TickerReference.csv 文件上传到 S3 存储桶。有关说明,请参阅 Amazon Simple Storage Service 用户指南中的上传对象

创建 IAM 角色

接下来,创建一个 Kinesis Data Analytics 可以代入的 IAM 角色并读取 Amazon S3 对象。

  1. 在 Amazon Identity and Access Management (IAM) 中,创建名为 KinesisAnalytics-ReadS3Object 的 IAM 角色。要创建该角色,请按照 IAM 用户指南中的为 Amazon Web Service 创建角色(Amazon Web Services Management Console)中的说明操作。

    在 &IAM; 控制台上,指定以下项:

    • 选择角色类型中,选择 Amazon Lambda。在创建角色后,您将更改信任策略以允许 (而非 )代入该角色。

    • 不要在 Attach Policy 页面上附加任何策略。

  2. 更新 &IAM; 角色策略:

    1. 在 &IAM; 控制台上,选择您创建的角色。

    2. 信任关系选项卡上,更新信任策略以便为 Kinesis Data Analytics 授予权限以代入该角色。下面显示了信任策略:

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "kinesisanalytics.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

    3. 权限 选项卡上,附加名为 AmazonS3ReadOnlyAccess 的 Amazon 托管策略。这会为该角色授予权限以读取 对象。下面显示了此策略:

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:Get*", "s3:List*" ], "Resource": "*" } ] }

步骤 2:将引用数据源添加到应用程序配置中

在该步骤中,将引用数据源添加到应用程序配置中。要开始操作,需要以下信息:

  • S3 存储桶名称和对象键名称

  • &IAM; 角色的 Amazon 资源名称 (ARN)

  1. 在应用程序主页面中,选择 Connect reference data (连接引用数据)

  2. 连接引用数据来源页面上,选择包含引用数据对象的 Amazon S3 存储桶,并输入对象的键名称。

  3. 应用程序内部引用表名称处输入 CompanyName

  4. Access to chosen resources (访问所选的资源) 部分中,选择 Choose from IAM roles that Kinesis Analytics can assume (从 Kinesis Analytics 可代入的 IAM 角色中选择),然后选择您在上一部分中创建的 KinesisAnalytics-ReadS3Object IAM 角色。

  5. 选择 发现架构。控制台检测到引用数据中的两列。

  6. 选择 Save and close

步骤 3:测试:查询应用程序内部引用表

现在,您可以查询应用程序内部引用表 CompanyName。您可以联接股票价格数据和引用表以使用引用信息扩充应用程序。结果显示公司名称。

  1. 用以下内容替换您的应用程序代码。查询会联接应用程序内部输入流和应用程序内部引用表。应用程序代码将结果写入到另一应用程序内部流 DESTINATION_SQL_STREAM

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, price FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
  2. 验证 SQLResults 选项卡中是否会显示应用程序输出。确保某些行显示公司名称 (示例引用数据不包含所有公司名称)。