Amazon EMR
Amazon EMR 版本指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

Apache Flink

Apache Flink 是一个流式处理数据流引擎,利用此引擎,可在高吞吐量数据源上轻松运行实时流处理。它支持无序事件的事件时间语义、确切一次语义、反向压力控制以及已为写入流和批处理应用程序优化的 API。

此外,Flink 具有适用于第三方数据源的连接器,例如以下内容:

目前,Amazon EMR 支持 Flink 作为 YARN 应用程序,以便您能管理资源以及集群中的其他应用程序。Flink-on-YARN 提供了一种简单方法来提交临时 Flink 作业,您也可以创建一个长时间运行的集群,该集群接受多个作业并根据整体 YARN 预留分配资源。

注意

在 Amazon EMR 版本 5.2.1 中增加了对 FlinkKinesisConsumer 类的支持。

应用程序 Amazon EMR 发行版标签 随此应用程序安装的组件

Flink 1.2.0

emr-5.4.0

emrfs、hadoop-client、hadoop-mapred、hadoop-hdfs-datanode、hadoop-hdfs-library、hadoop-hdfs-namenode、hadoop-httpfs-server、hadoop-kms-server、hadoop-yarn-nodemanager、hadoop-yarn-resourcemanager、flink-client

可使用 AWS Management Console、AWS CLI 或 AWS 开发工具包启动集群。

使用控制台启动安装了 Flink 的集群

  1. Open the Amazon EMR console at https://console.amazonaws.cn/elasticmapreduce/.

  2. 选择 Create clusterGo to advanced options

  3. 对于 Software Configuration,选择 EMR Release emr-5.1.0 或更高版本。

  4. 选择 Flink 作为应用程序 (与要安装的任何其他应用程序一起)。

  5. 根据需要选择其他选项,然后选择 Create cluster

使用 AWS CLI 启动带有 Flink 的集群

  • 使用下面的命令创建集群:

    aws emr create-cluster --name "Cluster with Flink" --release-label emr-5.4.0 \ --applications Name=Flink --ec2-attributes KeyName=myKey \ --instance-type m3.xlarge --instance-count 3 --use-default-roles

    注意

    包含了 Linux 行继续符 (\) 以提高可读性。可以在 Linux 命令中删除或使用它们。对于 Windows,请删除它们或将其替换为脱字号 (^)。

建议您使用配置文件配置 Flink。例如,Flink 的主配置文件的名称为 flink-conf.yaml。这可使用 Amazon EMR 配置 API 进行配置,这样在启动集群时,您可以提供要修改的文件的配置。

使用 AWS CLI 配置用于 Flink 的任务槽的数目

  1. 创建文件 configuration.json 并输入以下内容:

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 接下来,使用以下配置创建集群:

    aws emr create-cluster --release-label emr-5.4.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m3.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

注意

也可以使用 Flink API 更改某些配置。有关更多信息,请参阅 Flink 文档中的基本 API 概念

作为应用程序所有者,您最了解应将哪些资源分配给 Flink 中的任务。在本文档的示例中,使用与用于应用程序的从属实例的数目相同的任务。通常,我们建议对初始并行级别执行此操作,但您也可以使用任务槽来增加并行粒度,它一般不应超过每实例虚拟核心数。有关 Flink 架构的更多信息,请参阅 Flink 文档中的概念

目前,可在 Amazon EMR 配置 API 中配置的文件包括:

  • flink-conf.yaml

  • log4j.properties

  • log4j-yarn-session.properties

  • log4j-cli.properties

可通过多种方式与 Amazon EMR 上的 Flink 交互:通过 Amazon EMR 步骤、在 ResourceManager 跟踪 UI 中找到的 Flink 接口以及在命令行上。此外,还可通过所有这些方式提交要运行的 Flink 应用程序的 JAR 文件。

此外,可以将 Flink 应用程序作为长时间运行的 YARN 作业或临时集群运行。在长时间运行的作业中,您可以将多个 Flink 应用程序提交给 Amazon EMR 上运行的一个 Flink 集群。如果您将 Flink 作为临时作业运行,则 Amazon EMR 集群仅在其运行 Flink 应用程序的时间内存在,因此您只需为使用的资源和时间付费。在任一情况下,您可以使用 Amazon EMR AddSteps API 操作提交 Flink 作业,也可以将其作为 RunJobFlow 操作或 AWS CLI create-cluster 命令的步骤参数提交。

建议您启动一个长时间运行的 Flink 作业,多个客户端可通过 YARN API 操作提交到该作业。您启动一个 Flink YARN 会话,并将作业提交到 Flink JobManager,后者位于托管 Flink 会话 Application Master 守护程序的 YARN 节点上。要启动 YARN 会话,可从控制台、AWS CLI 或 Java 开发工具包使用以下步骤。

使用控制台提交长时间运行的作业

