入门 (Scala) - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

入门 (Scala)

注意

从 Flink 1.15 版本开始,Scala 免费。应用程序现在可以使用任何 Scala 版本的 Java API。Flink 仍然在内部的一些关键组件中使用 Scala,但没有将 Scala 暴露到用户代码类加载器中。因此,用户需要将 Scala 从属项添加到其 jar 存档中。

有关 Flink 1.15 中 Scala 变更的更多信息,请参阅 Sc ala Free in One Fi fteen。

在本练习中,您将创建面向 Scala 的 Managed Service for Apache Flink 应用程序,并将 Kinesis 流作为源和接收器。

创建相关资源

在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:

  • 两个 Kinesis 流用于输入和输出。

  • 存储应用程序代码 (ka-app-code-<username>) 的 Amazon S3 存储桶

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为 ExampleInputStreamExampleOutputStream

    创建数据流 (Amazon CLI)

    • 要创建第一个流 (ExampleInputStream),请使用以下 Amazon Kinesis create-stream(创建-流)Amazon CLI命令。

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • 《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如 ka-app-code-<username>

其他资源

在您创建应用程序时,Managed Service for Apache Flink 会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/MyApplication 的日志组

  • 名为 kinesis-analytics-log-stream 的日志流

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

注意

本节中的 Python 脚本使用Amazon CLI。您必须将您的配置Amazon CLI为使用您的账户凭证和默认区域。要配置您的 Amazon CLI,请输入以下内容:

aws configure
  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

在 GitHub 中提供了该示例的 Python 应用程序代码。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 导航到 amazon-kinesis-data-analytics-java-examples/scala/GettingStarted目录。

请注意有关应用程序代码的以下信息:

  • build.sbt 文件包含有关应用程序的配置和从属项的信息,包括Managed Service for Apache Flink的库。

  • BasicStreamingJob.scala 文件包含定义应用程序功能的主要方法。

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    该应用程序还使用 Kinesis 接收器写入结果流。以下代码段创建 Kinesis 接收器:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 应用程序使用 StreamExecutionEnvironment 对象创建源和接收连接器以访问外部资源。

  • 该应用程序将使用动态应用程序属性创建源和接收连接器。读取应用程序的运行时系统属性来配置连接器。有关运行时属性的更多信息,请参阅运行时系统属性

编译并上传应用程序代码

在本节中,您将编译应用程序代码并将其上传到您在 创建相关资源 节中创建的 Amazon S3 存储桶。

编译应用程序代码

在本节中,您使用 SBT 构建工具为应用程序构建 Scala 代码。要安装 SBT,请参阅使用 cs 安装程序安装 sbt。您还需要安装 Java 开发工具包 (JDK)。参阅完成练习的先决条件

  1. 要使用您的应用程序代码,您将其编译和打包成 JAR 文件。您可以用 SBT 编译和打包代码:

    sbt assembly
  2. 如果应用程序成功编译,则创建以下文件:

    target/scala-3.2.0/getting-started-scala-1.0.jar
上传 Apache Flink 流式处理 Scala 代码

在本节中,创建 Amazon S3 存储桶并上传应用程序代码。

  1. 通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  2. 选择创建存储桶

  3. 存储桶名称 字段中输入 ka-app-code-<username>。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 Next(下一步)。

  4. 配置选项中,让设置保持原样,然后选择下一步

  5. 设置权限中,让设置保持原样,然后选择下一步

  6. 选择创建桶

  7. 选择存储桶,然后选择 ka-app-code-<username>Upload(上传)。

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 getting-started-scala-1.0.jar 文件。

  9. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。