教程:使用Managed Service for Apache Flink的应用程序将数据从 MSK 集群中的一个主题复制到 VPC 中的另一个主题 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:使用Managed Service for Apache Flink的应用程序将数据从 MSK 集群中的一个主题复制到 VPC 中的另一个主题

下面的教程演示了如何创建带有 Amazon MSK 集群和两个主题的 Amazon VPC,以及如何创建Managed Service for Apache Flink的应用程序,用于从一个 Amazon MSK 主题读取数据并写入另一个主题。

注意

要为本练习设置所需的先决条件,请先完成入门指南 (DataStream API)练习。

创建带有 Amazon MSK 集群的 Amazon VPC

要创建示例 VPC 和 Amazon MSK 集群以从Managed Service for Apache Flink的应用程序进行访问,请按照 Amazon MSK 入门教程进行操作。

在完成本教程时,请注意以下几点:

  • 步骤 3:创建主题中,重复 kafka-topics.sh --create 命令以创建名为 AWSKafkaTutorialTopicDestination 的目标主题:

    bin/kafka-topics.sh --create --zookeeper ZooKeeperConnectionString --replication-factor 3 --partitions 1 --topic AmazonKafkaTutorialTopicDestination
  • 记录集群的引导服务器列表。您可以使用以下命令获取引导服务器列表(ClusterArn替换为 MSK 集群的 ARN):

    aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arn ClusterArn {... "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094" }
  • 在执行教程中的步骤时,请务必在代码、命令和控制台条目中使用所选的 Amazon 区域。

创建应用程序代码

在本节中,您下载并编译应用程序 JAR 文件。我们建议使用 Java 11。

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 应用程序代码位于 amazon-kinesis-data-analytics-java-examples/KafkaConnectors/KafkaGettingStartedJob.java 文件中。您可以检查代码以熟悉Managed Service for Apache Flink的应用程序代码的结构。

  4. 使用命令行 Maven 工具或首选的开发环境以创建 JAR 文件。要使用命令行 Maven 工具编译 JAR 文件,请输入以下内容:

    mvn package -Dflink.version=1.15.3

    如果构建成功,则会创建以下文件:

    target/KafkaGettingStartedJob-1.0.jar
    注意

    提供的源代码依赖于 Java 11 中的库。如果您使用的是开发环境,

上传 Apache Flink 流式处理 Java 代码

在本节中,您将应用程序代码上传到在入门指南 (DataStream API)教程中创建的 Amazon S3 存储桶。

注意

如果您从入门教程中删除了 Amazon S3 存储桶,请再次执行上传 Apache Flink 流式处理 Java 代码步骤。

  1. 在 Amazon S3 控制台中,选择 ka-app-code- <username>存储桶,然后选择上传

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 KafkaGettingStartedJob-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择上传

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建应用程序

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink

  2. 在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序

  3. Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于应用程序名称,输入 MyApplication

    • 对于运行时,请选择 Apache Flink 版本 1.15.2

  4. 对于访问权限,请选择创建/更新 IAM 角色kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesisanalytics-MyApplication-us-west-2

配置应用程序

  1. MyApplication页面上,选择配置

  2. 配置应用程序 页面上,提供 代码位置

    • 对于Amazon S3 存储桶,请输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入KafkaGettingStartedJob-1.0.jar

  3. 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

    注意

    当您使用控制台指定应用程序资源(例如 CloudWatch 日志或 Amazon VPC)时,控制台会修改您的应用程序执行角色以授予访问这些资源的权限。

  4. Properties (属性) 下面,选择 Add Group (添加组)。输入以下属性:

    组 ID
    KafkaSource topic AmazonKafkaTutorialTopic
    KafkaSource bootstrap.servers 您以前保存的引导服务器列表
    KafkaSource security.protocol SSL
    KafkaSource ssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
    KafkaSource ssl.truststore.password changeit
    注意

    默认证书的 ssl.truststore.password 为“changeit”;如果使用默认证书,则不需要更改该值。

    再次选择 Add Group (添加组)。输入以下属性:

    组 ID
    KafkaSink topic AmazonKafkaTutorialTopicDestination
    KafkaSink bootstrap.servers 您以前保存的引导服务器列表
    KafkaSink security.protocol SSL
    KafkaSink ssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
    KafkaSink ssl.truststore.password changeit
    KafkaSink transaction.timeout.ms 1000

    应用程序代码读取上述应用程序属性,以配置用于与 VPC 和 Amazon MSK 集群交互的源和接收器。有关使用属性的更多信息,请参阅运行时属性

  5. Snapshots (快照) 下面,选择 Disable (禁用)。这样,就可以轻松更新应用程序,而无需加载无效的应用程序状态数据。

  6. 监控 下,确保 监控指标级别 设置为 应用程序

  7. 要进行CloudWatch 日志记录,请选中 “启用” 复选框。

  8. Virtual Private Cloud (VPC) 部分中,选择要与应用程序关联的 VPC。选择与您的 VPC 关联的子网和安全组,您希望应用程序使用它们访问 VPC 资源。

  9. 选择更新

注意

当您选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。

运行应用程序

可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。

测试应用程序

在本节中,您将记录写入到源主题。应用程序从源主题中读取记录,并将其写入到目标主题中。您可以将记录写入到源主题以及从目标主题中读取记录,以验证应用程序是否正常工作。

要写入和读取主题中的记录,请按照 Amazon MSK 入门教程中的步骤 6:生成和使用数据中的步骤进行操作。

要从目标主题中读取,请在到集群的第二个连接中使用目标主题名称,而不是源主题:

bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerString --consumer.config client.properties --topic AmazonKafkaTutorialTopicDestination --from-beginning

如果在目标主题中没有任何记录,请参阅故障排除主题中的无法访问 VPC 中的资源一节。