使用 flink-yarn-session 命令提交现有集群中长时间运行的 Flink 会话。

注意

向 Amazon EMR 版本 5.5.0 中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行。如果您使用早期版本的 Amazon EMR,请在随后的步骤中将 bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d" 替换为 Argument

  1. Open the Amazon EMR console at https://console.amazonaws.cn/elasticmapreduce/.

  2. 在集群列表中,选择先前已启动的集群。

  3. 在集群详细信息页面上,选择 Steps,再选择 Add Step

  4. 使用随后提供的指南输入参数,然后选择 Add

    参数 说明

    步骤类型

    自定义 JAR

    名称

    可帮助您标识步骤的名称。例如,Flink_Long_Running_Session

    Jar location

    command-runner.jar

    参数

    带适合您的应用的参数的 flink-yarn-session 命令。例如,flink-yarn-session -n 2 -d 将使用两个任务管理器 (-n 2) 以分离状态 (-d) 在 YARN 集群中启动长时间运行的 Flink 会话。有关参数详细信息,请参阅最新 Flink 文档中的 YARN 设置

    注意

    使用 5.5.0 之前的 Amazon EMR 版本时,您必须直接指定 Flink 脚本 yarn-session.sh,而不是指定 flink-yarn-session,从而指定脚本的完整路径。例如,bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d"

使用 AWS CLI 提交长时间运行的 Flink 作业

  • 要在 EMR 中启动长时间运行的 Flink 集群,可使用 create-cluster 命令:

    aws emr create-cluster --release-label emr-5.4.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m3.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="flink-yarn-session -n 2 -d"

您可以使用命令行选项提交工作,也可以使用通过 YARN ResourceManager 代理的 Flink 的本机接口。要使用 Flink CLI 通过 EMR 步骤提交,请指定长时间运行的 Flink 集群的 YARN 应用程序 ID。为此,请在 EMR 命令行上或通过 YarnClient API 操作运行 yarn application –list

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

例 SDK for Java

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", “-yid”, “application_1473169569237_0002”, "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));

例 AWS CLI

使用 add-steps 子命令将新作业提交到现有 Flink 集群:

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

以下示例通过向现有集群添加步骤来启动 Flink WordCount 示例。

例 控制台

在现有集群的控制台详细信息页面中,通过为 Steps 字段选择 Add Step 来添加步骤。

例 SDK for Java

以下示例说明了两种运行 Flink 作业的方法。第一个示例将 Flink 作业提交到正在运行的集群。第二个示例创建一个运行 Flink 作业的集群,然后在完成时终止。

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar " + "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.2.1") .withApplications(myApps) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://myLogBucket") .withInstances( new JobFlowInstancesConfig().withEc2KeyName("myKeyName").withInstanceCount(2) .withKeepJobFlowAliveWhenNoSteps(false).withMasterInstanceType("m3.xlarge") .withSlaveInstanceType("m3.xlarge")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request);

例 AWS CLI

使用 add-steps 子命令将新作业提交到现有 Flink 集群:

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Transient_No_Terminate,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

使用 create-cluster 子命令创建一个临时 EMR 集群,该集群在 Flink 作业完成时终止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m3.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""

目前,适用于 EMR 集群的 Flink Scala 外壳仅配置为启动新的 YARN 会话。您可以通过以下过程使用 Scala 外壳。

在主节点上使用 Flink Scala 外壳

  1. 使用 SSH 连接到主节点中所述,使用 SSH 登录到主节点。

  2. 键入以下命令启动外壳:

    在 Amazon EMR 版本 5.5.0 及更高版本中,您可以:

    % flink-scala-shell yarn -n 1

    在 Amazon EMR 早期版本中,使用:

    % /usr/lib/flink/bin/start-scala-shell.sh yarn -n 1

    这将启动 Flink Scala 外壳,以便您能以交互方式使用 Flink。与使用其他接口和选项一样,您可以基于要从外壳运行的任务数来缩放示例中使用的 -n 选项值。

属于 Flink 应用程序的 Application Master 将托管 Flink Web 界面,它是一种将 JAR 提交为作业或查看其他作业的当前状态的替代方法。只要有 Flink 会话在运行,Flink Web 界面就处于活动状态。如果您已激活一个长时间运行的 YARN 作业,则可按照 Amazon EMR 管理指南 中的使用 SSH 连接到主节点主题中的说明操作,以连接到 YARN ResourceManager。例如,如果您已设置 SSH 隧道并且已在浏览器中激活代理,则可在 EMR 集群详细信息页面中的 Connections 下选择 ResourceManager 连接。

在找到 ResourceManager 后,选择正在托管 Flink 会话的 YARN 应用程序。选择 Tracking UI 列下的链接。

在 Flink Web 界面中,您可以查看配置,将自己的自定义 JAR 作为作业提交或监视正在进行的作业。