为 Apache Flink 应用程序创建和运行 Kinesis Data Analytics - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 Apache Flink 应用程序创建和运行 Kinesis Data Analytics

在本练习中,您创建一个 Kinesis Data Analytics 应用程序,它将 Amazon MSK 主题作为源,并将 Amazon S3 存储桶作为接收器。

创建相关资源

在为本练习创建针对 Apache Flink 的 Kinesis Data Analytics 应用程序之前,请创建以下相关资源:

  • 基于 Amazon VPC 和 Amazon MSK 集群的虚拟私有云 (VPC)

  • 存储应用程序代码和输出的 Amazon S3 存储桶 (ka-app-<username>

创建 VPC 和 Amazon MSK 群集

要创建 VPC 和 Amazon MSK 集群以从 Kinesis Data Analytics 应用程序中访问,请执行入门使用 Amazon MSK教程。

在完成本教程时,请注意以下几点:

  • 记录集群的引导服务器列表。您可以使用以下命令获取引导服务器列表,替换ClusterArn使用 MSK 集群的 Amazon 资源名称 (ARN):

    aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arn ClusterArn {... "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094" }
  • 在执行教程中的步骤时,请务必使用所选的Amazon代码、命令和控制台条目中的区域。

创建 Amazon S3 存储桶

您可以使用控制台创建 Amazon S3 存储桶。有关创建此资源的说明,请参阅以下主题:

  • 如何创建 S3 存储桶?中的Amazon Simple Storage Service 用户指南. 附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如ka-app-<username>.

其他资源

当您创建应用程序时,Kinesis Data Analytics 会创建以下亚马逊CloudWatch资源(如果不存在)

  • 名为 /aws/kinesis-analytics-java/MyApplication 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入 Amazon MSK 主题,以供应用程序处理。

  1. Connect 到您在中创建的客户端实例步骤 4: 创建客户端计算机开始使用亚马逊 MSK教程。

  2. 安装 Python3、Pip 和 Kafka Python 库:

    $ sudo yum install python37 $ curl -O https://bootstrap.pypa.io/get-pip.py $ python3 get-pip.py --user $ pip install kafka-python
  3. 使用以下内容创建名为 stock.py 的文件。替换BROKERS您之前记录的引导经纪商名单的价值。

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<Bootstrap Brokers List>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  4. 在本教程的后面部分,您运行 stock.py 脚本,以将数据发送到应用程序。

    $ python3 stock.py

下载并检查 Apache Flink 流式处理 Java 代码

此示例的 Java 应用程序代码可从GitHub.

要下载 Java 应用程序代码

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  2. 导航到 amazon-kinesis-data-analytics-java-examples/GettingStartedTable目录。

请注意有关应用程序代码的以下信息:

  • 一个项目对象模型 (pom.xml)文件包含有关应用程序的配置和依赖关系(包括 Kinesis Data Analytics 库)的信息。

  • StreamingJob.java 文件包含定义应用程序功能的 main 方法。

  • 该应用程序使用FlinkKafkaConsumer阅读亚马逊 MSK 主题。以下代码段创建FlinkKafkaConsumer对象:

    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProps);
  • 您的应用程序使用创建源和接收连接器以访问外部资源StreamExecutionEnvironmentTableEnvironment对象。

  • 应用程序使用动态应用程序属性创建源连接器和接收器连接器,因此您可以在不重新编译代码的情况下指定应用程序参数(如 S3 存储桶)。

    //read the parameters from the Kinesis Analytics environment Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); Properties flinkProperties = null; String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic"); String brokers = parameter.get("brokers", ""); String s3Path = parameter.get("s3Path", ""); if (applicationProperties != null) { flinkProperties = applicationProperties.get("FlinkApplicationProperties"); } if (flinkProperties != null) { kafkaTopic = flinkProperties.get("kafka-topic").toString(); brokers = flinkProperties.get("brokers").toString(); s3Path = flinkProperties.get("s3Path").toString(); }

    有关运行时属性的更多信息,请参阅运行时属性

