在 Amazon EMR 中使用 Flink 作业 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon EMR 中使用 Flink 作业

在 Amazon EMR 上与 Flink 进行交互的方式有多种:通过控制台、跟踪界面上 ResourceManager 的 Flink 界面和命令行。您可通过以上任一方式将 JAR 文件提交到 Flink 应用程序。提交 JAR 文件后,它就会变成由 Flink JobManager 管理的作业。 JobManager 位于托管 Flink 会话 Application Master 守护程序的 YARN 节点上。

可以将 Flink 应用程序作为长时间运行集群或临时集群上的 YARN 作业。在长时间运行集群上,您可以将多个 Flink 作业提交给 Amazon EMR 上运行的一个 Flink 集群。如果您在临时集群上运行 Flink 作业,则 Amazon EMR 集群仅在其运行 Flink 应用程序的时间内存在,因此您只需为使用的资源和时间付费。您可以使用 Amazon EMR AddSteps API 操作,并通过 Amazon CLI add-stepscreate-cluster 命令来提交 Flink 作业,作为 RunJobFlow 操作的步骤参数。

要启动 Flink 应用程序,使多个客户端能够通过 YARN API 操作向其提交工作,需要您创建集群或将 Flink 应用程序添加到现有集群中。有关如何创建新集群的说明,请参阅使用 Flink 创建集群。要在现有集群上启动 YARN 会话,可从控制台、Amazon CLI 或 Java SDK 使用以下步骤。

注意

向 Amazon EMR 5.5.0 版本中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行。如果您使用 Amazon EMR 的更早版本,请将 bash -c "/usr/lib/flink/bin/yarn-session.sh -d" 在控制台中替换为 Arguments (参数) 或在 Amazon CLI 命令中替换为 Args

使用控制台在现有集群上提交 Flink 作业

使用 flink-yarn-session 命令在现有集群中提交 Flink 会话。

  1. 打开亚马逊 EMR 控制台,网址为 https://console.aws.amazon.com/emr。

  2. 在集群列表中,选择先前已启动的集群。

  3. 在集群详细信息页面上,选择 Steps (步骤),再选择 Add Step (添加步骤)

  4. 使用随后提供的指南输入参数,然后选择添加

    参数 描述

    Step type (步骤类型)

    自定义 JAR

    名称

    可帮助您标识步骤的名称。例如,< example-flink-step-name >

    Jar location (Jar 位置)

    command-runner.jar

    Arguments (参数)

    带适合您的应用的参数的 flink-yarn-session 命令。例如,flink-yarn-session -d 在您的 YARN 集群中以分离状态启动 Flink 会话 () -d。有关参数详细信息,请参阅新版 Flink 文档中的 YARN 设置

使用 Amazon CLI 在现有集群上提交 Flink 作业
  • 使用 add-steps 命令将 Flink 任务添加到长时间运行的集群。以下示例命令指定 Args="flink-yarn-session", "-d" 在分离状态下 (-d) 在 YARN 集群中启动 Flink 会话。有关参数详细信息,请参阅新版 Flink 文档中的 YARN 设置

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

如果您在长时间运行的集群上已有 Flink 应用程序,则可以指定集群的 Flink 应用程序 ID,以便向其提交工作。要获取应用程序 ID,请在yarn application -listAmazon CLI或通过 YarnClientAPI 操作运行:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

此 Flink 会话的应用程序 ID 为 application_1473169569237_0002,支持您使用 Amazon CLI 或 SDK 来将作业提交到应用程序。

例 SDK for Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
例 Amazon CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region <region-code>

以下示例启动一个临时集群,它运行 Flink 作业并在完成时将其终止。

例 SDK for Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
例 Amazon CLI

使用 create-cluster 子命令创建一个临时集群,该集群在 Flink 作业完成时终止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""