Amazon Simple Workflow Service
开发人员指南 (API Version 2012-01-25)
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

订阅工作流程教程第 4 部分:实施活动任务轮询器

在 Amazon SWF 中,运行工作流程执行的活动任务显示在活动任务列表 上,在工作流程中安排活动时提供该列表。

我们将实现一个基本活动轮询器,用于为我们的工作流程处理这些任务,并在 Amazon SWF 将任务放入活动任务列表以启动我们的活动时将其用于启动该活动。

首先,创建名为 swf_sns_activities.rb 的新文件。我们将使用它:

  • 将我们创建的活动类实例化。

  • 将每个活动注册到 Amazon SWF。

  • 轮询活动,并在其名称出现在活动任务列表上时对每个活动调用 do_activity

swf_sns_activities.rb 中,添加以下语句以要求我们定义的每个活动类。

require_relative 'get_contact_activity.rb' require_relative 'subscribe_topic_activity.rb' require_relative 'wait_for_confirmation_activity.rb' require_relative 'send_result_activity.rb'

现在,我们将创建类并提供一些初始化代码。

class ActivitiesPoller def initialize(domain, task_list) @domain = domain @task_list = task_list @activities = {} # These are the activities we'll run activity_list = [ GetContactActivity, SubscribeTopicActivity, WaitForConfirmationActivity, SendResultActivity ] activity_list.each do | activity_class | activity_obj = activity_class.new puts "** initialized and registered activity: #{activity_obj.name}" # add it to the hash @activities[activity_obj.name.to_sym] = activity_obj end end

除了保存传入的任务列表 以外,此代码还将我们创建的每个活动类实例化。因为每个类都注册与其关联的活动(如果需要查看相关代码,请参阅 basic_activity.rb),所以这足以让 Amazon SWF 了解我们将运行的所有活动。

对于每个实例化的活动,我们都将其存储在使用活动名称(如 get_contact_activity)作为键的映射上,因此我们可在活动轮询器代码(接下来我们将定义这段代码)中轻松查找这些活动。

新建一个名为 poll_for_activities 的方法,调用 poll 在对域持有的 activity_tasks 上面以获取活动任务。

def poll_for_activities @domain.activity_tasks.poll(@task_list) do | task | activity_name = task.activity_type.name

我们可从任务的 activity_type 成员获取活动名称。接下来,我们将使用与此任务关联的活动名称查找要对其运行 do_activity 的类,并向其传递此任务(其中包括应传输到活动的任何输入数据)。

# find the task on the activities list, and run it. if @activities.key?(activity_name.to_sym) activity = @activities[activity_name.to_sym] puts "** Starting activity task: #{activity_name}" if activity.do_activity(task) puts "++ Activity task completed: #{activity_name}" task.complete!({ :result => activity.results }) # if this is the final activity, stop polling. if activity_name == 'send_result_activity' return true end else puts "-- Activity task failed: #{activity_name}" task.fail!( { :reason => activity.results[:reason], :details => activity.results[:detail] } ) end else puts "couldn't find key in @activities list: #{activity_name}" puts "contents: #{@activities.keys}" end end end end

这段代码仅等待 do_activity 的完成,然后根据返回代码对此任务调用 complete!fail!

注意

启动最后一个活动后,此代码即退出轮询器,因为轮询器已完成其任务并已启动所有活动。在您自己的 Amazon SWF 代码中,如果可能会再次运行您的活动,则可能要使活动轮询器无限期运行。

至此,我们的 ActivitiesPoller 类代码结束,但我们将在文件结尾再添加一些代码,以使用户可在命令行下运行它。

if __FILE__ == $0 if ARGV.count < 1 puts "You must supply a task-list name to use!" exit end poller = ActivitiesPoller.new(init_domain, ARGV[0]) poller.poll_for_activities puts "All done!" end

如果用户在命令行下运行该文件(向其传递活动任务列表作为第一个参数),则此代码将轮询器类实例化,并启动它,轮询活动。轮询器结束后(在它启动最后一个活动后),我们打印一条消息即退出。

活动轮询器就此结束。接下来,您只需运行代码,观察其如何发挥作用,如运行工作流程所述。