Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

在 Amazon EMR 中使用 Flink 作业

可通过多种方式与 Amazon EMR 上的 Flink 交互:通过 Amazon EMR 步骤、在 ResourceManager 跟踪 UI 中找到的 Flink 接口以及在命令行上。此外,还可通过所有这些方式提交要运行的 Flink 应用程序的 JAR 文件。

此外,可以将 Flink 应用程序作为长时间运行的 YARN 作业或临时集群运行。在长时间运行的作业中,您可以将多个 Flink 应用程序提交给 Amazon EMR 上运行的一个 Flink 集群。如果您将 Flink 作为临时作业运行,则 Amazon EMR 集群仅在其运行 Flink 应用程序的时间内存在,因此您只需为使用的资源和时间付费。在任一情况下,您可以使用 Amazon EMR AddSteps API 操作提交 Flink 作业,也可以将其作为 RunJobFlow 操作或 AWS CLI create-cluster 命令的步骤参数提交。

建议您启动一个长时间运行的 Flink 作业,多个客户端可通过 YARN API 操作提交到该作业。您启动一个 Flink YARN 会话,并将作业提交到 Flink JobManager,后者位于托管 Flink 会话 Application Master 守护程序的 YARN 节点上。要启动 YARN 会话,可从控制台、AWS CLI 或 Java 开发工具包使用以下步骤。

使用控制台提交长时间运行的作业

使用 flink-yarn-session 命令提交现有集群中长时间运行的 Flink 会话。

注意

向 Amazon EMR 版本 5.5.0 中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行。如果您使用早期版本的 Amazon EMR,请在随后的步骤中用 bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d" 替换 Argument (参数)

  1. 通过以下网址打开 Amazon EMR 控制台:https://console.amazonaws.cn/elasticmapreduce/

  2. 在集群列表中,选择先前已启动的集群。

  3. 在集群详细信息页面上,选择 Steps (步骤),再选择 Add Step (添加步骤)

  4. 使用随后提供的指南输入参数,然后选择 Add (添加)

    参数 描述

    Step type (步骤类型)

    自定义 JAR

    名称

    可帮助您标识步骤的名称。例如,Flink_Long_Running_Session

    Jar location (Jar 位置)

    command-runner.jar

    Arguments (参数)

    带适合您的应用的参数的 flink-yarn-session 命令。例如,flink-yarn-session -n 2 -d 将使用两个任务管理器 (-n 2) 以分离状态 (-d) 在 YARN 集群中启动长时间运行的 Flink 会话。有关参数详细信息,请参阅最新 Flink 文档中的 YARN 设置

    注意

    使用 5.5.0 之前的 Amazon EMR 版本时,您必须直接指定 Flink 脚本 yarn-session.sh,而不是指定 flink-yarn-session,从而指定脚本的完整路径。例如,bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d"

使用 AWS CLI 提交长时间运行的 Flink 作业

  • 要在 EMR 中启动长时间运行的 Flink 集群,可使用 create-cluster 命令:

    aws emr create-cluster --release-label emr-5.25.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m4.large \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="flink-yarn-session -n 2 -d"

您可以使用命令行选项提交工作,也可以使用通过 YARN ResourceManager 代理的 Flink 的本机接口。要使用 Flink CLI 通过 EMR 步骤提交,请指定长时间运行的 Flink 集群的 YARN 应用程序 ID。为此,请在 EMR 命令行上或通过 YarnClient API 操作运行 yarn application –list

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

例 适用于 Java 的开发工具包

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", “-yid”, “application_1473169569237_0002”, "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));

例 AWS CLI

使用 add-steps 子命令将新作业提交到现有 Flink 集群:

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

以下示例通过向现有集群添加步骤来启动 Flink WordCount 示例。

例 控制台

在现有集群的控制台详细信息页面中,通过为 Steps (步骤) 字段选择 Add Step (添加步骤) 来添加步骤。

例 适用于 Java 的开发工具包

这些示例说明了两种运行 Flink 作业的方法。

以下示例将 Flink 作业提交到一个运行的集群:

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); // The application id specified below is retrieved from the YARN cluster, for example, by running 'yarn application -list' from the master node command line HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://bucket/for/my/textfile.txt", "--output", "s3://bucket/for/my/output/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); // Specify the cluster ID of the YARN cluster instead of j-xxxxxxxxx AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("j-xxxxxxxxxx") .withSteps(stepConfigs));

以下示例创建一个集群,它运行 Flink 作业并在完成时将其终止。

import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }

例 AWS CLI

使用 add-steps 子命令将新作业提交到现有 Flink 集群:

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Transient_No_Terminate,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

使用 create-cluster 子命令创建一个临时 EMR 集群,该集群在 Flink 作业完成时终止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m4.large \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""