编译应用程序代码

在本节中,您使用 Apache Maven 编译器创建应用程序的 Java 代码。有关安装 Apache Maven 和 Java 开发工具包 (JDK) 的信息,请参阅完成练习的先决条件

编译应用程序代码

  1. 要使用您的应用程序代码,您将其编译和打包成 JAR 文件。您可以通过两种方式之一编译和打包您的代码:

    • 使用命令行 Maven 工具。在包含 pom.xml 文件的目录中通过运行以下命令创建您的 JAR 文件:

      mvn package -Dflink.version=1.13.2
    • 设置开发环境。有关详细信息,请参阅您的开发环境文档。

      注意

      提供的源代码依赖于 Java 11 中的库。

    您可以作为 JAR 文件上传您的包,也可以将包压缩为 ZIP 文件并上传。如果您使用 Amazon CLI 创建应用程序,您可以指定您的代码内容类型(JAR 或 ZIP)。

  2. 如果编译时出错,请验证 JAVA_HOME 环境变量设置正确。

如果应用程序成功编译,则创建以下文件:

target/aws-kinesis-analytics-java-apps-1.0.jar

上传 Apache Flink 流式处理 Java 代码

在本节中,您将创建 Amazon S3 存储桶并上传应用程序代码。

上传应用程序代码

  1. 通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  2. 选择 Create bucket (创建存储桶)

  3. Enterka-app-code-<username>中的Bucket name字段中返回的子位置类型。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。请选择 Next (下一步)

  4. 配置选项步骤中,让设置保持原样,然后选择下一步

  5. 设置权限步骤中,让设置保持原样,然后选择下一步

  6. 请选择 Create bucket (创建存储桶)

  7. 在 Amazon S3 控制台中,选择ka-app-code-<username>存储桶,然后选择上传.

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 aws-kinesis-analytics-java-apps-1.0.jar 文件。请选择 Next (下一步)

  9. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建和运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用.

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于描述,输入 My java test app

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将版本保留为Apache Flink 1.13.2 (推荐版本).

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择 Create application(创建应用程序)。

注意

使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为您的应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略以添加权限,以访问 Amazon S3 存储桶。

编辑 IAM 策略以添加 S3 存储桶权限

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "WriteObjects", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-<username>", "arn:aws:s3:::ka-app-<username>/*" ] } ] }

配置应用程序

可以使用以下过程配置应用程序。

要配置应用程序

  1. 在存储库的MyApplication在页面上,选择配置.

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 适用于Amazon S3 存储桶输入ka-app-code-<username>.

    • 适用于指向 Amazon S3 对象的路径输入aws-kinesis-analytics-java-apps-1.0.jar.

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. UNDER属性,选择创建组. 适用于Group ID (组 ID)输入FlinkApplicationProperties.

  5. 输入以下应用程序属性和值:

    密钥
    kafka-topic AWSKafkaTutorialTopic
    brokers Your Amazon MSK cluster's Bootstrap Brokers list
    s3Path ka-app-<username>
    security.protocol SSL
    ssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
    ssl.truststore.password changeit
  6. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  7. 适用于CloudWatch记录中,选择启用”复选框。

  8. Virtual Private Cloud (VPC)选择部分,选择基于 Amazon MSK 集群的 VPC 配置. 选择AWSKafkaTutorialCluster.

  9. 选择 Update(更新)。

注意

当您选择启用亚马逊时CloudWatchKinesis Data Analytics 为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

可以使用以下过程运行应用程序。

要运行应用程序

  1. 在存储库的MyApplication在页面上,选择运行. 确认该操作。

  2. 当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)

  3. 在您的 Amazon EC2 客户端中,运行之前创建的 Python 脚本,将记录写入 Amazon MSK 集群以便您的应用程序处理:

    $ python3 stock.py

停止应用程序

要停止应用程序,请在MyApplication在页面上,选择停止. 确认该操作。

下一个步骤

清理 Amazon 资源