适当地关闭活动和工作流工作线程 - 适用于 Java 的 AWS 开发工具包
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适当地关闭活动和工作流工作线程

构建简单的 Amazon SWF 应用程序主题中介绍实施包括注册应用程序、活动、工作流工作线程以及工作流启动程序的简单工作流应用程序的整个过程。

工作线程类设计为持续运行,轮询 Amazon SWF 发送的任务,以便运行活动或返回决策。完成轮询请求后,Amazon SWF 会记录轮询器,并尝试为其分配任务。

如果工作流工作线程在长轮询过程中终止,Amazon SWF 可能仍然会尝试向终止的工作线程发送任务,导致该任务丢失 (直至该任务超时)。

解决上述情况的一个方法是等待所有长轮询请求返回,然后再终止工作线程。

在该主题中,我们会使用 Java 的关闭挂钩来重写 helloswf 中的活动工作线程,以尝试适当地关闭活动工作线程。

以下是完整的代码:

import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.model.ActivityTask; import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskCompletedRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; public class ActivityWorkerWithGracefulShutdown { private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); private static CountDownLatch waitForTermination = new CountDownLatch(1); private static volatile boolean terminate = false; private static String executeActivityTask(String input) throws Throwable { return "Hello, " + input + "!"; } public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { terminate = true; System.out.println("Waiting for the current poll request" + " to return before shutting down."); waitForTermination.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } } }); try { pollAndExecute(); } finally { waitForTermination.countDown(); } } public static void pollAndExecute() { while (!terminate) { System.out.println("Polling for an activity task from the tasklist '" + HelloTypes.TASKLIST + "' in the domain '" + HelloTypes.DOMAIN + "'."); ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() .withDomain(HelloTypes.DOMAIN) .withTaskList(new TaskList().withName(HelloTypes.TASKLIST))); String taskToken = task.getTaskToken(); if (taskToken != null) { String result = null; Throwable error = null; try { System.out.println("Executing the activity task with input '" + task.getInput() + "'."); result = executeActivityTask(task.getInput()); } catch (Throwable th) { error = th; } if (error == null) { System.out.println("The activity task succeeded with result '" + result + "'."); swf.respondActivityTaskCompleted( new RespondActivityTaskCompletedRequest() .withTaskToken(taskToken) .withResult(result)); } else { System.out.println("The activity task failed with the error '" + error.getClass().getSimpleName() + "'."); swf.respondActivityTaskFailed( new RespondActivityTaskFailedRequest() .withTaskToken(taskToken) .withReason(error.getClass().getSimpleName()) .withDetails(error.getMessage())); } } } } }

在该版本中,原始版本的 main 功能中的轮询代码已被移至其自己的 pollAndExecute 方法中。

函数现在将 mainCountDownLatch关闭挂钩结合使用,使线程在请求终止后等待最多 60 秒,然后才允许线程关闭。