本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
例如:从不同账户的 Kinesis Street 中读取
该示例说明了如何创建 Amazon Kinesis Data Analytics 应用程序,以从不同账户的 Kinesis 流中读取数据。在本示例中,您将一个账户用于源 Kinesis 流,并将第二个账户用于 Kinesis Data Analytics 应用程序和接收器 Kinesis 流。
先决条件
在本教程中,您将修改开始使用从不同账户的 Kinesis 流中读取数据的示例。在继续之前,请完成入门 (DataStreamAPI)教程。
您需要使用两个 Amazon 账户以完成本教程:一个账户用于源流,另一个账户用于应用程序和接收器流。将您用于入门教程的 Amazon 账户用于应用程序和接收器流。将一个不同的 Amazon 账户用于源流。
设置
您将使用命名的配置文件访问两个 Amazon 账户。修改Amazon凭证和配置文件中包含两个配置文件,其中包含两个账户的区域和连接信息。
以下示例凭证文件包含两个命名的配置文件:ka-source-stream-account-profile
和 ka-sink-stream-account-profile
。将您用于入门教程的账户作为接收器流账户。
[ka-source-stream-account-profile] aws_access_key_id=AKIAIOSFODNN7EXAMPLE aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY [ka-sink-stream-account-profile] aws_access_key_id=AKIAI44QH8DHBEXAMPLE aws_secret_access_key=je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
以下示例配置文件包含具有区域和输出格式信息的相同命名配置文件。
[profile ka-source-stream-account-profile] region=us-west-2 output=json [profile ka-sink-stream-account-profile] region=us-west-2 output=json
本教程不使用 ka-sink-stream-account-profile
。它是作为如何使用配置文件访问两个不同 Amazon 账户的示例提供的。
有关将命名配置文件与Amazon CLI请参阅,命名配置文件中的Amazon Command Line Interface文档中)。
创建源 Kinesis 流
在本节中,您在源账户中创建 Kinesis 流。
输入以下命令以创建 Kinesis Streams,应用程序将该流用于输入。请注意,--profile
参数指定要使用的账户配置文件。
$ aws kinesis create-stream \ --stream-name SourceAccountExampleInputStream \ --shard-count 1 \ --profile ka-source-stream-account-profile
创建和更新 IAM 角色和策略
允许跨越对象访问Amazon账户中,您可以在源账户中创建 IAM 角色和策略。然后,您在接收器账户中修改 IAM 策略。有关创建 IAM 角色和策略的信息,请参阅中的以下主题:Amazon Identity and Access Management用户指南:
接收器账户角色和策略
编辑入门教程中的
kinesis-analytics-service-MyApplication-us-west-2
策略。该策略允许担任源账户中的角色,以便读取源流。注意 当您使用控制台创建应用程序时,控制台将创建一个名为的策略。
kinesis-analytics-service-
,还有一个名为的角色<application name>
-<application region>
kinesis-analytics-
.<application name>
-<application region>
将下面突出显示的部分添加到策略中。将示例账户 ID (
SOURCE01234567
) 替换为将用于源流的账户的 ID。{ "Version": "2012-10-17", "Statement": [
{ "Sid": "AssumeRoleInSourceAccount", "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role" },
{ "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] } ] }打开
kinesis-analytics-MyApplication-us-west-2
角色,并记下其 Amazon 资源名称 (ARN)。您需要在下一节中使用该名称。角色 ARN 如下所示。arn:aws:iam::SINK012345678:role/service-role/kinesis-analytics-MyApplication-us-west-2
源账户角色和策略
在名为
KA-Source-Stream-Policy
的源账户中创建一个策略。将以下 JSON 用于该策略。将示例账号替换为源账户的账号。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadInputStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-west-2:
SOURCE123456784
:stream/SourceAccountExampleInputStream" } ] }在名为
KA-Source-Stream-Role
的源账户中创建一个角色。执行以下操作以使用 Kinesis Analytics 使用案例创建角色:在 IAM 管理控制台中,选择创建角色.
在存储库的创建角色页面,选择Amazon服务. 在服务列表中,选择 Kinesis。
在 Select your use case (选择使用案例) 部分中,选择 Kinesis Analytics。
选择 Next:。Permissions (下一步:权限)。
添加您在上一步中创建的
KA-Source-Stream-Policy
权限策略。选择 Next:Tags (下一步: 标签)。选择 Next:。审核。
将角色命名为
KA-Source-Stream-Role
。应用程序将使用该角色以访问源流。
将接收器账户中的
kinesis-analytics-MyApplication-us-west-2
ARN 添加到源账户中的KA-Source-Stream-Role
角色的信任关系中:打开
KA-Source-Stream-Role
在 IAM 控制台中。选择 Trust Relationships 选项卡。
选择 Edit trust relationship (编辑信任关系)。
将以下代码用于信任关系。将示例账户 ID (
SINK012345678
) 替换为接收器账户 ID。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
SINK012345678
:role/service-role/kinesis-analytics-MyApplication-us-west-2" }, "Action": "sts:AssumeRole" } ] }
更新 Python 脚本
在本节中,您更新生成示例数据的 Python 脚本以使用源账户配置文件。
使用以下突出显示的更改更新 stock.py
脚本。
import json import boto3 import random import datetime
import os os.environ['AWS_PROFILE'] ='ka-source-stream-account-profile' os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['EVENT_TIME'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="SourceAccountExampleInputStream
", Data=data, PartitionKey="partitionkey")
更新 Java 应用程序
在本节中,您更新 Java 应用程序代码,以便从源流中读取时担任源账户角色。
对 BasicStreamingJob.java
文件进行以下更改。将示例源账号 (SOURCE01234567
) 替换为您的源账号。
package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import java.io.IOException; import java.util.Map; import java.util.Properties; /** * A basic Kinesis Data Analytics for Java application with Kinesis data streams * as source and sink. */ public class BasicStreamingJob { private static final String region = "us-west-2"; private static final String inputStreamName ="SourceAccountExampleInputStream";
private static final String outputStreamName = ExampleOutputStream;private static final String roleArn = "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role"; private static final String roleSessionName = "ksassumedrolesession";
private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties();inputProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN, roleArn); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, roleSessionName);
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> input = createSourceFromStaticConfig(env); input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
构建、上传和运行应用程序
执行以下操作以更新和运行应用程序:
在具有
pom.xml
文件的目录中运行以下命令,以再次构建应用程序。mvn package -Dflink.version=1.13.2
从 Amazon Simple Storage Service (Amazon S3) 存储桶中删除以前的 JAR 文件,然后上传新的
aws-kinesis-analytics-java-apps-1.0.jar
文件到 S3 存储桶。在 Kinesis Data Analytics 控制台的应用程序页面中,选择配置、更新以重新加载应用程序 JAR 文件。
运行
stock.py
脚本以将数据发送到源流。python stock.py
现在,应用程序从另一个账户的 Kinesis 流中读取数据。
您可以检查 ExampleOutputStream
流的 PutRecords.Bytes
指标,以验证应用程序是否正常工作。如果在输出流中具有活动,则应用程序正常工作。