例如:写入 Kinesis Data Firehose - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

例如:写入 Kinesis Data Firehose

在本练习中,您创建一个 Kinesis Data Analytics 应用程序,它将 Kinesis 数据流作为源,并将 Kinesis Data Firehose 传输流作为接收器。通过使用接收器,您可以在 Amazon S3 存储桶中验证应用程序输出。

注意

要为本练习设置所需的先决条件,请先完成入门 (DataStreamAPI)练习。

创建相关资源

在为本练习创建适用于 Apache Flink 的 Kinesis Data Analytics 应用程序之前,请创建以下相关资源:

  • Kinesis 数据流 (ExampleInputStream

  • 应用程序将输出写入到的 Kinesis Data Firehose 传输流 (ExampleDeliveryStream)。

  • 用于存储应用程序代码的 Amazon S3 存储桶 (ka-app-code-<username>

您可以使用控制台创建 Kinesis Streams、Amazon S3 存储桶和 Kinesis Data Firehose 传输流。有关创建这些资源的说明,请参阅以下主题:

  • 创建和更新数据流中的Amazon Kinesis Data Streams 开发人员指南. 将数据流命名为 ExampleInputStream

  • 创建 Amazon Kinesis Data Firehose 传输流中的Amazon Kinesis Data Firehose 开发人员指南. 将传输流命名为 ExampleDeliveryStream。在创建 Kinesis Data Firehose 传输流时,还要创建传输流。S3 目标IAM 角色.

  • 如何创建 S3 存储桶?中的Amazon Simple Storage Service 用户指南. 附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如。ka-app-code-<username>.

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查 Apache Flink 流式处理 Java 代码

在 GitHub 中提供了该示例的 Java 应用程序代码。要下载应用程序代码,请执行以下操作:

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  2. 导航到 amazon-kinesis-data-analytics-java-examples/FirehoseSink目录。

应用程序代码位于 FirehoseSinkStreamingJob.java 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
  • 该应用程序使用 Kinesis Data Firehose 接收器将数据写入到传输流。以下代码段创建 Kinesis Data Firehose 接收器:

    FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputDeliveryStreamName, new SimpleStringSchema(), outputProperties);

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门 (DataStreamAPI)教程中的先决条件

  2. 要将 Kinesis 连接器用于以下应用程序,您需要下载、构建并安装 Apache Maven。有关更多信息,请参阅将 Apache Flink Kinesis Streams 连接器与早期 Apache Flink 版本一起使用

  3. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.13.2
    注意

    提供的源代码依赖于 Java 11 中的库。如果你使用的是开发环境,

编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar)。

上传 Apache Flink 流式处理 Java 代码

在本节中,您将应用程序代码上传到在中创建的 Amazon S3 存储桶。创建相关资源部分。

上传应用程序代码

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 在控制台中,选择 ka-app-code-<username> 存储桶,然后选择 Upload (上传)

  3. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 java-getting-started-1.0.jar 文件。

  4. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建和运行 Kinesis Data Analytics 应用程序

您可以使用控制台或创建并运行 Kinesis Data Analytics 应用程序。Amazon CLI.

注意

当您使用控制台创建应用程序时,Amazon Identity and Access Management(IAM) 和亚马逊 CloudWatch 日志资源是为你创建的。当您使用 Amazon CLI 创建应用程序时,您可以单独创建这些资源。

创建并运行应用程序(控制台)

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用.

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于描述,输入 My java test app

    • 对于 Runtime (运行时),请选择 Apache Flink

      注意

      Kinesis Data Analytics in Apache Flink 版本 1.13.2。

    • 将版本下拉列表保留为使用 Apache Flink 版本 1.13.2(推荐版本).

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择 Create application(创建应用程序)。

注意

在使用控制台创建应用程序时,您可以选择为应用程序创建 IAM 角色和策略。应用程序使用该角色和策略访问其相关资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略以添加访问 Kinesis Data Streams 和 Kinesis Data Firehose 传输流的权限。

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 的所有实例替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteDeliveryStream", "Effect": "Allow", "Action": "firehose:*", "Resource": "arn:aws:firehose:us-west-2:012345678901:deliverystream/ExampleDeliveryStream" } ] }

