Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:从不同账户的 Kinesis 流中读取
该示例说明了如何创建Managed Service for Apache Flink的应用程序,以从不同账户的 Kinesis 流中读取数据。在该示例中,您将一个账户用于源 Kinesis 流,并将第二个账户用于Managed Service for Apache Flink的应用程序和接收器 Kinesis 流。
先决条件
在本教程中,您修改入门示例以从不同账户的 Kinesis 流中读取数据。在继续之前,请完成入门指南 (DataStream API)教程。
您需要使用两个 Amazon 账户以完成本教程:一个账户用于源流,另一个账户用于应用程序和接收器流。将您用于入门教程的 Amazon 账户用于应用程序和接收器流。将一个不同的 Amazon 账户用于源流。
设置
您将使用命名的配置文件访问两个 Amazon 账户。修改您的 Amazon 凭证和配置文件以包含两个配置文件,其中包含两个账户的区域和连接信息。
以下示例凭证文件包含两个命名的配置文件:ka-source-stream-account-profile
和 ka-sink-stream-account-profile
。将您用于入门教程的账户作为接收器流账户。
[ka-source-stream-account-profile] aws_access_key_id=AKIAIOSFODNN7EXAMPLE aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY [ka-sink-stream-account-profile] aws_access_key_id=AKIAI44QH8DHBEXAMPLE aws_secret_access_key=je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
以下示例配置文件包含具有区域和输出格式信息的相同命名配置文件。
[profile ka-source-stream-account-profile] region=us-west-2 output=json [profile ka-sink-stream-account-profile] region=us-west-2 output=json
注意
本教程不使用 ka-sink-stream-account-profile
。它是作为如何使用配置文件访问两个不同 Amazon 账户的示例提供的。
有关在 Amazon CLI 中使用命名配置文件的更多信息,请参阅Amazon Command Line Interface文档中的命名配置文件。
创建源 Kinesis 流
在本节中,您在源账户中创建 Kinesis 流。
输入以下命令以创建 Kinesis 流,应用程序将该流用于输入。请注意,--profile
参数指定要使用的账户配置文件。
$ aws kinesis create-stream \ --stream-name SourceAccountExampleInputStream \ --shard-count 1 \ --profile ka-source-stream-account-profile
创建和更新 IAM 角色和策略
要允许跨 Amazon 账户访问对象,您可以在源账户中创建 IAM 角色和策略。然后,您在接收器账户中修改 IAM policy。有关创建 IAM 角色和策略的信息,请参阅Amazon Identity and Access Management用户指南中的以下主题:
接收器账户角色和策略
编辑入门教程中的
kinesis-analytics-service-MyApplication-us-west-2
策略。该策略允许担任源账户中的角色,以便读取源流。注意
当您使用控制台创建应用程序时,控制台会创建一个名为
kinesis-analytics-service-
的策略和一个名为<application name>
-<application region>
kinesisanalytics-
的角色。<application name>
-<application region>
将下面突出显示的部分添加到策略中。将示例账户 ID (
SOURCE01234567
) 替换为将用于源流的账户的 ID。{ "Version": "2012-10-17", "Statement": [
{ "Sid": "AssumeRoleInSourceAccount", "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role" },
{ "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username
/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] } ] }打开
kinesis-analytics-MyApplication-us-west-2
角色,并记下其 Amazon 资源名称 (ARN)。您需要在下一部分中使用该名称。角色 ARN 如下所示。arn:aws:iam::
SINK012345678
:role/service-role/kinesis-analytics-MyApplication-us-west-2
源账户角色和策略
在名为
KA-Source-Stream-Policy
的源账户中创建一个策略。将以下 JSON 用于该策略。将示例账号替换为源账户的账号。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadInputStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-west-2:
SOURCE123456784
:stream/SourceAccountExampleInputStream" } ] }在名为
MF-Source-Stream-Role
的源账户中创建一个角色。执行以下操作以使用 Managed Flink 用例创建角色:在 IAM 管理控制台中,选择创建角色。
在创建角色页面上,选择Amazon服务。在服务列表中,选择 Kinesis。
在选择您的用例部分,选择Managed Service for Apache Flink。
选择下一步:权限。
添加您在上一步中创建的
KA-Source-Stream-Policy
权限策略。选择 Next:Tags (下一步: 标签)。选择下一步:审核。
将角色命名为
KA-Source-Stream-Role
。应用程序将使用该角色以访问源流。
将接收器账户中的
kinesis-analytics-MyApplication-us-west-2
ARN 添加到源账户中的KA-Source-Stream-Role
角色的信任关系中:打开 IAM 控制台中的
KA-Source-Stream-Role
。选择 Trust Relationships 选项卡。
选择 Edit trust relationship (编辑信任关系)。
将以下代码用于信任关系。将示例账户 ID (
) 替换为接收器账户 ID。SINK012345678
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
:role/service-role/kinesis-analytics-MyApplication-us-west-2" }, "Action": "sts:AssumeRole" } ] }SINK012345678
更新 Python 脚本
在本节中,您更新生成示例数据的 Python 脚本以使用源账户配置文件。
使用以下突出显示的更改更新 stock.py
脚本。
import json import boto3 import random import datetime
import os os.environ['AWS_PROFILE'] ='ka-source-stream-account-profile' os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="SourceAccountExampleInputStream
", Data=data, PartitionKey="partitionkey")
更新 Java 应用程序
在本节中,您更新 Java 应用程序代码,以便从源流中读取时担任源账户角色。
对 BasicStreamingJob.java
文件进行以下更改。将示例源账号 (SOURCE01234567
) 替换为您的源账号。
package com.amazonaws.services.managed-flink; import com.amazonaws.services.managed-flink.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import java.io.IOException; import java.util.Map; import java.util.Properties; /** * A basic Managed Service for Apache Flink for Java application with Kinesis data streams * as source and sink. */ public class BasicStreamingJob { private static final String region = "us-west-2"; private static final String inputStreamName ="SourceAccountExampleInputStream";
private static final String outputStreamName = ExampleOutputStream;private static final String roleArn = "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role"; private static final String roleSessionName = "ksassumedrolesession";
private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties();inputProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN, roleArn); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, roleSessionName);
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static KinesisStreamsSink<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(AWSConfigConstants.AWS_REGION, region); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputProperties.getProperty("OUTPUT_STREAM", "ExampleOutputStream")) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> input = createSourceFromStaticConfig(env); input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
构建、上传和运行应用程序
执行以下操作以更新和运行应用程序:
在具有
pom.xml
文件的目录中运行以下命令,以再次构建应用程序。mvn package -Dflink.version=1.15.3
从 Amazon Simple Storage Service (Amazon S3) 存储桶中删除以前的 JAR 文件,然后将新的
aws-kinesis-analytics-java-apps-1.0.jar
文件上传到 S3 存储桶中。在Managed Service for Apache Flink的控制台中,在应用程序页面选择配置、更新以重新加载应用程序 JAR 文件。
运行
stock.py
脚本以将数据发送到源流。python stock.py
现在,应用程序从另一个账户的 Kinesis 流中读取数据。
您可以检查 ExampleOutputStream
流的 PutRecords.Bytes
指标,以验证应用程序是否正常工作。如果在输出流中具有活动,则应用程序正常工作。