Amazon Simple Workflow Service
开发人员指南 (API Version 2012-01-25)
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

订阅工作流程教程第 2 部分:实施工作流程

到目前为止,我们的代码都是比较通用的。在此部分,我们将开始实际定义工作流程执行的操作以及需要什么活动才能实现它。

设计工作流程

回想一下,此工作流程的初步构想由以下步骤组成:

  1. 向用户获取订阅地址(电子邮件或手机短信)。

  2. 创建 SNS 主题,然后让所提供的终端节点订阅该主题。

  3. 等待用户确认订阅。

  4. 如果用户确认,则向该主题发布一条祝贺消息。

我们可将工作流程中的每步视为工作流程必须执行的一个活动。我们的工作流程 负责在适当的时间安排每个活动以及协调活动之间的数据传输。

对于此工作流程,我们将为其中每个步骤创建一个单独的活动,并为其提供描述性名称:

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

这些活动将按顺序执行,并且后续步骤中将使用每步中的数据。

我们可将应用程序设计为所有代码都存在于一个源文件中,但这与 Amazon SWF 的设计宗旨相悖。它适合的工作流程可跨越整个 Internet 范围,因此我们至少要将应用程序分为两个单独的执行文件:

  • swf_sns_workflow.rb - 包含工作流程和工作流程启动程序。

  • swf_sns_activities.rb - 包含活动和活动启动程序。

可在单独的时段、单独的计算机甚至世界上的不同地点运行工作流程和活动实现。由于 Amazon SWF 跟踪工作流程和活动的详细信息,因此无论二者在何处运行,工作流程均可协调活动的安排和数据传输。

设置工作流程代码

我们首先创建名为 swf_sns_workflow.rb 的文件。在此文件中,声明一个名为 SampleWorkflow 的类。以下是类声明及其构造函数 initialize 方法。

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(task_list) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @task_list = task_list # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

如您所见,我们保留以下类实例数据:

  • domain - 从 init_domain 在于 utils.rb 中检索的域名。

  • task_list - 任务列表传入到 initialize

  • activity_list - 活动列表,其中具有我们将运行的活动的名称和版本。

域名、活动名称和活动版本足以使 Amazon SWF 明确地识别活动类型,因此为了安排活动,只需保留这些数据。

工作流程的 decider 代码将使用任务列表轮询决策任务并安排活动。

在此函数的最后,我们调用一个尚未定义的方法:register_workflow。接下来,我们将定义此方法。

注册工作流程

要使用工作流程类型,必须先注册它。如同活动类型一样,工作流程类型按其域、名称和版本进行标识。此外,如同域和活动类型二者一样,无法重新注册现有的工作流程类型。如果需要更改有关工作流程类型的任何内容,则必须为其提供新版本,基本上就是新建一个类型。

以下是 register_workflow 的代码,它用于检索我们在上次运行时注册的现有工作流程类型,如果尚未注册该工作流程,则注册它。

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

首先,我们通过循环访问域的 workflow_types 集合,查看是否已注册工作流程名称和版本。如果找到了匹配项,则将使用已注册的工作流程类型。

如果未找到匹配项,则注册一个新的工作流程类型(通过调用register在我们从中搜索工作流程相同的workflow_types集合),其名称为“swf-sns-workflow”,版本为“1”,并具有以下选项。

options = { :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

注册期间传入的选项用于为工作流程类型设置默认行为,因此不需要在每次开始执行新工作流程时设置这些值。

在这里,我们只设置了一些超时值:从任务开始到结束时可用的最长时间(1 小时)以及工作流程执行完毕可用的最长时间(24 小时)。如果超出其中任意一个时间,则任务或工作流程将超时。

有关超时值的详细信息,请参阅超时类型

轮询决策

在每个工作流程执行的核心部分都有一个决策程序。决策程序的职责是管理工作流程自身的执行。决策程序接收决策任务,然后通过安排新活动、删除并重新启动活动,或通过将工作流程执行的状态设置为完成、已取消或失败,响应这些任务。

决策程序按照工作流程执行的任务列表 名称接收要响应的决策任务。要轮询决策任务,请对域的 decision_tasks 集合调用 poll 以遍历可用的决策任务。然后,可通过循环访问决策任务的 new_events 集合,检查其中是否有新事件。

返回的事件为 AWS::SimpleWorkflow::HistoryEvent 对象,并可使用返回的事件的 event_type 成员获取该事件的类型。有关历史记录事件类型的列表和描述,请参阅 Amazon Simple Workflow Service API Reference 中的 HistoryEvent

下面是决策任务轮询器逻辑的开头。我们的工作流程类中一个名为 poll_for_decisions 的新方法。

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@task_list) do | task | task.new_events.each do | event | case event.event_type

