Amazon Simple Workflow Service
开发人员指南 (API 版本 2012-01-25)
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

订阅工作流程教程第 3 部分:实施活动

我们现在将实现工作流程中的每个活动,首先是基类,它为活动代码提供某些共有功能。

定义基本活动类型

设计工作流程时,我们指定了以下活动:

  • get_contact_activity

  • subscribe_topic_activity

  • wait_for_confirmation_activity

  • send_result_activity

我们现在将实现这些活动中的每个。由于这些活动将共用某些功能,因此我们来做一点基础工作,并创建这些活动可共用的一些共有代码。我们将它称为 BasicActivity,并在一个名为 basic_activity.rb 的新文件中定义它。

如同其他源文件一样,我们将包含 utils.rb,以便访问 init_domain 函数,从而设置示例域。

require_relative 'utils.rb'

接下来,我们将声明基本活动类和我们在每种活动中将感兴趣的一些共有数据。我们将保存活动的 AWS::SimpleWorkflow::ActivityType 实例、其 name,并提供一个特殊的数据成员,用于存储活动的 results

class BasicActivity attr_accessor :activity_type attr_accessor :name attr_accessor :results

这些属性访问在类的 initialize 方法中定义的实例数据,该方法采用活动的 name、可选的 version 以及将该活动注册到 Amazon SWF 时要使用的 options 的映射。

def initialize(name, version = 'v1', options = nil) @activity_type = nil @name = name @results = nil # get the domain to use for activity tasks. @domain = init_domain # Check to see if this activity type already exists. @domain.activity_types.each do | a | if (a.name == @name) && (a.version == version) @activity_type = a end end if @activity_type.nil? # If no options were specified, use some reasonable defaults. if options.nil? options = { # All timeouts are in seconds. :default_task_heartbeat_timeout => 900, :default_task_schedule_to_start_timeout => 120, :default_task_schedule_to_close_timeout => 3800, :default_task_start_to_close_timeout => 3600 } end @activity_type = @domain.activity_types.register(@name, version, options) end end

与工作流程类型注册一样,如果已注册某个活动类型,则可通过查看域的 activity_types 集合,检索该类型。如果找不到该活动,则将注册该活动。

此外,与工作流程类型一样,可设置在注册活动类型时与其存储在一起的默认选项

我们的基本活动最后得到的是运行它的一致方式。我们将定义一个 do_activity 方法,该方法采用活动任务。如下所示,我们可使用传入的活动任务通过其 input 实例属性接收数据。

def do_activity(task) @results = task.input # may be nil return true end end

这样即封装 BasicActivity 类。现在,我们将用它使我们的活动变得简单一致。

定义 GetContactActivity

在工作流程执行期间运行的第一个活动是 get_contact_activity,该活动检索用户的 Amazon SNS 主题订阅信息。

新建一个名为 get_contact_activity.rb 的文件,并且需要 yaml(我们将使用它准备要传递给 Amazon SWF 的字符串)和 basic_activity.rb(我们将使用它作为此 GetContactActivity 类的基础)。

require 'yaml' require_relative 'basic_activity.rb' # **GetContactActivity** provides a prompt for the user to enter contact # information. When the user successfully enters contact information, the # activity is complete. class GetContactActivity < BasicActivity

由于我们将活动注册代码放入 BasicActivity 中,因此 initialize 方法给 GetContactActivity 是相当简单的。我们仅仅用活动名称 get_contact_activity 调用基类构造函数。只需此项即可注册我们的活动。

# initialize the activity def initialize super('get_contact_activity') end

现在我们将定义 do_activity 方法,它提示输入用户的电子邮件和/或电话号码。

def do_activity(task) puts "" puts "Please enter either an email address or SMS message (mobile phone) number to" puts "receive SNS notifications. You can also enter both to use both address types." puts "" puts "If you enter a phone number, it must be able to receive SMS messages, and must" puts "be 11 digits (such as 12065550101 to represent the number 1-206-555-0101)." input_confirmed = false while !input_confirmed puts "" print "Email: " email = $stdin.gets.strip print "Phone: " phone = $stdin.gets.strip puts "" if (email == '') && (phone == '') print "You provided no subscription information. Quit? (y/n)" confirmation = $stdin.gets.strip.downcase if confirmation == 'y' return false end else puts "You entered:" puts " email: #{email}" puts " phone: #{phone}" print "\nIs this correct? (y/n): " confirmation = $stdin.gets.strip.downcase if confirmation == 'y' input_confirmed = true end end end # make sure that @results is a single string. YAML makes this easy. @results = { :email => email, :sms => phone }.to_yaml return true end end

do_activity 的结尾处,我们获得从用户检索的电子邮件和电话号码,将其放入映射中,然后使用 to_yaml 将整个映射转换为 YAML 字符串。这样做有一个重要原因:在完成活动时传递给 Amazon SWF 的任何结果只能是字符串数据。Ruby 可轻松地将对象转换为 YAML 字符串,然后再转换回对象,这一点非常适合此用途。

这是 get_contact_activity 执行的结束。此数据将在接下来的 subscribe_topic_activity 执行中使用。

