在亚马逊 EMR 中处理 Fink 职位 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在亚马逊 EMR 中处理 Fink 职位

可通过多种方式在 Amazon EMR 上与 Flink 交互:通过控制台、在 ResourceManager 跟踪 UI 中找到的 Flink 接口以及在命令行上。此外,还可通过所有这些方式向 Flink 应用程序提交 JAR 文件。提交后,JAR 文件将成为由 Flink JobManager 管理的作业,后者位于托管 Flink 会话 Application Master 守护程序的 YARN 节点上。

您可以将 Flink 应用程序作为 YARN 作业运行的长时间运行的群集或临时群集上。在长时间运行的群集中,您可以将多个 Flink 作业提交给 Amazon EMR 上运行的一个 Flink 群集。如果您在临时群集上运行 Flink 作业,则 Amazon EMR 群集仅在其运行 Flink 应用程序的时间内存在,因此您只需为使用的资源和时间付费。您可以使用亚马逊 EMR 提交 Flink 作业AddStepsAPI 操作,作为RunJobFlow操作,并通过Amazon CLI add-steps或者create-cluster命令。

要启动多个客户端可以通过 YARN API 操作向其提交工作的 Fink 应用程序,您需要创建集群或将 Fink 应用程序添加到现有集群中。有关创建新群集的说明,请参阅使用 Flink 创建集群。要在现有群集上启动 YARN 会话,可从控制台使用以下步骤:Amazon CLI或 Java 软件开发工具包。

注意

这些区域有:flink-yarn-session命令添加在 Amazon EMR 版本 5.5.0 中作为yarn-session.sh脚本来简化执行。如果您使用早期版本的 Amazon EMR,请将bash -c "/usr/lib/flink/bin/yarn-session.sh -d"对于 来说为Arguments (参数)在控制台中或Args. 在Amazon CLI命令。

使用控制台在现有群集上提交 Flink 作业

提交 Fink 会话使用flink-yarn-session命令。

  1. 从打开 Amazon EMR 控制台https://console.aws.amazon.com/elasticmapreduce/

  2. 在集群列表中,选择先前已启动的集群。

  3. 在集群详细信息页面上,选择 Steps (步骤),再选择 Add Step (添加步骤)

  4. 使用随后提供的指南输入参数,然后选择 Add (添加)

    参数 描述

    Step type (步骤类型)

    自定义 JAR

    名称

    可帮助您标识步骤的名称。例如,闪烁会话

    Jar location (Jar 位置)

    command-runner.jar

    Arguments

    带适合您的应用的参数的 flink-yarn-session 命令。例如,flink-yarn-session以分离状态在 YARN 集群中启动 Fink 会话(-d)。请参阅YARN 设置在最新 Flink 文档中获取参数详细信息。

要在现有群集上提交 Fink 作业,请使用Amazon CLI

  • 要将 Flink 作业添加到 EMR 中长时间运行的群集中,可使用add-steps命令:

    aws emr add-steps --cluster-id j-XXXXXXXX --steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://mybucket/mytest.jar,Args=arg1,arg2,arg3 Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://mybucket/mytest.jar,MainClass=mymainclass,Args=flink-yarn-session -d

如果您在长时间运行的群集上已有 Fink 应用程序,则可以指定集群的 Flink 应用程序 ID,以便向其提交工作。要获取应用程序 ID,请运行yarn application -list上Amazon CLI或通过YarnClientAPI 操作:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

此 Fink 会话的应用程序 ID 为application_1473169569237_0002,您可以使用它将工作提交到应用程序使用Amazon CLI或开发工具包。

例 适用于 Java 的 开发工具包

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", “-yid”, “application_1473169569237_0002”, "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));

例 Amazon CLI

aws emr add-steps --cluster-id myClusterId \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region myRegion

以下示例启动一个临时群集,该群集运行 Flink 作业,然后在完成时终止。

例 适用于 Java 的 开发工具包

import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }

例 Amazon CLI

使用create-cluster子命令创建一个临时群集,该群集在 Flink 作业完成时终止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""