示例:使用自定义接收器写入到 CloudWatch Logs - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

示例:使用自定义接收器写入到 CloudWatch Logs

在本练习中,您创建一个Kinesis Data Analytics for Java application,它将 Kinesis 数据流作为源,并将 Amazon CloudWatch 日志流作为接收器。通过使用接收器,您可以在 CloudWatch 控制台中验证应用程序输出。

注意

要为本练习设置所需的先决条件,请先完成入门练习。

创建相关资源

在为本练习创建Kinesis Data Analytics for Java application之前,请创建以下相关资源:

  • Kinesis 数据流 (ExampleInputStream)。

  • 存储应用程序代码 (ka-app-code-<username>) 的 Amazon S3 存储桶

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南 中的创建和更新数据流。将数据流命名为 ExampleInputStream

  • Amazon Simple Storage Service 开发人员指南 中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如 ka-app-code-<username>

应用程序将创建以下 CloudWatch 资源(如果不存在):

  • 名为 /aws/kinesis-analytics-java/test 的日志组。

  • 名为 StockPriceStatistics 的日志流。您可以查看该日志流,以验证应用程序是否正常工作。

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 AWS SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import json import boto3 import random import datetime kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['EVENT_TIME'] = str_now data['TICKER'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['PRICE'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

在 GitHub 中提供了该示例的 Java 应用程序代码。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/CloudWatchSink 目录。

应用程序代码位于 CustomSinkStreamingJob.javaCloudWatchLogSink.java 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
  • 应用程序使用自定义 Flink 接收器写入到 CloudWatch Logs。接收器是在 CloudWatchLogSink.java 文件中定义的。

    接收器将消息批量发送到日志流。以下代码将消息写入到批次,然后将批次发送到 CloudWatch Logs。它按 MAX_BUFFER_TIME_MILLIS 间隔(60 秒)发送消息,或者在批次长度达到 MAX_BATCH_SIZE(10,000 条消息)时发送消息:

    logEvents.add(new InputLogEvent().withMessage(message).withTimestamp(System.currentTimeMillis())); if (logEvents.size() >= MAX_BATCH_SIZE || lastFlushTimeMillis + MAX_BUFFER_TIME_MILLIS <= System.currentTimeMillis()) { // flush the messages PutLogEventsRequest putLogEventsRequest = new PutLogEventsRequest() .withLogEvents(logEvents) .withLogGroupName(logGroupName) .withLogStreamName(logStreamName) .withSequenceToken(getUploadSequenceToken()); awsLogsClient.putLogEvents(putLogEventsRequest); lastFlushTimeMillis = System.currentTimeMillis(); logEvents.clear(); }
  • 应用程序创建一个输出流,其中包含每种股票在 10 分钟的滑动窗口(以 5 分钟为增量)内的最高价格。有关 Flink 时间窗口的更多信息,请参阅 Apache Flink 文档中的窗口。然后,接收器创建的输出流将聚合数据发送到 CloudWatch Logs 接收器。

    CustomSinkStreamingJob.java 文件中的以下代码将聚合数据发送到 CloudWatch Logs 接收器:

    input.map(value -> { JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class); return new Tuple2<>(jsonNode.get("TICKER").asText(), jsonNode.get("PRICE").asDouble()); }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE)) .keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .max(1) .map(value -> value.f0 + ": max - " + value.f1.toString() + "\n") .addSink(new CloudWatchLogSink(region, CLOUD_WATCH_LOG_GROUP, CLOUD_WATCH_LOG_STREAM));

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门教程中的先决条件

  2. 要将 Kinesis 连接器用于以下应用程序,您需要下载、构建并安装 Apache Maven。有关更多信息,请参阅使用 Apache Flink Kinesis Streams 连接器

  3. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.8.2
    注意

    提供的源代码依赖于 Java 1.8 中的库。如果使用开发环境,请确保项目的 Java 版本为 1.8。

编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar)。

上传 Apache Flink 流式处理 Java 代码

