Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:使用Managed Service for Apache Flink的应用程序将数据从 MSK 集群中的一个主题复制到 VPC 中的另一个主题
下面的教程演示了如何创建带有 Amazon MSK 集群和两个主题的 Amazon VPC,以及如何创建Managed Service for Apache Flink的应用程序,用于从一个 Amazon MSK 主题读取数据并写入另一个主题。
注意
要为本练习设置所需的先决条件,请先完成入门指南 (DataStream API)练习。
本教程包含以下部分:
创建带有 Amazon MSK 集群的 Amazon VPC
要创建示例 VPC 和 Amazon MSK 集群以从Managed Service for Apache Flink的应用程序进行访问,请按照 Amazon MSK 入门教程进行操作。
在完成本教程时,请注意以下几点:
在步骤 3:创建主题中,重复
kafka-topics.sh --create
命令以创建名为AWSKafkaTutorialTopicDestination
的目标主题:bin/kafka-topics.sh --create --zookeeper
ZooKeeperConnectionString
--replication-factor 3 --partitions 1 --topic AmazonKafkaTutorialTopicDestination记录集群的引导服务器列表。您可以使用以下命令获取引导服务器列表(
ClusterArn
替换为 MSK 集群的 ARN):aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arn
ClusterArn
{... "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094" }在执行教程中的步骤时,请务必在代码、命令和控制台条目中使用所选的 Amazon 区域。
创建应用程序代码
在本节中,您下载并编译应用程序 JAR 文件。我们建议使用 Java 11。
此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
应用程序代码位于
amazon-kinesis-data-analytics-java-examples/KafkaConnectors/KafkaGettingStartedJob.java
文件中。您可以检查代码以熟悉Managed Service for Apache Flink的应用程序代码的结构。使用命令行 Maven 工具或首选的开发环境以创建 JAR 文件。要使用命令行 Maven 工具编译 JAR 文件,请输入以下内容:
mvn package -Dflink.version=1.15.3
如果构建成功,则会创建以下文件:
target/KafkaGettingStartedJob-1.0.jar
注意
提供的源代码依赖于 Java 11 中的库。如果您使用的是开发环境,
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到在入门指南 (DataStream API)教程中创建的 Amazon S3 存储桶。
注意
如果您从入门教程中删除了 Amazon S3 存储桶,请再次执行上传 Apache Flink 流式处理 Java 代码步骤。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。
-
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
KafkaGettingStartedJob-1.0.jar
文件。 您无需更改该对象的任何设置,因此,请选择上传。
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。
创建应用程序
打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink
-
在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序。
-
在Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于应用程序名称,输入
MyApplication
。 -
对于运行时,请选择 Apache Flink 版本 1.15.2。
-
-
对于访问权限,请选择创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
选择创建应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesisanalytics-
MyApplication
-us-west-2
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 配置应用程序 页面上,提供 代码位置:
-
对于Amazon S3 存储桶,请输入
ka-app-code-
。<username>
-
在 Amazon S3 对象的路径中,输入
KafkaGettingStartedJob-1.0.jar
。
-
-
在 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。注意
当您使用控制台指定应用程序资源(例如 CloudWatch 日志或 Amazon VPC)时,控制台会修改您的应用程序执行角色以授予访问这些资源的权限。
-
在 Properties (属性) 下面,选择 Add Group (添加组)。输入以下属性:
组 ID 键 值 KafkaSource
topic AmazonKafkaTutorialTopic KafkaSource
bootstrap.servers 您以前保存的引导服务器列表
KafkaSource
security.protocol SSL KafkaSource
ssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts KafkaSource
ssl.truststore.password changeit 注意
默认证书的 ssl.truststore.password 为“changeit”;如果使用默认证书,则不需要更改该值。
再次选择 Add Group (添加组)。输入以下属性:
组 ID 键 值 KafkaSink
topic AmazonKafkaTutorialTopicDestination KafkaSink
bootstrap.servers 您以前保存的引导服务器列表
KafkaSink
security.protocol SSL KafkaSink
ssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts KafkaSink
ssl.truststore.password changeit KafkaSink
transaction.timeout.ms 1000 应用程序代码读取上述应用程序属性,以配置用于与 VPC 和 Amazon MSK 集群交互的源和接收器。有关使用属性的更多信息,请参阅运行时属性。
-
在 Snapshots (快照) 下面,选择 Disable (禁用)。这样,就可以轻松更新应用程序,而无需加载无效的应用程序状态数据。
-
在 监控 下,确保 监控指标级别 设置为 应用程序。
-
要进行CloudWatch 日志记录,请选中 “启用” 复选框。
-
在 Virtual Private Cloud (VPC) 部分中,选择要与应用程序关联的 VPC。选择与您的 VPC 关联的子网和安全组,您希望应用程序使用它们访问 VPC 资源。
-
选择更新。
注意
当您选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。
运行应用程序
可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。
测试应用程序
在本节中,您将记录写入到源主题。应用程序从源主题中读取记录,并将其写入到目标主题中。您可以将记录写入到源主题以及从目标主题中读取记录,以验证应用程序是否正常工作。
要写入和读取主题中的记录,请按照 Amazon MSK 入门教程中的步骤 6:生成和使用数据中的步骤进行操作。
要从目标主题中读取,请在到集群的第二个连接中使用目标主题名称,而不是源主题:
bin/kafka-console-consumer.sh --bootstrap-server
BootstrapBrokerString
--consumer.config client.properties --topic AmazonKafkaTutorialTopicDestination --from-beginning
如果在目标主题中没有任何记录,请参阅故障排除主题中的无法访问 VPC 中的资源一节。