示例:从不同账户的 Kinesis 流中读取 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

示例:从不同账户的 Kinesis 流中读取

该示例说明了如何创建 Amazon Kinesis Data Analytics 应用程序,以从不同账户的 Kinesis 流中读取数据。在该示例中,您将一个账户用于源 Kinesis 流,并将第二个账户用于 Kinesis Data Analytics 应用程序和接收器 Kinesis 流。

先决条件

  • 在本教程中,您修改入门示例以从不同账户的 Kinesis 流中读取数据。在继续之前,请完成入门教程。

  • 您需要使用两个 AWS 账户以完成本教程:一个账户用于源流,另一个账户用于应用程序和接收器流。将您用于入门教程的 AWS 账户用于应用程序和接收器流。将一个不同的 AWS 账户用于源流。

设置

您将使用命名的配置文件访问两个 AWS 账户。修改您的 AWS 凭证和配置文件以包含两个配置文件,其中包含两个账户的区域和连接信息。

以下示例凭证文件包含两个命名的配置文件:ka-source-stream-account-profileka-sink-stream-account-profile。将您用于入门教程的账户作为接收器流账户。

[ka-source-stream-account-profile] aws_access_key_id=AKIAIOSFODNN7EXAMPLE aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY [ka-sink-stream-account-profile] aws_access_key_id=AKIAI44QH8DHBEXAMPLE aws_secret_access_key=je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY

以下示例配置文件包含具有区域和输出格式信息的相同命名配置文件。

[profile ka-source-stream-account-profile] region=us-west-2 output=json [profile ka-sink-stream-account-profile] region=us-west-2 output=json
注意

本教程不使用 ka-sink-stream-account-profile。它是作为如何使用配置文件访问两个不同 AWS 账户的示例提供的。

有关将命名的配置文件与 AWS CLI 一起使用的更多信息,请参阅 AWS 命令行界面 文档中的命名的配置文件

创建源 Kinesis 流

在本节中,您在源账户中创建 Kinesis 流。

输入以下命令以创建 Kinesis 流,应用程序将该流用于输入。请注意,--profile 参数指定要使用的账户配置文件。

$ aws kinesis create-stream \ --stream-name SourceAccountExampleInputStream \ --shard-count 1 \ --profile ka-source-stream-account-profile

创建和更新 IAM 角色和策略

要允许跨 AWS 账户访问对象,您可以在源账户中创建 IAM 角色和策略。然后,您在接收器账户中修改 IAM 策略。有关创建 IAM 角色和策略的信息,请参阅 AWS Identity and Access Management 用户指南 中的以下主题:

接收器账户角色和策略

  1. 编辑入门教程中的 kinesis-analytics-service-MyApplication-us-west-2 策略。该策略允许担任源账户中的角色,以便读取源流。

    将下面突出显示的部分添加到策略中。将示例账户 ID (SOURCE01234567) 替换为将用于源流的账户的 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "AssumeRoleInSourceAccount", "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role" }, { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] } ] }
  2. 打开 kinesis-analytics-MyApplication-us-west-2 角色,并记下其 Amazon 资源名称 (ARN)。您需要在下一节中使用该名称。角色 ARN 如下所示。

    arn:aws:iam::SINK012345678:role/service-role/kinesis-analytics-MyApplication-us-west-2

源账户角色和策略

  1. 在名为 KA-Source-Stream-Policy 的源账户中创建一个策略。将以下 JSON 用于该策略。将示例账号替换为源账户的账号。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadInputStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-west-2:SOURCE123456784:stream/SourceAccountExampleInputStream" } ] }
  2. 在名为 KA-Source-Stream-Role 的源账户中创建一个角色。执行以下操作以使用 Kinesis Analytics 使用案例创建角色:

    1. 在 IAM 管理控制台中,选择 Create Role (创建角色)

    2. Create Role (创建角色) 页面上,选择 AWS Service (AWS 服务)。在服务列表中,选择 Kinesis

    3. Select your use case (选择使用案例) 部分中,选择 Kinesis Analytics

    4. 选择 Next: Permissions (下一步:权限)

    5. 添加您在上一步中创建的 KA-Source-Stream-Policy 权限策略。选择 Next:Tags (下一步: 标签)

    6. 选择 Next: Review

    7. 将角色命名为 KA-Source-Stream-Role。应用程序将使用该角色以访问源流。

  3. 将接收器账户中的 kinesis-analytics-MyApplication-us-west-2 ARN 添加到源账户中的 KA-Source-Stream-Role 角色的信任关系中:

    1. 在 IAM 控制台中打开 KA-Source-Stream-Role

    2. 选择 Trust Relationships 选项卡。

    3. 选择编辑信任关系

    4. 将以下代码用于信任关系。将示例账户 ID (SINK012345678) 替换为接收器账户 ID。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::SINK012345678:role/service-role/kinesis-analytics-MyApplication-us-west-2" }, "Action": "sts:AssumeRole" } ] }

更新 Python 脚本

在本节中,您更新生成示例数据的 Python 脚本以使用源账户配置文件。

使用以下突出显示的更改更新 stock.py 脚本。

import json import boto3 import random import datetime import os os.environ['AWS_PROFILE'] ='ka-source-stream-account-profile' os.environ['AWS_DEFAULT_REGION'] = 'us-west-2' kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['EVENT_TIME'] = str_now data['TICKER'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['PRICE'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="SourceAccountExampleInputStream", Data=data, PartitionKey="partitionkey")

更新 Java 应用程序

在本节中,您更新 Java 应用程序代码,以便从源流中读取时担任源账户角色。

BasicStreamingJob.java 文件进行以下更改。将示例源账号 (SOURCE01234567) 替换为您的源账号。

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; /** * A basic Kinesis Data Analytics for Java application with Kinesis data streams * as source and sink. */ public class BasicStreamingJob { private static final String region = "us-west-2"; private static final String inputStreamName = "SourceAccountExampleInputStream"; private static final String outputStreamName = ExampleOutputStream; private static final String roleArn = "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role"; private static final String roleSessionName = "ksassumedrolesession"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN, roleArn); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, roleSessionName); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> input = createSourceFromStaticConfig(env); input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

构建、上传和运行应用程序

执行以下操作以更新和运行应用程序:

  1. 在具有 pom.xml 文件的目录中运行以下命令,以再次构建应用程序。

    mvn package -Dflink.version=1.8.2
  2. 从 Amazon Simple Storage Service (Amazon S3) 存储桶中删除以前的 JAR 文件,然后将新的 aws-kinesis-analytics-java-apps-1.0.jar 文件上传到 S3 存储桶中。

  3. 在 Kinesis Data Analytics 控制台上的应用程序页面中,选择 Configure (配置) > Update (更新) 以重新加载应用程序 JAR 文件。

  4. 运行 stock.py 脚本以将数据发送到源流。

    python stock.py

现在,应用程序从另一个账户的 Kinesis 流中读取数据。

您可以检查 ExampleOutputStream 流的 PutRecords.Bytes 指标,以验证应用程序是否正常工作。如果在输出流中具有活动,则应用程序正常工作。