我们现在将根据收到的 event_type 划分决策的执行。我们收到的第一个类型可能是 WorkflowExecutionStarted。收到此事件后,它表示 Amazon SWF 正在示意您的决策程序应开始执行工作流程。我们首先将通过对轮询时收到的任务调用 schedule_activity_task,安排第一个活动。

我们将在活动列表中声明的第一个活动传递给它,由于我们颠倒了列表,因此可将其用作堆栈,而该活动占据列表上的 last 位置。我们定义的“活动”仅仅是由名称和版本号组成的映射,但 Amazon SWF 只需此项即可识别所安排的活动,假定已注册该活动。

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :task_list => "#{@task_list}-activities" } )

安排活动时,Amazon SWF 向在安排该活动时传入的活动任务列表发送一个活动任务,示意任务开始。我们将在 第 3 部分:实施活动 中处理活动任务,但值得注意的是,我们在此并未执行任务。我们仅告知 Amazon SWF 应安排 该任务。

我们将需要处理的下一个活动是 ActivityTaskCompleted 事件,当 Amazon SWF 从某个活动任务收到活动已完成的响应时发生该事件。

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :task_list => "#{@task_list}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :task_list => "#{@task_list}-activities" } ) end end

由于我们按线性方式执行任务,并且一次仅执行一个活动,因此我们将借此机会从 activity_list 堆栈中弹出已完成的任务。如果这样导致列表变空,则表示我们的工作流程已完成。在此情况下,我们通过对任务调用 complete_workflow_execution,向 Amazon SWF 示意工作流程已完成。

如果列表中仍有条目,则我们将安排列表上的下一活动(仍处于最后一个位置)。但是,这次我们将查看上一活动在完成时是否向 Amazon SWF 返回了任何结果数据(将在事件属性的可选 result 键中提供这些结果数据)。如果该活动产生了结果,则我们将其作为 input 选项传递给所安排的下一活动以及活动任务列表。

通过检索完成的活动的 result 值以及通过设置安排的活动的 input 值,我们可将数据从一个活动传递到下一个,或可使用活动中的数据,根据活动得到的结果更改决策程序中的行为。

就本教程而言,在定义工作流程的行为时,这两种事件类型最为重要。但是,活动可生成 ActivityTaskCompleted 以外的事件。我们将通过为 ActivityTaskTimedOutActivityTaskFailed 事件以及为 WorkflowExecutionCompleted 事件(当 Amazon SWF 处理我们在用尽要运行的活动时作出的 complete_workflow_execution 调用时将生成它)提供演示处理程序代码,包装决策程序代码。

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

启动工作流程执行

在将为要轮询的工作流程生成任何决策任务之前,我们需要启动工作流程执行。

要启动工作流程执行,请对您已注册的工作流程类型 (AWS::SimpleWorkflow::WorkflowType) 调用 start_execution。我们将在此周围定义一个小型包装器,以利用在类构造函数中检索的 workflow_type 实例成员。

def start_execution workflow_execution = @workflow_type.start_execution( { :task_list => @task_list } ) poll_for_decisions end end

一旦执行工作流程,工作流程的任务列表(它作为工作流程执行选项传入 start_execution)上即开始显示决策事件。

与注册工作流程类型时提供的选项不同,不将传递给 start_execution 的选项视为工作流程类型的一部分。无需更改工作流程的版本,即可随意在每次执行工作流程时更改这些选项。

由于我们希望在运行文件时开始执行工作流程,因此添加一些将类实例化的代码,然后调用刚刚定义的 start_execution 方法。

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. task_list = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{task_list}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(task_list) sample_workflow.start_execution end

为避免任务列表命名发生任何冲突,我们将使用 SecureRandom.uuid 生成可用作任务列表名称的随机 UUID,确保将不同的任务列表名称用于每次工作流程执行。

注意

任务列表用于记录有关工作流程执行的事件,因此,如果将同一任务列表用于多次执行同一工作流程类型,则可能获得在上一次执行期间生成的事件,当多次执行间隔较小(试验新代码或运行测试时经常这样)时尤为如此。

为了避免必须处理以前执行中的项目的问题,可对每次执行使用新任务列表,在开始工作流程执行时指定该列表。

此处还有一些代码,可向运行代码的人员(很可能是您)提供说明以及提供任务列表的“活动”版本。决策程序使用此任务列表名称为工作流程安排活动,而活动实现将对此任务列表名称侦听活动事件以了解何时开始安排的活动并提供有关活动执行的最新消息。

这段代码还等待用户开始运行活动启动程序,然后再 开始工作流程执行,因此在所提供的任务列表上开始出现活动任务时,活动启动程序将准备好作出响应。

后续步骤

我们已完成工作流程实现。接下来,在第 3 部分:实施活动中,我们将定义活动和活动启动程序。