入门 (Scala) - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

入门 (Scala)

注意

从 1.15 版本开始,Flink 是免费的 Scala。应用程序现在可以使用任何 Scala 版本的 Java API。Flink 仍然在一些关键组件中使用 Scala,但不会将 Scala 暴露给用户代码类加载器。因此,用户需要将 Scala 依赖项添加到他们的 jar 存档中。

有关 Flink 1.15 中 Scala 变更的更多信息,请参阅 One Feifteen 中的 Scala Free

在本练习中,您将创建 Scala 的 Kinesis Data Analytics 应用程序应用程序,将 Kinesis 流作为源和接收器。

创建相关资源

在为本练习创建 Kinesis Data Analytics 应用程序之前,您需要创建以下依赖资源:

  • 两个 Kinesis 数据流用于输入和输出。

  • 用于存储应用程序代码的 Amazon S3 存储桶 (ka-app-code-<username>)

您可以使用控制台创建 Kinesis Streams 和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data St reams Data Stream s? 将数据流命名为 ExampleInputStreamExampleOutputStream

    创建数据流 (Amazon CLI)

    • 要创建第一个直播 (ExampleInputStream),请使用以下 Amazon Kinesis create-streamAmazon CLI 命令。

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • 如何创建 S3 存储桶?Amazon Simple Storage 用户指南中。通过附加您的登录名,为 Amazon S3 存储桶指定一个全球唯一的名称,例如ka-app-code-<username>

其他资源

在创建应用程序时,Kinesis Data Analytics 将创建以下Amazon CloudWatch 资源(如果尚不存在):

  • 一个名为的日志组/aws/kinesis-analytics-java/MyApplication

  • 名为的日志流kinesis-analytics-log-stream

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

注意

本节中的 Python 脚本使用Amazon CLI. 您必须将您的配置Amazon CLI为使用您的账户凭证和默认区域。要配置您的Amazon CLI,输入以下内容:

aws configure
  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

此示例的 Python 应用程序代码可从以下网址获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/scala/GettingStarted目录。

请注意有关应用程序代码的以下信息:

  • build.sbt文件包含有关应用程序配置和依赖关系的信息,包括 Kinesis Data Analytics 库。

  • BasicStreamingJob.scala文件包含定义应用程序功能的主要方法。

  • 该应用程序使用 Kinesis 源从源码流中读取数据。以下片段创建了 Kinesis 源代码:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    该应用程序还使用 Kinesis 接收器来写入结果流。以下片段创建了 Kinesis 水槽:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 应用程序创建源和接收连接器以使用 StreamExecutionEnvironment 对象访问外部资源。

  • 应用程序使用动态应用程序属性创建源连接器和接收器连接器。读取运行时应用程序的属性以配置连接器。有关运行时属性的更多信息,请参见运行时属性

编译并上传应用程序代码

在本节中,您将编译应用程序代码并将其上传到您在该创建相关资源部分创建的 Amazon S3 存储桶。

编译应用程序代码

在本节中,您将使用 SBT 构建工具为应用程序构建 Scala 代码。要安装 SBT,请参阅使用 cs 设置安装 sbt。您还需要安装 Java 开发工具包 (JDK)。请参阅完成练习的先决条件

  1. 要使用您的应用程序代码,您将其编译和打包成 JAR 文件。你可以用 SBT 编译和打包你的代码:

    sbt assembly
  2. 如果应用程序成功编译,则创建以下文件:

    target/scala-3.2.0/getting-started-scala-1.0.jar
上传 Apache Flink 直播 Scala 代码

在此部分,您将创建一个 Simple Storage S3 存储桶并上传应用程序代码。

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择创建存储桶

  3. ka-app-code-<username>存储段名称字段中输入。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择下一步

  4. 配置选项中,保持设置不变,然后选择下一步

  5. 设置权限中,保持设置不变,然后选择下一步

  6. 选择创建桶

  7. 选择该ka-app-code-<username>存储桶,然后选择 Upload(上载)。

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 getting-started-scala-1.0.jar 文件。

  9. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问应用程序。