订阅工作流程教程第 4 部分:实现活动任务轮询器 - Amazon Simple Workflow Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

订阅工作流程教程第 4 部分:实现活动任务轮询器

在 Amazon SWF 中,运行工作流执行的活动任务会显示在活动任务列表中,该列表在您计划工作流中的活动时提供。

我们将实施一个基本的活动轮询器来处理工作流的这些任务,并在 Amazon SWF 将任务放入活动任务列表以启动活动时使用它来启动我们的活动。

首先,新建一个名为 swf_sns_activities.rb 的文件。我们将使用它:

  • 将我们创建的活动类实例化。

  • 将每个活动注册到 Amazon SWF。

  • 轮询活动,并在其名称出现在活动任务列表上时对每个活动调用 do_activity

swf_sns_activities.rb 中,添加以下语句以需要我们定义的每个活动类。

require_relative 'get_contact_activity.rb' require_relative 'subscribe_topic_activity.rb' require_relative 'wait_for_confirmation_activity.rb' require_relative 'send_result_activity.rb'

现在,我们将创建类并提供一些初始化代码。

class ActivitiesPoller def initialize(domain, workflowId) @domain = domain @workflowId = workflowId @activities = {} # These are the activities we'll run activity_list = [ GetContactActivity, SubscribeTopicActivity, WaitForConfirmationActivity, SendResultActivity ] activity_list.each do | activity_class | activity_obj = activity_class.new puts "** initialized and registered activity: #{activity_obj.name}" # add it to the hash @activities[activity_obj.name.to_sym] = activity_obj end end

除了保存传入的任务列表 以外,此代码还将我们创建的每个活动类实例化。由于每个类都会注册与其关联的活动(如需查看代码,请参阅 basic_activity.rb),这足以让 Amazon SWF 知道我们将运行的所有活动。

对于每个实例化的活动,我们都将其存储在使用活动名称(如 get_contact_activity)作为键的映射上,因此我们可在活动轮询器代码(接下来我们将定义这段代码)中轻松查找这些活动。

创建一个名为 poll_for_activities 的新方法,并调用域托管的 activity_tasks 上的 poll 来获取活动任务。

def poll_for_activities @domain.activity_tasks.poll(@workflowId) do | task | activity_name = task.activity_type.name

我们可从任务的 activity_type 成员获取活动名称。接下来,我们将使用与此任务关联的活动名称查找要对其运行 do_activity 的类,并向其传递此任务(其中包括应传输到活动的任何输入数据)。

# find the task on the activities list, and run it. if @activities.key?(activity_name.to_sym) activity = @activities[activity_name.to_sym] puts "** Starting activity task: #{activity_name}" if activity.do_activity(task) puts "++ Activity task completed: #{activity_name}" task.complete!({ :result => activity.results }) # if this is the final activity, stop polling. if activity_name == 'send_result_activity' return true end else puts "-- Activity task failed: #{activity_name}" task.fail!( { :reason => activity.results[:reason], :details => activity.results[:detail] } ) end else puts "couldn't find key in @activities list: #{activity_name}" puts "contents: #{@activities.keys}" end end end end

这段代码仅等待 do_activity 的完成,然后根据返回代码对此任务调用 complete!fail!

注意

启动最后一个活动后,此代码即退出轮询器,因为它已完成其任务并已启动所有活动。在您自己的 Amazon SWF 代码中,如果您的活动可能再次运行,您可能需要使活动轮询器无限期运行。

至此,我们的 ActivitiesPoller 类代码结束,但我们将在文件结尾再添加一些代码,以使用户可在命令行下运行它。

if __FILE__ == $0 if ARGV.count < 1 puts "You must supply a task-list name to use!" exit end poller = ActivitiesPoller.new(init_domain, ARGV[0]) poller.poll_for_activities puts "All done!" end

如果用户在命令行下运行该文件(向其传递活动任务列表作为第一个参数),则此代码将轮询器类实例化,并启动它,轮询活动。轮询器结束后(在它启动最后一个活动后),我们打印一条消息即退出。

活动轮询器就此结束。接下来,您只需运行代码,观察其如何发挥作用,如订阅工作流程教程:运行工作流程所述。