在 Amazon EMR 中配置 Flink - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

在 Amazon EMR 中配置 Flink

Amazon EMR 版本 6.9.0 及更高版本支持 Hive 元存储和 Amazon Glue 目录使用 Apache Flink 连接器连接到 Hive。本部分概括介绍了使用 Flink 配置 Amazon Glue 目录Hive 元存储所需的步骤。

  1. 创建 EMR 集群,其中包含版本 6.9.0 或更高版本,并至少包含两个应用程序:HiveFlink

  2. 使用脚本运行程序将以下脚本作为步骤函数执行:

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
  1. 创建 EMR 集群,其中包含版本 6.9.0 或更高版本,并至少包含两个应用程序:HiveFlink

  2. 在 Amazon Glue 数据目录设置中选择用于 Hive 表元数据,以在集群中启用数据目录。

  3. 使用脚本运行程序并将以下脚本作为阶跃函数执行:在 Amazon EMR 集群上运行命令和脚本

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

您可以使用 Amazon EMR 配置 API 通过配置文件配置 Flink。目前,可在 API 中配置的文件包括:

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Flink 的主配置文件的名称为 flink-conf.yaml

从 Amazon CLI 配置用于 Flink 的任务槽的数目
  1. 创建文件 configurations.json 并输入以下内容:

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 接下来,使用以下配置创建集群:

    aws emr create-cluster --release-label emr-5.36.1 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
注意

您也可以使用 Flink API 更改某些配置。有关更多信息,请参阅 Flink 文档中的概念

对于 Amazon EMR 5.21.0 及更高版本,您可以覆盖集群配置,并为运行的集群中的每个实例组指定额外的配置分类。要完成此操作,您可以使用 Amazon EMR 控制台、Amazon Command Line Interface(Amazon CLI)或 Amazon SDK。有关更多信息,请参阅为运行的集群中的实例组提供配置

作为应用程序所有者,您最了解应将哪些资源分配给 Flink 中的任务。对于本文档中的示例,请使用与您用于应用程序的任务实例相同的任务数量。通常,我们建议对初始并行级别执行此操作,但您也可以使用任务槽来增加并行粒度,它一般不应超过每实例虚拟内核数量。有关 Flink 架构的更多信息,请参阅 Flink 文档中的 Concepts

在包含多个主节点的 Amazon EMR 集群中进行主节点失效转移的过程中,Flink 的 JobManager 仍然可用。从 Amazon EMR 版本 5.28.0 开始,JobManager 的高可用性也会自动启用。无需手动配置。

对于 Amazon EMR 5.27.0 或更早版本,JobManager 是单点故障。当 JobManager 失败时,它会失去所有作业状态,并且不会恢复正在运行的作业。通过配置应用程序尝试计数、开展检查点检验并启用 ZooKeeper 作为 Flink 的状态存储,您可以启用 JobManager 高可用性,如以下示例所示:

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

您必须同时为 YARN 和 Flink 配置最大的应用程序主尝试次数。有关更多信息,请参阅 YARN 集群高可用性的配置。您可能还需要配置 Flink 检查点,以使重新启动的 JobManager 从先前完成的检查点恢复正在运行的作业。有关更多信息,请参阅开展 Flink 检查点检验

对于使用 Flink 1.11.x 的 Amazon EMR 版本,您必须在 flink-conf.yaml 中为 JobManager (jobmanager.memory.process.size) 和 TaskManager (taskmanager.memory.process.size) 配置总内存进程大小。您可以通过使用配置 API 来配置集群或通过 SSH 手动取消这些字段来设置这些值。Flink 提供以下默认值。

  • jobmanager.memory.process.size:1600m

  • taskmanager.memory.process.size:1728m

要排除 JVM 元空间和开销,请使用 Flink 总内存大小 (taskmanager.memory.flink.size) 而非 taskmanager.memory.process.sizetaskmanager.memory.process.size 的默认值为 1280m。不建议同时设置 taskmanager.memory.process.sizetaskmanager.memory.process.size

所有使用 Flink 1.12.0 及更高版本的 Amazon EMR 版本,都将 Flink 的开源设置中列出的默认值作为 Amazon EMR 上的默认值,因此您无需自行配置。

