本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
配置 Flink
在 Amazon EMR 中将 Flink 配置为 Hive 元存储
Amazon EMR 发行版 6.9.0 及更高版本支持 Hive 元存储和 Amazon Glue 目录使用 Apache Flink 连接器连接到 Hive。本部分概括介绍了使用 Flink 配置 Amazon Glue 目录和 Hive 元存储所需的步骤。
使用 Hive 元存储
-
使用发行版 6.9.0 或更高版本创建 Amazon EMR 集群,其中至少包含两个应用程序:HIVE 和 FLINK。
-
使用脚本运行程序并将以下脚本作为阶跃函数执行:在 Amazon EMR 集群上运行命令和脚本:
hive-metastore-setup.sh
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
使用 Amazon Glue 目录
-
创建 EMR-6.9.0 集群,其中至少包含两个应用程序:HIVE 和 FLINK。
-
创建 EMR-6.9 集群时,在 Amazon Glue Data Catalog 设置中选择 Use for Hive table metadata(用于 Hive 表元数据),以在集群中启用数据目录。
-
使用脚本运行程序并将以下脚本作为阶跃函数执行:在 Amazon EMR 集群上运行命令和脚本:
glue-catalog-setup.sh
sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
限制
要详细了解将 Apache Flink 连接器用于 Hive 元存储的限制和注意事项,请参阅 Apache Flink 文档的 Apache Hive
使用配置文件配置 Flink
建议您使用配置文件配置 Flink。例如,Flink 的主配置文件的名称为 flink-conf.yaml
。可以使用 Amazon EMR 配置 API 对其进行配置。
使用 Amazon CLI 配置用于 Flink 的任务槽的数目
-
创建文件
configurations.json
并输入以下内容:[ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
-
接下来,使用以下配置创建集群:
aws emr create-cluster --release-label
emr-5.36.0
\ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName
,InstanceProfile=EMR_EC2_DefaultRole
注意
也可以使用 Flink API 更改某些配置。有关更多信息,请参阅 Flink 文档中的概念
对于 Amazon EMR 5.21.0 及更高版本,您可以覆盖集群配置,并为运行的集群中的每个实例组指定额外的配置分类。要完成此操作,您可以使用 Amazon EMR 控制台、Amazon Command Line Interface(Amazon CLI)或 Amazon SDK。有关更多信息,请参阅为运行的集群中的实例组提供配置。
并行选项
作为应用程序所有者,您最了解应将哪些资源分配给 Flink 中的任务。在本文档的示例中,使用与用于应用程序的从属实例的数目相同的任务。通常,我们建议对初始并行级别执行此操作,但您也可以使用任务槽来增加并行粒度,它一般不应超过每实例虚拟内核
可配置的文件
目前,可在 Amazon EMR 配置 API 中配置的文件包括:
-
flink-conf.yaml
-
log4j.properties
-
flink-log4j-session
-
log4j-cli.properties
在包括多个主节点的 EMR 集群中配置 Flink
在包括 JobManager 多个主节点 Amazon EMR 集群中进行主节点故障转移的过程中,Flink 的 Flink 仍然可用。从 Amazon EMR 版本 5.28.2 开始, JobManager 高可用性也将自动启用。无需手动配置。
对于 Amazon EMR 5.27.2 或更早版本, JobManager 是单点故障。 JobManager 失败时,它将丢失所有作业状态并且不会恢复正在运行的作业。您可以通过配置应用程序尝试次数、检查点以及 ZooKeeper 为 Flink 启用 as 状态存储来启用 JobManager 高可用性,如下例所示:
[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]
您必须同时为 YARN 和 Flink 配置最大的应用程序主尝试次数。有关更多信息,请参阅 YARN 集群高可用性的配置
配置内存进程大小
对于使用 Flink 1.11.x 的 Amazon EMR 版本,您必须为 JobManager(jobmanager.memory.process.size
)和 TaskManager(taskmanager.memory.process.size
)配置总内存处理大小flink-conf.yaml
。您可以通过使用配置 API 来配置集群或通过 SSH 手动取消这些字段来设置这些值。Flink 提供以下默认值。
-
jobmanager.memory.process.size
:1600m -
taskmanager.memory.process.size
:1728m
要排除 JVM 元空间和开销,请使用 Flink 总内存大小 (taskmanager.memory.flink.size
) 而非 taskmanager.memory.process.size
。taskmanager.memory.process.size
的默认值为 1280m。不建议同时设置 taskmanager.memory.process.size
和 taskmanager.memory.process.size
。
所有使用 Flink 1.12.0 及更高版本的 Amazon EMR 版本,都将 Flink 的开源设置中列出的默认值作为 Amazon EMR 上的默认值,因此您无需自行配置。
配置日志输出文件大小
Flink 应用程序容器创建并写入三种类型的日志文件:.out
文件、.log
文件和 .err
文件。仅限将 .err
文件压缩并从文件系统中删除,而将 .log
和 .out
日志文件保留在文件系统中。为确保这些输出文件保持可管理以及集群保持稳定,您可以在 log4j.properties
设置文件的上限数量并限制其大小。
Amazon EMR 版本 5.30.0 及更高版本
从 Amazon EMR 5.30.0 开始,Flink 使用带有配置分类名称 flink-log4j.
的 log4j2 日志记录框架。以下示例配置演示 log4j2 格式。
[ { "Classification": "flink-log4j", "Properties": { "rootLogger.appenderRef.rolling.ref": "RollingFileAppender", "appender.rolling.name": "RollingFileAppender", "appender.rolling.type":"RollingFile", "appender.rolling.append" : "false", "appender.rolling.fileName" : "${sys:log.file}", "appender.rolling.filePattern" : "${sys:log.file}.%i", "appender.rolling.layout.type" : "PatternLayout", "appender.rolling.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.rolling.policies.type" : "Policies", "appender.rolling.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.rolling.policies.size.size" : "100MB", "appender.rolling.strategy.type" : "DefaultRolloverStrategy", "appender.rolling.strategy.max" : "10" }, } ]
Amazon EMR 版本 5.29.0 及较早版本
对于 Amazon EMR 5.29.0 及更早版本,Flink 使用 log4j 日志记录框架。下面的示例配置演示了 log4j 格式。
[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]