Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:写入 Amazon S3 存储桶
在本练习中,您创建一个Managed Service for Apache Flink,它将 Kinesis 数据流作为源,并将 Amazon S3 存储桶作为接收器。通过使用接收器,您可以在 Amazon S3 控制台中验证应用程序的输出。
注意
要为本练习设置所需的先决条件,请先完成入门指南 (DataStream API)练习。
本主题包含下列部分:
创建相关资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
Kinesis 数据流 (
ExampleInputStream
)。 -
存储应用程序代码和输出的 Amazon S3 存储桶 (
ka-app-code-
)<username>
注意
在 Managed Service for Apache Flink 上启用服务器端加密的情况下,Managed Service for Apache Flink 无法将数据写入 Amazon S3。
您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:
-
Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为
ExampleInputStream
。 -
Amazon Simple Storage Service 用户指南中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如
ka-app-code-
。在 Amazon S3 存储桶中创建两个文件夹(<username>
code
和data
)。
如果以下 CloudWatch 资源尚不存在,则应用程序会创建这些资源:
-
名为
/AWS/KinesisAnalytics-java/MyApplication
的日志组。 -
名为
kinesis-analytics-log-stream
的日志流。
将示例记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
-
使用以下内容创建名为
stock.py
的文件:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
运行
stock.py
脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。
下载并检查应用程序代码
此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
-
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 -
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
-
导航到
amazon-kinesis-data-analytics-java-examples/S3Sink
目录。
应用程序代码位于 S3StreamingSinkJob.java
文件中。请注意有关应用程序代码的以下信息:
-
应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
-
您需要添加以下导入语句:
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-
应用程序使用 Apache Flink S3 接收器以写入 Amazon S3。
接收器在滚动窗口中读取消息,将消息编码为 S3 存储桶对象,然后将编码的对象发送到 S3 接收器。以下代码将对象进行编码以发送到 Amazon S3:
input.map(value -> { // Parse the JSON JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class); return new Tuple2<>(jsonNode.get("ticker").toString(), 1); }).returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(v -> v.f0) // Logically partition the stream for each word .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .sum(1) // Count the appearances by ticker per partition .map(value -> value.f0 + " count: " + value.f1.toString() + "\n") .addSink(createS3SinkFromStaticConfig());
注意
应用程序使用 Flink StreamingFileSink
对象以写入 Amazon S3。有关的更多信息StreamingFileSink
,请参阅 Apache Flink 文档StreamingFileSink
修改应用程序代码
在本节中,您修改应用程序代码以将输出写入 Amazon S3 存储桶。
使用您的用户名更新以下行以指定应用程序的输出位置:
private static final String s3SinkPath = "s3a://ka-app-code-
<username>
/data";
编译应用程序代码
要编译应用程序,请执行以下操作:
-
如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门指南 (DataStream API)教程中的先决条件。
-
使用以下命令编译应用程序:
mvn package -Dflink.version=1.15.3
编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar
)。
注意
提供的源代码依赖于 Java 11 中的库。
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到在创建相关资源一节中创建的 Amazon S3 存储桶。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,导航到代码文件夹,然后选择上传。
-
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
aws-kinesis-analytics-java-apps-1.0.jar
文件。 -
您无需更改该对象的任何设置,因此,请选择上传。
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。
创建并运行 Managed Service for Apache Flink
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建应用程序
打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink
-
在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序。
-
在Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 应用程序名称 ,输入
MyApplication
。 -
对于 运行时,请选择 Apache Flink。
将版本下拉列表保留为 Apache Flink 1.15.2 (Recommended Version) (Apache Flink 1.15.2 (建议的版本))。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
选择创建应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
对于 应用程序名称 ,输入
MyApplication
。 -
对于 运行时,请选择 Apache Flink。
-
将版本保留为 Apache Flink 版本 1.15.2(建议的版本)。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
选择创建应用程序。
注意
使用控制台创建Managed Service for Apache Flink时,您可以选择为您的应用程序创建一个 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesisanalytics-
MyApplication
-us-west-2
编辑 IAM policy
编辑 IAM policy 以添加访问 Kinesis 数据流的权限。
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2
策略。 -
在 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901
) 替换为您的账户 ID。将 <用户名> 替换为您的用户名。{ "Sid": "S3", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:%LOG_GROUP_PLACEHOLDER%:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:%LOG_GROUP_PLACEHOLDER%:log-stream:%LOG_STREAM_PLACEHOLDER%" ] }
, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" },
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 配置应用程序 页面上,提供 代码位置:
-
对于Amazon S3 存储桶,请输入
ka-app-code-
。<username>
-
在 Amazon S3 对象的路径中,输入
code/aws-kinesis-analytics-java-apps-1.0.jar
。
-
-
在 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
在 监控 下,确保 监控指标级别 设置为 应用程序。
-
要进行CloudWatch 日志记录,请选中 “启用” 复选框。
-
选择更新。
注意
当您选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。
运行应用程序
-
在MyApplication页面上,选择 “运行”。保持不使用快照运行选项处于选中状态,然后确认操作。
-
当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)。
验证应用程序输出
在 Amazon S3 控制台中,打开 S3 存储桶中的 data 文件夹。
几分钟后,将显示包含来自应用程序的聚合数据的对象。
注意
在 Flink 中,默认情况下已启用聚合。要禁用,请使用以下命令:
sink.producer.aggregation-enabled' = 'false'
可选:自定义源和接收器
在本节中,您将自定义源对象和接收器对象的设置。
注意
更改以下各节所述的代码部分后,请执行以下操作以重新加载应用程序代码:
-
重复本编译应用程序代码节中的步骤以编译更新的应用程序代码。
-
重复本上传 Apache Flink 流式处理 Java 代码节中的步骤以编译更新的应用程序代码。
-
在控制台的应用程序页面上,选择配置,然后选择更新,将更新的应用程序代码重新加载到您的应用程序中。
配置数据分区
在本节中,您将配置流式文件接收器在 S3 存储桶中创建的文件夹的名称。可以通过向流式文件接收器添加存储桶分配器来完成此操作。
要自定义在 S3 存储桶中创建的文件夹名称,请执行以下操作:
-
在
S3StreamingSinkJob.java
文件开头添加以下导入语句:import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
-
更新代码中的
createS3SinkFromStaticConfig()
方法,使其看起来与以下内容类似:private static StreamingFileSink<String> createS3SinkFromStaticConfig() { final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH")) .withRollingPolicy(DefaultRollingPolicy.create().build())
.build(); return sink; }
前面的代码示例使用带有自定义日期格式的DateTimeBucketAssigner
,在 S3 存储桶中创建文件夹。DateTimeBucketAssigner
使用当前系统时间来创建存储桶名称。如果您想创建自定义存储桶分配器以进一步自定义已创建的文件夹名称,则可以创建一个实现BucketAssignergetBucketId
方法实现自定义逻辑。
自定义实现BucketAssigner
可以使用 Context
配置读取频率
在本节中,您将配置对源流的读取频率。
默认情况下,Kinesis Streams 使用者每秒从源流中读取五次。如果有多个客户端从流中读取数据,或者应用程序需要重试读取记录,则此频率将导致出现问题。您可以通过设置使用者的读取频率来避免这些问题。
要设置 Kinesis 使用者的读取频率,您需要设置该SHARD_GETRECORDS_INTERVAL_MILLIS
设置。
以下代码示例将SHARD_GETRECORDS_INTERVAL_MILLIS
设置设置为一秒:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
配置写入缓冲
在本节中,您将配置接收器的写入频率和其他设置。
默认情况下,应用程序每分钟写入一次目标存储桶。您可以通过配置DefaultRollingPolicy
对象来更改此间隔和其他设置。
注意
每次应用程序创建检查点时,Apache Flink 流式文件接收器都会写入其输出存储桶。默认情况下,应用程序每分钟创建一个检查点。要增加 S3 接收器的写入间隔,还必须增加检查点间隔。
若要配置DefaultRollingPolicy
对象,请执行以下操作:
-
增加应用程序的
CheckpointInterval
设置。以下 UpdateApplication操作输入将检查点间隔设置为 10 分钟:{ "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "ConfigurationTypeUpdate" : "CUSTOM", "CheckpointIntervalUpdate": 600000 } } }, "ApplicationName": "MyApplication", "CurrentApplicationVersionId":
5
}要使用上述代码,请指定当前应用程序版本。您可以使用ListApplications操作检索应用程序版本。
-
在
S3StreamingSinkJob.java
文件开头添加以下导入语句:import java.util.concurrent.TimeUnit;
-
更新
S3StreamingSinkJob.java
文件中的createS3SinkFromStaticConfig
方法,使其看起来与以下内容类似:private static StreamingFileSink<String> createS3SinkFromStaticConfig() { final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH")) .withRollingPolicy( DefaultRollingPolicy.create() .withRolloverInterval(TimeUnit.MINUTES.toMillis(8)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build())
.build(); return sink; }前面的代码示例将写入 Amazon S3 存储桶的频率设置为 8 分钟。
有关配置 Apache Flink 流式文件接收器的更多信息,请参阅 Apache Flink 文档
清理 Amazon 资源
本节包括清理在 Amazon S3 教程中创建的Amazon资源的步骤。
本主题包含下列部分:
删除 Managed Service for Apache Flink 应用程序
打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink
-
在 “适用于 Apache Flink 的托管服务” 面板中,选择。MyApplication
-
在应用程序的页面中,选择删除,然后确认删除。
删除 Kinesis 数据流
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis
。 -
在 Kinesis Data Streams 面板中,ExampleInputStream选择。
-
在ExampleInputStream页面上,选择 “删除 Kinesis Stream”,然后确认删除。
删除您的 Amazon S3 对象和存储桶
打开 Amazon S3 控制台,网址为:https://console.aws.amazon.com/s3/
。 -
选择 ka-app-code-
存储桶。 <username>
-
选择 删除,然后输入存储桶名称以确认删除。
删除您的 IAM 资源
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
在导航栏中,选择策略。
-
在筛选条件控件中,输入 kinesis。
-
选择 kinesis-analytics-service--us-MyApplication west-2 策略。
-
选择 策略操作,然后选择 删除。
-
在导航栏中,选择角色。
-
选择 k inesis-analytics-us-west-2 角色MyApplication。
-
选择 删除角色,然后确认删除。
删除您的 CloudWatch 资源
打开 CloudWatch 控制台,网址为 https://console.aws.amazon.com/cloudwatch/
。 -
在导航栏中,选择日志。
-
选择 /aws/kinesis-analytics/ 日志组MyApplication。
-
选择 删除日志组,然后确认删除。