在 Amazon EMR 中使用 Flink 作业 - Amazon EMR
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon EMR 中使用 Flink 作业

与Flink互动的方式有几种 Amazon EMR:至 Amazon EMR 步骤,Flink界面位于 ResourceManager 跟踪UI和命令行。此外,还可通过所有这些方式提交要运行的 Flink 应用程序的 JAR 文件。

此外,可以将 Flink 应用程序作为长时间运行的 YARN 作业或临时集群运行。在长时间运行的作业中,您可以将多个 Flink 应用程序提交给 Amazon EMR 上运行的一个 Flink 集群。如果您将 Flink 作为临时作业运行,则 Amazon EMR 集群仅在其运行 Flink 应用程序的时间内存在,因此您只需为使用的资源和时间付费。在任一情况下,您可以使用 Amazon EMR AddSteps API 操作提交 Flink 作业,也可以将其作为 RunJobFlow 操作或 AWS CLI create-cluster 命令的步骤参数提交。

建议您启动一个长时间运行的 Flink 作业,多个客户端可通过 YARN API 操作提交到该作业。您启动FlinkYARN会话并将作业提交给Flink JobManager,位于托管Flink会话应用程序主守护程序的YARN节点上。要启动 YARN 会话,可从控制台、AWS CLI 或 Java 开发工具包使用以下步骤。

使用控制台提交长时间运行的作业

使用 flink-yarn-session 命令提交现有集群中长时间运行的 Flink 会话。

注意

向 Amazon EMR 版本 5.5.0 中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行。如果您使用早期版本的 Amazon EMR,请在随后的步骤中用 bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d" 替换 Argument (参数)

  1. 通过以下网址打开 Amazon EMR 控制台:https://console.amazonaws.cn/elasticmapreduce/

  2. 在集群列表中,选择先前已启动的集群。

  3. 在集群详细信息页面上,选择 Steps (步骤),再选择 Add Step (添加步骤)

  4. 使用随后提供的指南输入参数,然后选择 Add (添加)

    参数 Description

    Step type (步骤类型)

    自定义 JAR

    姓名

    可帮助您标识步骤的名称。例如,Flink_Long_Running_Session.

    Jar location (Jar 位置)

    command-runner.jar

    Arguments

    带适合您的应用的参数的 flink-yarn-session 命令。例如,flink-yarn-session -d -n 2 会在YARN群集中以分离状态启动一个长时间运行的Flink会话(-d)与两位任务经理(-n 2)。有关参数详细信息,请参阅最新 Flink 文档中的 YARN 设置

    注意

    使用 5.5.0 之前的 Amazon EMR 版本时,您必须直接指定 Flink 脚本 yarn-session.sh,而不是指定 flink-yarn-session,从而指定脚本的完整路径。例如,bash -c "/usr/lib/flink/bin/yarn-session.sh -d -n 2".

使用 AWS CLI 提交长时间运行的 Flink 作业

  • 要在 EMR 中启动长时间运行的 Flink 集群,可使用 create-cluster 命令:

    aws emr create-cluster --release-label emr-5.31.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args=flink-yarn-session,-d,-n,2

您可以使用命令行选项提交工作,但您也可以使用通过YARN代理的Flink本机界面 ResourceManager。要使用FlinkCLI通过EMR步骤提交,请指定长期运行的Flink群集YARN应用程序ID。为此,请运行 yarn application –list 在EMR命令行或通过 YarnClient API操作:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

例 适用于 Java 的开发工具包

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", “-yid”, “application_1473169569237_0002”, "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));

例 AWS CLI

使用 add-steps 子命令将新作业提交到现有 Flink 集群:

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

以下示例启动Flink WordCount 示例,通过向现有群集添加步骤。

例 Console

在现有群集的控制台详细信息页面中,通过为 Steps 字段选择 Add Step 来添加步骤。

例 适用于 Java 的开发工具包

这些示例说明了两种运行 Flink 作业的方法。

以下示例将 Flink 作业提交到一个运行的集群:

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); // The application id specified below is retrieved from the YARN cluster, for example, by running 'yarn application -list' from the master node command line HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://bucket/for/my/textfile.txt", "--output", "s3://bucket/for/my/output/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); // Specify the cluster ID of the YARN cluster instead of j-xxxxxxxxx AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("j-xxxxxxxxxx") .withSteps(stepConfigs));

以下示例创建一个集群,它运行 Flink 作业并在完成时将其终止。

import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }

例 AWS CLI

使用 add-steps 子命令将新作业提交到现有 Flink 集群:

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Transient_No_Terminate,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

使用 create-cluster 子命令创建一个临时 EMR 集群,该集群在 Flink 作业完成时终止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""