订阅工作流程教程第 2 部分:实现工作流程 - Amazon Simple Workflow Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

订阅工作流程教程第 2 部分:实现工作流程

到目前为止,我们的代码都是比较通用的。在此部分,我们将开始实际定义工作流程执行的操作以及需要什么活动才能实现它。

设计工作流程

回想一下,此工作流程的初步构想由以下步骤组成:

  1. 向用户获取订阅地址(电子邮件或手机短信)。

  2. 创建 SNS 主题,然后让所提供的终端节点订阅该主题。

  3. 等待用户确认订阅。

  4. 如果用户确认,则向该主题发布一条祝贺消息。

我们可将工作流程中的每步视为工作流程必须执行的一个活动。我们的工作流程 负责在适当的时间安排每个活动以及协调活动之间的数据传输。

对于此工作流程,我们将为其中每个步骤创建一个单独的活动,并为其提供描述性名称:

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

这些活动将按顺序执行,并且后续步骤中将使用每步中的数据。

我们可以将应用程序设计为将所有代码放在一个源文件中,但这与 Amazon SWF 的设计宗旨相悖。它适合的工作流程可跨越整个 Internet 范围,因此我们至少要将应用程序分为两个单独的执行文件:

  • swf_sns_workflow.rb - 包含工作流程和工作流程启动者。

  • swf_sns_activities.rb - 包含活动和活动启动者。

可在单独的时段、单独的计算机甚至世界上的不同地点运行工作流程和活动实现。由于 Amazon SWF 会跟踪工作流和活动的细节,因此无论活动在何处运行,工作流都能协调其计划和数据传输。

设置工作流程代码

我们首先将创建一个名为 swf_sns_workflow.rb 的文件。在此文件中,声明一个名为 SampleWorkflow 的类。以下是类声明及其构造函数 initialize 方法。

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

如您所见,我们保留以下类实例数据:

  • domain - 从 init_domain 中的 utils.rb 检索的域名。

  • workflowId - 任务列表传入到 initialize

  • activity_list - 活动列表,其中具有我们将运行的活动的名称和版本。

域名、活动名称和活动版本足以让 Amazon SWF 明确地识别活动类型,因此,这些就是我们要计划活动所需保存的所有数据。

工作流程的 decider 代码将使用任务列表轮询决策任务并安排活动。

在此函数的最后,我们调用一个尚未定义的方法:register_workflow。接下来,我们将定义此方法。

注册工作流程

要使用工作流程类型,必须先注册它。如同活动类型一样,工作流程类型按其域、名称和版本进行标识。此外,与域和活动类型一样,无法重新注册现有的工作流程类型。如果需要更改有关工作流程类型的任何内容,则必须为其提供新版本,基本上就是新建一个类型。

以下是 register_workflow 的代码,它用于检索我们在上次运行时注册的现有工作流程类型,如果尚未注册该工作流程,则注册它。

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

首先,我们通过循环访问域的 workflow_types 集合,查看是否已注册工作流名称和版本。如果找到了匹配项,则将使用已注册的工作流程类型。

如果没有找到匹配项,就需要注册一个新工作流类型(方法是在我们搜索工作流的同一个 workflow_types 集合上调用 register),命名为“swf-sns-workflow”,版本为“1”,并设置如下选项。

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

注册期间传入的选项用于为工作流程类型设置默认行为,因此不需要在每次开始执行新工作流程时设置这些值。

在这里,我们只设置了一些超时值:从任务开始到结束时可用的最长时间(1 小时)以及工作流程执行完毕可用的最长时间(24 小时)。如果超出其中任意一个时间,则任务或工作流程将超时。

有关超时值的详细信息,请参阅Amazon SWF 超时类型

轮询决策

在每个工作流程执行的核心部分都有一个决策程序。决策程序的职责是管理工作流程自身的执行。决策程序接收决策任务,然后通过安排新活动、删除并重新启动活动,或通过将工作流程执行的状态设置为完成、已取消或失败,响应这些任务。

决策程序按照工作流程执行的任务列表 名称接收要响应的决策任务。要轮询决策任务,可在域的 decision_tasks 集合上调用 poll,以循环遍历可用的决策任务。然后,可通过循环访问决策任务的 new_events 集合,检查其中是否有新事件。

返回的事件为 AWS::SimpleWorkflow::HistoryEvent 对象,您可以使用返回事件的 event_type 成员获取事件的类型。有关历史记录事件类型的列表和说明,请参阅《Amazon Simple Workflow Service API Reference》中的 HistoryEvent