定义 SubscribeTopicActivity

我们现在将深入研究 Amazon SNS 并创建一个活动,该活动使用由 get_contact_activity 生成的信息让用户订阅 Amazon SNS 主题。

新建一个名为 subscribe_topic_activity.rb 的文件,添加我们用于 get_contact_activity 的相同要求,声明类,并提供其 initialize 方法。

require 'yaml' require_relative 'basic_activity.rb' # **SubscribeTopicActivity** sends an SMS / email message to the user, asking for # confirmation. When this action has been taken, the activity is complete. class SubscribeTopicActivity < BasicActivity def initialize super('subscribe_topic_activity') end

既然代码已就位,可设置和注册活动,那么添加一些代码以创建 Amazon SNS 主题。为此,我们将使用 AWS::SNS::Client 对象的 create_topic 方法。

create_topic 方法添加到您的类,该方法采用传入的 Amazon SNS 客户端对象。

def create_topic(sns_client) topic_arn = sns_client.create_topic(:name => 'SWF_Sample_Topic')[:topic_arn] if topic_arn != nil # For an SMS notification, setting `DisplayName` is *required*. Note that # only the *first 10 characters* of the DisplayName will be shown on the # SMS message sent to the user, so choose your DisplayName wisely! sns_client.set_topic_attributes( { :topic_arn => topic_arn, :attribute_name => 'DisplayName', :attribute_value => 'SWFSample' } ) else @results = { :reason => "Couldn't create SNS topic", :detail => "" }.to_yaml return nil end return topic_arn end

一旦具有该主题的Amazon 资源名称 (ARN),即可将其与 Amazon SNS 客户端的 set_topic_attributes 方法配合使用以设置该主题的 DisplayName,用 Amazon SNS 发送手机短信时需要该项。

最后,我们将定义 do_activity 方法。首先将收集在安排该活动时通过 input 选项传递的任何数据。如前所述,必须以字符串(我们使用 to_yaml 创建了它)形式传递此项。在检索它时,我们将使用 YAML.load 将数据转换为 Ruby 对象。

以下是 do_activity 的开头,我们在此处检索输入数据。

def do_activity(task) activity_data = { :topic_arn => nil, :email => { :endpoint => nil, :subscription_arn => nil }, :sms => { :endpoint => nil, :subscription_arn => nil }, } if task.input != nil input = YAML.load(task.input) activity_data[:email][:endpoint] = input[:email] activity_data[:sms][:endpoint] = input[:sms] else @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml puts(" #{@results.inspect}") return false end # Create an SNS client. This is used to interact with the service. Set the # region to $SMS_REGION, which is a region that supports SMS notifications # (defined in the file `swf_sns_utils.rb`). sns_client = AWS::SNS::Client.new( :config => AWS.config.with(:region => $SMS_REGION))

如果我们未收到任何输入,则无计可施,因此我们只好将活动视为失败。

但是,假如一切正常,则我们将继续充实 do_activity 方法,用适用于 Ruby 的 AWS 开发工具包获取 Amazon SNS 客户端,然后将其传递给 create_topic 方法以创建 Amazon SNS 主题。

# Create the topic and get the ARN activity_data[:topic_arn] = create_topic(sns_client) if activity_data[:topic_arn].nil? return false end

在此,有几点值得注意:

  • 我们使用 AWS.config.with 设置 Amazon SNS 客户端的地区。因为我们需要发送手机短信,所以使用在 utils.rb 中声明的手机短信启用区。

  • 将该主题的 ARN 保存在 activity_data 映射中。这是将传递给我们的工作流程中下一活动的部分数据。

最后,此活动使用传入的终端节点(电子邮件和手机短信)让用户订阅该 Amazon SNS 主题。我们不要求用户同时输入两个 终端节点,但是,我们至少需要一个。

# Subscribe the user to the topic, using either or both endpoints. [:email, :sms].each do | x | ep = activity_data[x][:endpoint] # don't try to subscribe an empty endpoint if (ep != nil && ep != "") response = sns_client.subscribe( { :topic_arn => activity_data[:topic_arn], :protocol => x.to_s, :endpoint => ep } ) activity_data[x][:subscription_arn] = response[:subscription_arn] end end

AWS::SNS::Client.subscribe 采用主题 ARN、protocol(我们巧妙地将它伪装成相应终端节点的 activity_data 映射键)。

最后,我们以 YAML 格式重新打包下一活动的信息,以使我们可将其发回 Amazon SWF。

# if at least one subscription arn is set, consider this a success. if (activity_data[:email][:subscription_arn] != nil) or (activity_data[:sms][:subscription_arn] != nil) @results = activity_data.to_yaml else @results = { :reason => "Couldn't subscribe to SNS topic", :detail => "" }.to_yaml puts(" #{@results.inspect}") return false end return true end end

至此,即完成了 subscribe_topic_activity 的执行。接下来,我们将定义 wait_for_confirmation_activity

定义 WaitForConfirmationActivity

