Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
开始吧 (Scala)
注意
从 1.15 版本开始,Flink 是免费的 Scala。应用程序现在可以使用任何 Scala 版本API中的 Java。Flink 仍然在内部的一些关键组件中使用 Scala,但没有将 Scala 暴露到用户代码类加载器中。因此,您必须将 Scala 依赖项添加到您的 JAR-存档中。
有关 Flink 1.15 中 Scala 变更的更多信息,请参阅 1.15 Scala 免费
在本练习中,您将使用 Kinesis 流作为源和接收器为 Scala 创建适用于 Apache Flink 的托管服务。
创建依赖资源
在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:
两个 Kinesis 流用于输入和输出。
存储应用程序代码 (
ka-app-code-
) 的 Amazon S3 存储桶<username>
您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:
Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为
ExampleInputStream
和ExampleOutputStream
。创建数据流 (Amazon CLI)
要创建第一个直播 (
ExampleInputStream
),请使用以下 Amazon Kinesis create-stre Amazon CLI am 命令。aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream
)。aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如
ka-app-code-
。<username>
其他资源
在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
名为
/AWS/KinesisAnalytics-java/MyApplication
的日志组名为
kinesis-analytics-log-stream
的日志流
将样本记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
注意
本节中的 Python 脚本使用 Amazon CLI。您必须将您的配置 Amazon CLI 为使用您的账户凭证和默认区域。要配置您的 Amazon CLI,请输入以下内容:
aws configure
-
使用以下内容创建名为
stock.py
的文件:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
运行
stock.py
脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。
下载并检查应用程序代码
此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
导航到
amazon-kinesis-data-analytics-java-examples/scala/GettingStarted
目录。
请注意有关应用程序代码的以下信息:
build.sbt
文件包含有关应用程序的配置和从属项的信息,包括Managed Service for Apache Flink的库。BasicStreamingJob.scala
文件包含定义应用程序功能的主要方法。应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:
private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }
该应用程序还使用 Kinesis 接收器写入结果流。以下代码段创建 Kinesis 接收器:
private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
应用程序创建源连接器和接收器连接器,以使用 StreamExecutionEnvironment 对象访问外部资源。
该应用程序将使用动态应用程序属性创建源和接收连接器。读取应用程序的运行时系统属性来配置连接器。有关运行时系统属性的更多信息,请参阅运行时系统属性。
编译并上传应用程序代码
在本节中,您将编译应用程序代码并将其上传到您在 创建依赖资源 节中创建的 Amazon S3 存储桶。
编译应用程序代码
在本节中,您将使用SBT
要使用您的应用程序代码,您需要将其编译并打包成一个JAR文件。你可以用以下方法编译和打包你的代码SBT:
sbt assembly
-
如果应用程序成功编译,则创建以下文件:
target/scala-3.2.0/getting-started-scala-1.0.jar
上传 Apache Flink 流式处理 Scala 代码
在本节中,创建 Amazon S3 存储桶并上传应用程序代码。
打开 Amazon S3 控制台,网址为https://console.aws.amazon.com/s3/
。 选择创建存储桶。
在 存储桶名称 字段中输入
ka-app-code-<username>
。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择下一步。在配置选项中,让设置保持原样,然后选择下一步。
在设置权限中,让设置保持原样,然后选择下一步。
选择创建存储桶。
选择
ka-app-code-<username>
存储桶,然后选择上传。-
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
getting-started-scala-1.0.jar
文件。 您无需更改该对象的任何设置,因此,请选择上传。
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。