下面是决策任务轮询器逻辑的开头。我们的工作流程类中一个名为 poll_for_decisions 的新方法。

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

我们现在将根据收到的 event_type 划分决策的执行。我们收到的第一个类型可能是 WorkflowExecutionStarted。收到该事件意味着 Amazon SWF 正在向决策程序发出信号,示意它应开始执行工作流。我们首先将通过对轮询时收到的任务调用 schedule_activity_task,安排第一个活动。

我们将在活动列表中声明的第一个活动传递给它,由于我们颠倒了列表,因此可将其用作堆栈,而该活动占据列表上的 last 位置。我们定义的“活动”只是由名称和版本号组成的映射,但这就是 Amazon SWF 识别活动以进行计划所需的全部信息(假定已注册该活动)。

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

当我们计划活动时,Amazon SWF 会向我们在计划时传入的活动任务列表发送一个活动任务,示意任务开始。我们将在 订阅工作流教程第 3 部分:实现活动 中处理活动任务,但值得注意的是,我们在此并未执行任务。我们仅需告知 Amazon SWF 应该计划该任务。

我们需要处理的下一个活动是 ActivityTaskCompleted 事件,当 Amazon SWF 从某个活动任务收到活动已完成的响应时就会发生该事件。

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

由于我们按线性方式执行任务,并且一次仅执行一个活动,因此我们将借此机会从 activity_list 堆栈中弹出已完成的任务。如果这样导致列表变空,则表示我们的工作流程已完成。在这种情况下,我们通过调用任务的 complete_workflow_execution 向 Amazon SWF 发出工作流已完成的信号。

如果列表中仍有条目,则我们将安排列表上的下一活动(仍处于最后一个位置)。但是,这一次我们将查看上一活动在完成后是否向 Amazon SWF 返回了任何结果数据,这些数据将在事件属性中的可选 result 键中提供给工作流。如果该活动产生了结果,则我们将其作为 input 选项传递给所安排的下一活动以及活动任务列表。

通过检索完成的活动的 result 值以及通过设置安排的活动的 input 值,我们可将数据从一个活动传递到下一个,或可使用活动中的数据,根据活动得到的结果更改决策程序中的行为。

就本教程而言,在定义工作流程的行为时,这两种事件类型最为重要。但是,活动可生成 ActivityTaskCompleted 以外的事件。我们将通过为 ActivityTaskTimedOutActivityTaskFailed 事件以及 WorkflowExecutionCompleted 事件(将在 Amazon SWF 处理 complete_workflow_execution 调用时生成)提供演示处理程序代码来封装决策程序代码。

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

启动工作流程执行

在将为要轮询的工作流程生成任何决策任务之前,我们需要启动工作流程执行。

要开始执行工作流,请在已注册的工作流类型 (AWS::SimpleWorkflow::WorkflowType) 上调用 start_execution。我们将在此周围定义一个小型包装器,以利用在类构造函数中检索的 workflow_type 实例成员。

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

一旦执行工作流,工作流的任务列表(它作为工作流执行选项传入 start_execution)上即开始显示决策事件。

与注册工作流程类型时提供的选项不同,不将传递给 start_execution 的选项视为工作流程类型的一部分。无需更改工作流程的版本,即可随意在每次执行工作流程时更改这些选项。

由于我们希望在运行文件时开始执行工作流程,因此添加一些将类实例化的代码,然后调用刚刚定义的 start_execution 方法。

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

为避免任务列表命名发生任何冲突,我们将使用 SecureRandom.uuid 生成可用作任务列表名称的随机 UUID,确保将不同的任务列表名称用于每次工作流程执行。

注意

任务列表用于记录有关工作流程执行的事件,因此,如果将同一任务列表用于多次执行同一工作流程类型,则可能获得在上一次执行期间生成的事件,当多次执行间隔较小(试验新代码或运行测试时经常这样)时尤为如此。

为了避免必须处理以前执行中的项目的问题,可对每次执行使用新任务列表,在开始工作流程执行时指定该列表。

此处还有一些代码,可向运行代码的人员(很可能是您)提供说明以及提供任务列表的“活动”版本。决策程序使用此任务列表名称为工作流程安排活动,而活动实现将对此任务列表名称侦听活动事件以了解何时开始安排的活动并提供有关活动执行的最新消息。

这段代码还等待用户开始运行活动启动程序,然后再 开始工作流程执行,因此在所提供的任务列表上开始出现活动任务时,活动启动程序将准备好作出响应。

后续步骤

您已实现工作流程。接下来,将在订阅工作流教程第 3 部分:实现活动中定义活动和活动启动程序。