示例:写入到 S3 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

示例:写入到 S3

在本练习中,您创建一个Kinesis Data Analytics for Java application,它将 Kinesis 数据流作为源,并将 Amazon S3 存储桶作为接收器。通过使用接收器,您可以在 Amazon S3 控制台中验证应用程序输出。

注意

要为本练习设置所需的先决条件,请先完成入门练习。

创建相关资源

在为本练习创建Kinesis Data Analytics for Java application之前,请创建以下相关资源:

  • Kinesis 数据流 (ExampleInputStream)。

  • 存储应用程序代码和输出 (ka-app-<username>) 的 Amazon S3 存储桶

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南 中的创建和更新数据流。将数据流命名为 ExampleInputStream

  • Amazon Simple Storage Service 开发人员指南 中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如 ka-app-<username>。在 Amazon S3 存储桶中创建两个文件夹(codedata)。

应用程序将创建以下 CloudWatch 资源(如果不存在):

  • 名为 /aws/kinesis-analytics-java/MyApplication 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 AWS SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import json import boto3 import random import datetime kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['EVENT_TIME'] = str_now data['TICKER'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['PRICE'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

在 GitHub 中提供了该示例的 Java 应用程序代码。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/S3Sink 目录。

应用程序代码位于 S3StreamingSinkJob.java 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
  • 应用程序使用 Apache Flink S3 接收器以写入到 Amazon S3。

    接收器在滚动窗口中读取消息,将消息编码为 S3 存储桶对象,然后将编码的对象发送到 S3 接收器。以下代码将对象进行编码以发送到 Amazon S3:

    input.flatMap(new Tokenizer()) // Tokenizer for generating words .keyBy(0) // Logically partition the stream for each word .timeWindow(Time.minutes(1)) // Tumbling window definition .sum(1) // Sum the number of words per partition .map(value -> value.f0 + " count: " + value.f1.toString() + "\n") .addSink(createS3SinkFromStaticConfig());
注意

应用程序使用 Flink StreamingFileSink 对象以写入到 Amazon S3。有关 StreamingFileSink 的更多信息,请参阅 Apache Flink 文档中的 StreamingFileSink

修改应用程序代码

在本节中,您修改应用程序代码以将输出写入到 Amazon S3 存储桶。

使用您的用户名更新以下行以指定应用程序的输出位置:

private static final String s3SinkPath = "s3a://ka-app-<username>/data";

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门教程中的先决条件

  2. 要将 Kinesis 连接器用于以下应用程序,您需要下载、构建并安装 Apache Maven。有关更多信息,请参阅使用 Apache Flink Kinesis Streams 连接器

  3. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.8.2

编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar)。

注意

提供的源代码依赖于 Java 1.8 中的库。如果使用开发环境,请确保项目的 Java 版本为 1.8。

上传 Apache Flink 流式处理 Java 代码

在本节中,您将应用程序代码上传到在创建相关资源 将示例记录写入输入流一节中创建的 Amazon S3 存储桶。

  1. 在 Amazon S3 控制台中,选择 ka-app-<username> 存储桶,导航到 code 文件夹,然后选择 Upload (上传)

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 aws-kinesis-analytics-java-apps-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建并运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建应用程序

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 在 Amazon Kinesis Data Analytics 控制面板上,选择 Create analytics application (创建分析应用程序)

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将版本下拉列表保留为 Apache Flink 1.8 (Recommended Version) (Apache Flink 1.8 (建议的版本))

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

    
                                控制台屏幕截图,显示创建应用程序页面上的设置。
  5. 选择 Create application (创建应用程序)

注意

使用控制台创建 Kinesis Data Analytics for Java application 时,您可以选择为您的应用程序创建一个 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略以添加访问 Kinesis 数据流的权限。

  1. 通过以下网址打开 IAM 控制台:https://console.amazonaws.cn/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。将 <username> 替换为您的用户名。

    { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::kinesis-analytics-placeholder-s3-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:%LOG_GROUP_PLACEHOLDER%:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:%LOG_GROUP_PLACEHOLDER%:log-stream:%LOG_STREAM_PLACEHOLDER%" ] } , { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteObjects", "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-<username>", "arn:aws:s3:::ka-app-<username>/*" ] } ] }

配置应用程序

  1. MyApplication (我的应用程序) 页面上,选择 Configure (配置)

    
                                显示 MyApplication (我的应用程序) 页面以及配置和运行按钮的屏幕截图。
  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 对于 Amazon S3 bucket (Amazon S3 存储桶),请输入 ka-app-<username>

    • 对于 Path to Amazon S3 object (Amazon S3 对象路径),请输入 code/aws-kinesis-analytics-java-apps-1.0.jar

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  5. 对于 CloudWatch logging (CloudWatch 日志记录),选中启用复选框。

  6. 选择 Update (更新)

    
                                “配置应用程序”页面的屏幕截图,具有在此过程中所述的设置页面。
注意

在选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 将为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

运行应用程序

  1. MyApplication (我的应用程序) 页面上,选择 Run (运行)。确认该操作。

    
                                MyApplication (我的应用程序) 页面和运行按钮的屏幕截图。
  2. 当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)


                        应用程序图表的屏幕截图。

验证应用程序输出

在 Amazon S3 控制台中,打开 S3 存储桶中的 data 文件夹。

几分钟后,将显示包含来自应用程序的聚合数据的对象。

清理 AWS 资源

本节包含清理在Amazon S3教程中创建的 AWS 资源的过程。

删除 Kinesis Data Analytics 应用程序

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 在 Kinesis Data Analytics 面板中,选择 MyApplication (我的应用程序)

  3. 选择 Configure (配置)

  4. Snapshots (快照) 部分中,选择 Disable (禁用),然后选择 Update (更新)

  5. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis 数据流

  1. 通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 在 Kinesis Data Streams 面板中,选择 ExampleInputStream

  3. ExampleInputStream 页面中,选择 Delete Kinesis Stream,然后确认删除。

删除 Amazon S3 对象和存储桶

  1. 通过以下网址打开 Amazon S3 控制台:https://console.amazonaws.cn/s3/

  2. 选择 ka-app-<username> 存储桶。

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除 IAM 资源

  1. 通过以下网址打开 IAM 控制台:https://console.amazonaws.cn/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service-MyApplication-<您的区域> 策略。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles (角色)

  7. 选择 kinesis-analytics-MyApplication-<您的区域> 角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除 CloudWatch 资源

  1. 通过以下网址打开 CloudWatch 控制台:https://console.amazonaws.cn/cloudwatch/

  2. 在导航栏中,选择 Logs (日志)

  3. 选择 /aws/kinesis-analytics/MyApplication 日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。