在本节中,您将应用程序代码上传到在创建相关资源 将示例记录写入输入流一节中创建的 Amazon S3 存储桶。

  1. 在 Amazon S3 控制台中,选择 ka-app-code-<username> 存储桶,然后选择 Upload (上传)

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 aws-kinesis-analytics-java-apps-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建并运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建应用程序

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 在 Amazon Kinesis Data Analytics 控制面板上,选择 Create analytics application (创建分析应用程序)

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将版本下拉列表保留为 Apache Flink 1.8 (Recommended Version) (Apache Flink 1.8 (建议的版本))

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

    TODO:更新或删除屏幕截图

    
                                控制台屏幕截图,显示创建应用程序页面上的设置。
  5. 选择 Create application (创建应用程序)

注意

使用控制台创建 Kinesis Data Analytics for Java application 时,您可以选择为您的应用程序创建一个 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略添加权限,以访问 Kinesis 数据流。

  1. 通过以下网址打开 IAM 控制台:https://console.amazonaws.cn/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "CloudWatchLogGroupPermissions", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups", "logs:DescribeLogStreams", "logs:CreateLogGroup", "logs:PutRetentionPolicy" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics-java/test:log-stream", "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics-java/test:log-stream:*" ] }, { "Sid": "CloudwatchLogStreamsPermissions", "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics-java/test:log-stream:StockPriceStatistics" ] } ] }

配置应用程序

  1. MyApplication (我的应用程序) 页面上,选择 Configure (配置)

    
                                显示 MyApplication (我的应用程序) 页面以及配置和运行按钮的屏幕截图。
  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 对于 Amazon S3 bucket (Amazon S3 存储桶),请输入 ka-app-code-<username>

    • 对于 Path to Amazon S3 object (Amazon S3 对象路径),请输入 aws-kinesis-analytics-java-apps-1.0.jar

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  5. 对于 CloudWatch logging (CloudWatch 日志记录),选中启用复选框。

  6. 选择 Update (更新)

    
                                “配置应用程序”页面的屏幕截图,具有在此过程中所述的设置页面。
注意

在选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 将为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

运行应用程序

  1. MyApplication (我的应用程序) 页面上,选择 Run (运行)。确认该操作。

    
                                MyApplication (我的应用程序) 页面和运行按钮的屏幕截图。
  2. 当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)


                        应用程序图表的屏幕截图。

验证应用程序输出

在 CloudWatch 控制台中,打开 /aws/kinesis-analytics-java/test/StockPriceStatistics 日志流。

几分钟后,日志流将包含来自应用程序的聚合数据。


                    显示应用程序输出的 CloudWatch 日志的屏幕截图。

清理 AWS 资源

本节包含清理在CloudWatch教程中创建的 AWS 资源的过程。

删除 Kinesis Data Analytics 应用程序

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 在 Kinesis Data Analytics 面板中,选择 MyApplication (我的应用程序)

  3. 选择 Configure (配置)

  4. Snapshots (快照) 部分中,选择 Disable (禁用),然后选择 Update (更新)

  5. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis 数据流

  1. 通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 在 Kinesis Data Streams 面板中,选择 ExampleInputStream

  3. ExampleInputStream 页面中,选择 Delete Kinesis Stream,然后确认删除。

  4. Kinesis streams (Kinesis 流) 页面中,选择 ExampleOutputStream,选择 Actions (操作),选择 Delete (删除),然后确认删除。

删除 Amazon S3 对象和存储桶

  1. 通过以下网址打开 Amazon S3 控制台:https://console.amazonaws.cn/s3/

  2. 选择 ka-app-code-<username> 存储桶。

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除 IAM 资源

  1. 通过以下网址打开 IAM 控制台:https://console.amazonaws.cn/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service-MyApplication-<您的区域> 策略。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles (角色)

  7. 选择 kinesis-analytics-MyApplication-<您的区域> 角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除 CloudWatch 资源

  1. 通过以下网址打开 CloudWatch 控制台:https://console.amazonaws.cn/cloudwatch/

  2. 在导航栏中,选择 Logs (日志)

  3. 选择 /aws/kinesis-analytics/MyApplication 日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。

  5. 选择 /aws/kinesis-analytics-java/test 日志组。

  6. 选择 Delete Log Group (删除日志组),然后确认删除。