配置应用程序

  1. MyApplication (我的应用程序) 页面上,选择 Configure (配置)

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 适用于Amazon S3 存储桶输入,ka-app-code-<username>.

    • 适用于Amazon S3 对象的路径输入,java-getting-started-1.0.jar.

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  5. 对于 CloudWatch logging (CloudWatch 日志记录),选中启用复选框。

  6. 选择 Update(更新)。

注意

当你选择启用时 CloudWatch 日志记录时,Kinesis Data Analytics 将为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 作业来查看 Flink 作业图。

停止应用程序

MyApplication (我的应用程序) 页面上,选择 Stop (停止)。确认该操作。

更新应用程序

使用控制台,您可以更新应用程序设置,例如应用程序属性、监控设置,或应用程序 JAR 文件的位置和文件名。

MyApplication (我的应用程序) 页面上,选择 Configure (配置)。更新应用程序设置,然后选择更新

注意

要在控制台上更新应用程序的代码,您必须更改 JAR 的对象名称,使用不同的 S3 存储桶,或使用更新应用程序代码一节中所述的 Amazon CLI。如果文件名或存储桶未更改,则在您选择时不会重新加载应用程序代码更新配置页.

创建并运行应用程序 (Amazon CLI)

在本部分中,您将使用Amazon CLI创建并运行 Kinesis Data Analytics 应用程序。

创建权限策略

首先,使用两个语句创建权限策略:一个语句授予对源流执行 read 操作的权限,另一个语句授予对接收器流执行 write 操作的权限。然后,您将策略附加到 IAM 角色(下一部分中将创建该角色)。因此,在 Kinesis Data Analytics 担任该角色时,服务具有必要的权限,以从源流中读取和写入接收器流。

使用以下代码创建 KAReadSourceStreamWriteSinkStream 权限策略。Replace用户名其中包含您将用于创建 Amazon S3 存储桶以存储应用程序代码的用户名。将 Amazon 资源名称 (ARN) 中的账户 ID (012345678901) 替换为您的账户 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteDeliveryStream", "Effect": "Allow", "Action": "firehose:*", "Resource": "arn:aws:firehose:us-west-2:012345678901:deliverystream/ExampleDeliveryStream" } ] }

适用于 step-by-step 有关创建权限策略的说明,请参阅教程:创建并附加您的第一个客户托管策略中的IAM 用户指南.

注意

要访问其他亚马逊服务,您可以使用Amazon SDK for Java. Kinesis Data Analytics 会自动将开发工具包所需的凭证设置为与应用程序关联的服务执行 IAM 角色的凭证。无需执行其他步骤。

创建 IAM 角色

在本节中,您创建一个 IAM 角色,Kinesis Data Analytics 应用程序可以代入此角色来读取源流和写入接收器流。

如果 Kinesis Data Analytics 不具有权限,则无法访问流。您通过 IAM 角色授予这些权限。每个 IAM 角色附加了两个策略。信任策略为 Kinesis Data Analytics 授予权限,以担任该角色。权限策略确定在担任该角色后,Kinesis Data Analytics 可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航窗格中,选择 Roles (角色)Create Role (创建角色)

  3. UDR选择受信任实体的类型,选择Amazon服务. 在 Choose the service that will use this role (选择将使用此角色的服务) 下,选择 Kinesis。在选择您的使用案例下,选择 Kinesis Analytics

    选择 Next:。Permissions (下一步:权限)

  4. 在存储库的附加权限策略页面,选择后续:审核。在创建角色后,您可以附加权限策略。

  5. 在存储库的创建角色页面,输入KA-stream-rw-role(对于 )Role name (角色名称). 请选择 Create role(创建角色)。

    现在,您已经创建了一个名为的新 IAM 角色KA-stream-rw-role. 接下来,您更新角色的信任和权限策略。

  6. 将权限策略附加到角色。

    注意

    在本练习中,Kinesis Data Analytics 担任此角色,以从 Kinesis 数据流(源)读取数据和将输出写入到另一个 Kinesis 数据流。因此,您附加在上一步(创建权限策略)中创建的策略。

    1. Summary (摘要) 页上,选择 Permissions (权限) 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 KAReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择KareadSourcestStream写inkstream策略,然后选择附加策略.