Flink 应用程序容器创建并写入三种类型的日志文件:.out 文件、.log 文件和 .err 文件。仅限将 .err 文件压缩并从文件系统中删除,而将 .log.out 日志文件保留在文件系统中。为确保这些输出文件保持可管理以及集群保持稳定,您可以在 log4j.properties 设置文件的上限数量并限制其大小。

Amazon EMR 版本 5.30.0 及更高版本

从 Amazon EMR 5.30.0 开始,Flink 使用带有配置分类名称 flink-log4j. 的 log4j2 日志记录框架。以下示例配置演示 log4j2 格式。

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMR 版本 5.29.0 及较早版本

对于 Amazon EMR 5.29.0 及更早版本,Flink 使用 log4j 日志记录框架。下面的示例配置演示了 log4j 格式。

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

Amazon EMR 6.12.0 及更高版本为 Flink 提供 Java 11 运行时系统支持。以下各节介绍如何配置集群以为 Flink 提供 Java 11 运行时系统支持。

使用以下步骤创建包含 Flink 和 Java 11 运行时系统的 EMR 集群。添加 Java 11 运行时系统支持所在的配置文件是 flink-conf.yaml

New console
在新控制台中创建包含 Flink 和 Java 11 运行时系统的集群
  1. 登录 Amazon Web Services Management Console 并打开 Amazon EMR 控制台,网址为 https://console.aws.amazon.com/emr

  2. 在导航窗格中的 EC2 上的 EMR 下,选择集群,然后选择创建集群

  3. 选择 Amazon EMR 6.12.0 或更高版本,然后选择安装 Flink 应用程序。选择要在集群上安装的任何其他应用程序。

  4. 继续设置您的集群。在可选的软件设置部分,使用默认的输入配置选项,并输入以下配置:

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. 继续设置并启动您的集群。

Amazon CLI
从 CLI 创建包含 Flink 和 Java 11 运行时系统的集群
  1. 创建一个将 Flink 配置为使用 Java 11 的配置文件 configurations.json

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. 从 Amazon CLI 中,使用 Amazon EMR 6.12.0 或更高版本创建新 EMR 集群,然后安装 Flink 应用程序,如以下示例所示:

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

使用以下步骤更新包含 Flink 和 Java 11 运行时系统的 EMR 集群。添加 Java 11 运行时系统支持所在的配置文件是 flink-conf.yaml

New console
在新控制台中更新包含 Flink 和 Java 11 运行时系统的正在运行的集群
  1. 登录 Amazon Web Services Management Console 并打开 Amazon EMR 控制台,网址为 https://console.aws.amazon.com/emr

  2. 在导航窗格中的 EC2 上的 EMR 下,选择集群,然后选择要更新的集群。

    注意

    集群必须使用 Amazon EMR 6.12.0 或更高版本才能支持 Java 11。

  3. 选择配置选项卡。

  4. 实例组配置部分,选择要更新的正在运行的实例组,然后从列表操作菜单中选择重新配置

  5. 使用编辑属性选项重新配置实例组,如下所示。在每个配置之后选择添加新配置

    分类 属性 Value

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. 选择保存更改以添加配置。

Amazon CLI
从 CLI 中更新正在运行的集群,以使用 Flink 和 Java 11 运行时系统

使用 modify-instance-groups 命令为运行的集群中的一个实例组指定新配置。

  1. 首先,创建一个将 Flink 配置为使用 Java 11 的配置文件 configurations.json。在以下示例中,将 ig-1xxxxxxx9 替换为您要重新配置的实例组的 ID。将文件保存在您将要运行 modify-instance-groups 命令的同一目录中。

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. 从 Amazon CLI 运行以下命令。替换您要重新配置的实例组的 ID:

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

要确定正在运行的集群的 Java 运行时系统,请使用 SSH 登录主节点,如 Connect to the primary node with SSH 中所述。然后运行以下命令:

ps -ef | grep flink

包含 -ef 选项的 ps 命令列出了系统上所有正在运行的进程。您可以使用 grep 过滤该输出,以查找提及 flink 字符串的内容。查看 Java 运行时环境(JRE)值的输出 jre-XX。在以下输出中,jre-11 表示在运行时系统为 Flink 选择了 Java 11。

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

或者,使用 SSH 登录主节点,然后使用命令 flink-yarn-session -d 启动 Flink YARN 会话。输出显示了 Flink 的 Java 虚拟机(JVM),如以下 java-11-amazon-corretto 示例所示:

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64