在 Amazon EMR 中使用 Flink 作业 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

在 Amazon EMR 中使用 Flink 作业

可通过多种方式与 Amazon EMR 上的 Flink 交互:通过控制台、在 ResourceManager 跟踪 UI 中找到的 Flink 接口,以及在命令行。您可通过所有这些方式提交 JAR 文件到 Flink 应用程序。一经提交,JAR 文件将成为由 Flink JobManager 管理的作业,该 Flink JobManager 位于托管 Flink 会话 Application Master 守护进程的 YARN 节点上。

可以将 Flink 应用程序作为长时间运行集群或临时集群上的 YARN 作业。在长时间运行集群上,您可以将多个 Flink 作业提交给 Amazon EMR 上运行的一个 Flink 集群。如果您在临时集群上运行 Flink 作业,则 Amazon EMR 集群仅在其运行 Flink 应用程序的时间内存在,因此您只需为使用的资源和时间付费。您可以使用 Amazon EMR AddSteps API 操作,并通过 Amazon CLI add-stepscreate-cluster 命令来提交 Flink 作业,作为 RunJobFlow 操作的步骤参数。

要启动 Flink 应用程序,使多个客户端能够通过 YARN API 操作向其提交工作,需要您创建集群或将 Flink 应用程序添加到现有集群中。有关如何创建新集群的说明,请参阅使用 Flink 创建集群。要在现有集群上启动 YARN 会话,可从控制台、Amazon CLI 或 Java SDK 使用以下步骤。

注意

向 Amazon EMR 5.5.0 版本中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行。如果您使用 Amazon EMR 的更早版本,请将 bash -c "/usr/lib/flink/bin/yarn-session.sh -d" 在控制台中替换为 Arguments (参数) 或在 Amazon CLI 命令中替换为 Args

使用控制台在现有集群上提交 Flink 作业

使用 flink-yarn-session 命令在现有集群中提交 Flink 会话。

  1. 通过以下链接打开 Amazon EMR 控制台:https://console.aws.amazon.com/elasticmapreduce/

  2. 在集群列表中,选择先前已启动的集群。

  3. 在集群详细信息页面上,选择 Steps (步骤),再选择 Add Step (添加步骤)

  4. 使用随后提供的指南输入参数,然后选择 Add (添加)

    参数 描述

    Step type (步骤类型

    自定义 JAR

    名称

    可帮助您标识步骤的名称。例如,<example-flink-step-name>

    Jar location (Jar 位置

    command-runner.jar

    Arguments

    带适合您的应用的参数的 flink-yarn-session 命令。例如,flink-yarn-session -d 在 YARN 集群中以分离状态(-d)启动 Flink 会话。有关参数详细信息,请参阅新版 Flink 文档中的 YARN 设置

使用 Amazon CLI 在现有集群上提交 Flink 作业

  • 使用 add-steps 命令将 Flink 任务添加到长时间运行的集群。以下示例命令指定 Args="flink-yarn-session", "-d" 在分离状态下 (-d) 在 YARN 集群中启动 Flink 会话。有关参数详细信息,请参阅新版 Flink 文档中的 YARN 设置

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

如果您在长时间运行的集群上已有 Flink 应用程序,则可以指定集群的 Flink 应用程序 ID,以便向其提交工作。要获取应用程序 ID,请在 Amazon CLI 上运行 yarn application -list 或通过 YarnClient API 操作:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

此 Flink 会话的应用程序 ID 为 application_1473169569237_0002,支持您使用 Amazon CLI 或 SDK 来将工作提交到应用程序。

例 SDK for Java

List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));

例 Amazon CLI

aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region <region-code>

以下示例启动一个临时集群,它运行 Flink 作业并在完成时将其终止。

例 SDK for Java

import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }

例 Amazon CLI

使用 create-cluster 子命令创建一个临时集群,该集群在 Flink 作业完成时终止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""