现在,您已创建应用程序用于访问资源的服务执行角色。记下新角色的 ARN。

适用于 step-by-step 有关创建角色的说明,请参阅创建 IAM 角色(控制台)中的IAM 用户指南.

创建 Kinesis Data Analytics 应用程序

  1. 将以下 JSON 代码保存到名为 create_request.json 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀替换为在创建相关资源一节中选择的后缀 (ka-app-code-<username>)。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_13", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" } } } }
  2. 使用上述请求执行 CreateApplication 操作来创建应用程序:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您使用 StartApplication 操作来启动应用程序。

启动应用程序

  1. 将以下 JSON 代码保存到名为 start_request.json 的文件中。

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在亚马逊上查看 Kinesis Data Analytics 指标 CloudWatch 以验证应用程序是否正常工作。

停止应用程序

在本节中,您使用 StopApplication 操作来停止应用程序。

停止应用程序

  1. 将以下 JSON 代码保存到名为 stop_request.json 的文件中。

    { "ApplicationName": "test" }
  2. 使用下面的请求执行 StopApplication 操作来停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

添加 CloudWatch 记录选项

您可以使用Amazon CLI添加亚马逊 CloudWatch 将流记录到应用程序。有关使用的信息 CloudWatch 随应用程序一起登录,请参阅设置应用程序日志记录.

更新应用程序代码

在您需要使用新版本的代码包更新应用程序代码时,您可以使用 UpdateApplication Amazon CLI 操作。

使用Amazon CLI,请从 Amazon S3 存储桶中删除以前的代码包,上传新版本,然后调用并UpdateApplication,指定相同的 Amazon S3 存储桶和对象名称。

以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplicationsDescribeApplication 操作检查当前的应用程序版本。将存储桶名称后缀 (<username>) 更新为在创建相关资源一节中选择的后缀。

{ "ApplicationName": "test", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-username", "FileKeyUpdate": "java-getting-started-1.0.jar" } } } } }

清理 Amazon 资源

本节包含清理在入门教程中创建的 Amazon 资源的过程。

删除 Kinesis Data Analytics 应用程序

  1. 打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 面板中,选择MyApplication.

  3. 选择 Configure(配置)。

  4. Snapshots (快照) 部分中,选择 Disable (禁用),然后选择 Update (更新)

  5. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis 数据流

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,选择示例输入流.

  3. ExampleInputStream 页面中,选择 Delete Kinesis Stream,然后确认删除。

删除 Kinesis Data Firehose 传输流

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Firehose 面板中,选择示例:送货流.

  3. ExampleDeliveryStream 页面中,选择 Delete delivery stream (删除传输流),然后确认删除。

删除您的 Amazon S3 对象和存储桶

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择 ka-app-code-<username> 存储桶。

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

  4. 如果您为 Kinesis Data Firehose 传输流目标创建了 Amazon S3 存储桶,则也会删除该存储桶。

删除 IAM 资源

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service-MyApplication-<您的区域> 策略。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 如果您为 Kinesis Data Firehose 传输流创建了新策略,则也会删除该策略。

  7. 在导航栏中,选择 Roles(角色)

  8. 选择 kinesis-analytics-MyApplication-<您的区域> 角色。

  9. 选择 Delete role (删除角色),然后确认删除。

  10. 如果您为 Kinesis Data Firehose 传输流创建了新角色,则也会删除该角色。

删除 CloudWatch 资源

  1. 打开 CloudWatch 控制台https://console.aws.amazon.com/cloudwatch/.

  2. 在导航栏中,选择 Logs (日志)

  3. 选择 /aws/kinesis-analytics/MyApplication 日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。