配置 Flink
建议您使用配置文件配置 Flink。例如,Flink 的主配置文件的名称为 flink-conf.yaml
。可以使用 Amazon EMR 配置 API 对其进行配置。
使用 Amazon CLI 配置用于 Flink 的任务槽的数目
-
创建文件
configurations.json
并输入以下内容:[ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
-
接下来,使用以下配置创建集群:
aws emr create-cluster --release-label
emr-5.34.0
\ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=YourKeyName
,InstanceProfile=EMR_EC2_DefaultRole
也可以使用 Flink API 更改某些配置。有关更多信息,请参阅 Flink 文档中的概念
对于 Amazon EMR 5.21.0 及更高版本,您可以覆盖集群配置,并为运行的集群中的每个实例组指定额外的配置分类。要完成此操作,您可以使用 Amazon EMR 控制台、Amazon Command Line Interface(Amazon CLI)或 Amazon SDK。有关更多信息,请参阅为运行的集群中的实例组提供配置。
并行选项
作为应用程序所有者,您最了解应将哪些资源分配给 Flink 中的任务。在本文档的示例中,使用与用于应用程序的从属实例的数目相同的任务。通常,我们建议对初始并行级别执行此操作,但您也可以使用任务槽来增加并行粒度,它一般不应超过每实例虚拟核心数
可配置的文件
目前,可在 Amazon EMR 配置 API 中配置的文件包括:
-
flink-conf.yaml
-
log4j.properties
-
flink-log4j-session
-
log4j-cli.properties
在包括多个主节点的 EMR 集群中配置 Flink
在包括多个主节点 EMR 集群中进行主节点故障转移的过程中,Flink 的 JobManager 仍然可用。从 Amazon EMR 版本 5.28.0 开始,JobManager 的高可用性也会自动启用。无需手动配置。
对于 Amazon EMR 5.27.0 或更早版本,JobManager 是单点故障。当 JobManager 失败时,它会失去所有作业状态,并且不会恢复正在运行的作业。通过配置应用程序尝试计数、开展检查点检验并启用 ZooKeeper 作为 Flink 的状态存储,您可以启用 JobManager 高可用性,如以下示例所示:
[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]
您必须同时为 YARN 和 Flink 配置最大的应用程序主尝试次数。有关更多信息,请参阅 YARN 集群高可用性的配置
配置内存进程大小
对于使用 Flink 1.11.x 的 Amazon EMR 版本,您必须在 flink-conf.yaml
中为 JobManager (jobmanager.memory.process.size
) 和 TaskManager (taskmanager.memory.process.size
) 配置总内存进程大小。您可以通过使用配置 API 来配置集群或通过 SSH 手动取消这些字段来设置这些值。Flink 提供以下默认值。
-
jobmanager.memory.process.size
:1600m -
taskmanager.memory.process.size
:1728m
要排除 JVM 元空间和开销,请使用 Flink 总内存大小 (taskmanager.memory.flink.size
) 而非 taskmanager.memory.process.size
。taskmanager.memory.process.size
的默认值为 1280m。不建议同时设置 taskmanager.memory.process.size
和 taskmanager.memory.process.size
。
所有使用 Flink 1.12.0 及更高版本的 Amazon EMR 版本,都将 Flink 的开源设置中列出的默认值作为 Amazon EMR 上的默认值,因此您无需自行配置。
配置日志输出文件大小
Flink 应用程序容器创建并写入三种类型的日志文件:.out
文件、.log
文件和 .err
文件。仅限将 .err
文件压缩并从文件系统中删除,而将 .log
和 .out
日志文件保留在文件系统中。为确保这些输出文件保持可管理以及集群保持稳定,您可以在 log4j.properties
设置文件的上限数量并限制其大小。
Amazon EMR 版本 5.30.0 及更高版本
从 Amazon EMR 5.30.0 开始,Flink 使用带有配置分类名称 flink-log4j.
的 log4j2 日志记录框架。以下示例配置演示 log4j2 格式。
[ { "Classification": "flink-log4j", "Properties": { "appender.rolling.name": "RollingFileAppender", "appender.rolling.type":"RollingFile", "appender.rolling.append" : "false", "appender.rolling.fileName" : "${sys:log.file}", "appender.rolling.filePattern" : "${sys:log.file}.%i", "appender.rolling.layout.type" : "PatternLayout", "appender.rolling.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.rolling.policies.type" : "Policies", "appender.rolling.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.rolling.policies.size.size" : "100MB", "appender.rolling.strategy.type" : "DefaultRolloverStrategy", "appender.rolling.strategy.max" : "10" }, } ]
Amazon EMR 版本 5.29.0 及较早版本
对于 Amazon EMR 5.29.0 及更早版本,Flink 使用 log4j 日志记录框架。下面的示例配置演示了 log4j 格式。
[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]