用户订阅 Amazon SNS 主题后,仍需要确认订阅请求。在这种情况下,我们将等待用户通过电子邮件或手机短信进行确认。

等待用户确认订阅的活动称为 wait_for_confirmation_activity,下面我们将定义它。首先,新建一个名为 wait_for_confirmation_activity.rb 的文件,像设置之前的活动那样设置它。

require 'yaml' require_relative 'basic_activity.rb' # **WaitForConfirmationActivity** waits for the user to confirm the SNS # subscription. When this action has been taken, the activity is complete. It # might also time out... class WaitForConfirmationActivity < BasicActivity # Initialize the class def initialize super('wait_for_confirmation_activity') end

接下来,我们将开始定义 do_activity 方法,然后检索向名为 subscription_data 的本地变量中输入的任何数据。

def do_activity(task) if task.input.nil? @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml return false end subscription_data = YAML.load(task.input)

既然我们已有主题 ARN,那么可通过新建 AWS::SNS::Topic 实例,检索主题,然向其传递该 ARN。

topic = AWS::SNS::Topic.new(subscription_data[:topic_arn]) if topic.nil? @results = { :reason => "Couldn't get SWF topic ARN", :detail => "Topic ARN: #{topic.arn}" }.to_yaml return false end

现在,我们将检查该主题以了解用户是否已使用某个终端节点确认了订阅。我们只需要一个终端节点得到确认,即将活动视为成功。

Amazon SNS 主题保留该主题作出的订阅的列表,并且我们可通过了解订阅的 ARN 是否设置为 PendingConfirmation 以外的内容,检查用户是否已确认某个特定的订阅。

# loop until we get some indication that a subscription was confirmed. subscription_confirmed = false while(!subscription_confirmed) topic.subscriptions.each do | sub | if subscription_data[sub.protocol.to_sym][:endpoint] == sub.endpoint # this is one of the endpoints we're interested in. Is it subscribed? if sub.arn != 'PendingConfirmation' subscription_data[sub.protocol.to_sym][:subscription_arn] = sub.arn puts "Topic subscription confirmed for (#{sub.protocol}: #{sub.endpoint})" @results = subscription_data.to_yaml return true else puts "Topic subscription still pending for (#{sub.protocol}: #{sub.endpoint})" end end end

如果获取订阅的 ARN,则将其保存在活动的结果数据中,将其转换为 YAML,然后从 do_activity 返回 true,这表示活动成功完成。

由于等待确认订阅可能需要一段时间,因此我们将偶尔对活动任务调用 record_heartbeat。这样可告知 Amazon SWF 活动仍在进行处理,并可用于提供有关活动进度的最新消息(如果正在进行某些可报告进度的操作,如处理文件)。

task.record_heartbeat!( { :details => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }) # sleep a bit. sleep(4.0) end

至此,我们的 while 循环结束。如果我们由于某种原因未成功即退出 while 循环,则我们将报告失败并结束 do_activity 方法。

if (subscription_confirmed == false) @results = { :reason => "No subscriptions could be confirmed", :detail => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }.to_yaml return false end end end

至此,wait_for_confirmation_activity 的实现结束。我们还剩下一个活动要定义:send_result_activity

定义 SendResultActivity

如果工作流程已进展到这里,则我们已成功地让用户订阅了 Amazon SNS 主题,并且用户已确认该订阅。

我们的最后一个活动 send_result_activity,使用用户订阅的主题和用户确认订阅的终端节点,向用户发送成功主题订阅的确认。

新建一个名为 send_result_activity.rb 的文件,像设置之前的所有活动那样设置它。

require 'yaml' require_relative 'basic_activity.rb' # **SendResultActivity** sends the result of the activity to the screen, and, if # the user successfully registered using SNS, to the user using the SNS contact # information collected. class SendResultActivity < BasicActivity def initialize super('send_result_activity') end

do_activity 方法的开头相似,也从工作流程获取输入数据,从 YAML 转换这些数据,然后使用主题 ARN 创建 AWS::SNS::Topic 实例。

def do_activity(task) if task.input.nil? @results = { :reason => "Didn't receive any input!", :detail => "" } return false end input = YAML.load(task.input) # get the topic, so we publish a message to it. topic = AWS::SNS::Topic.new(input[:topic_arn]) if topic.nil? @results = { :reason => "Couldn't get SWF topic", :detail => "Topic ARN: #{topic.arn}" } return false end

具有主题后,我们将向其发布一条消息(并且将其回显到屏幕上)。

@results = "Thanks, you've successfully confirmed registration, and your workflow is complete!" # send the message via SNS, and also print it on the screen. topic.publish(@results) puts(@results) return true end end

发布到 Amazon SNS 主题将所提供的消息发送到对于该主题存在的所有 已订阅和确认的终端节点。因此,如果用户同时 通过电子邮件和手机短信进行确认,则用户将收到两条确认消息,每个终端节点一条。

后续步骤

至此,完成了 send_result_activity 的执行。现在,我们将在处理活动任务的活动应用程序中将所有这些活动联系在一起,并可启动活动作为回应,如第 4 部分:实施活动任务轮询器所述。