订阅工作流教程第 3 部分:实现活动 - Amazon Simple Workflow Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

订阅工作流教程第 3 部分:实现活动

我们现在将实现工作流中的每个活动,首先是基类,它为活动代码提供某些共有功能。

定义基本活动类型

设计工作流时,我们指定了以下活动:

  • get_contact_activity

  • subscribe_topic_activity

  • wait_for_confirmation_activity

  • send_result_activity

我们现在将实现这些活动中的每个。由于这些活动将共用某些功能,因此我们来做一点基础工作,并创建这些活动可共用的一些共有代码。我们将它称为 BasicActivity,并在一个名为 basic_activity.rb 的新文件中定义它。

如同其他源文件一样,我们将加入 utils.rb 以使用 init_domain 函数设置示例域。

require_relative 'utils.rb'

接下来,我们将声明基本活动类和我们在每项活动中都将感兴趣的一些共有数据。我们将活动的 AWS::SimpleWorkflow::ActivityType 实例、nameresults 保存在类的属性中。

class BasicActivity attr_accessor :activity_type attr_accessor :name attr_accessor :results

这些属性可访问在类的 initialize 方法中定义的实例数据,该方法需要一个活动名称、一个可选版本以及向 Amazon SWF 注册活动时使用的选项映射。

def initialize(name, version = 'v1', options = nil) @activity_type = nil @name = name @results = nil # get the domain to use for activity tasks. @domain = init_domain # Check to see if this activity type already exists. @domain.activity_types.each do | a | if (a.name == @name) && (a.version == version) @activity_type = a end end if @activity_type.nil? # If no options were specified, use some reasonable defaults. if options.nil? options = { # All timeouts are in seconds. :default_task_heartbeat_timeout => 900, :default_task_schedule_to_start_timeout => 120, :default_task_schedule_to_close_timeout => 3800, :default_task_start_to_close_timeout => 3600 } end @activity_type = @domain.activity_types.register(@name, version, options) end end

与工作流类型注册一样,如果已注册某个活动类型,则可通过查看域的 activity_types 集合,检索该类型。如果找不到该活动,则将注册该活动。

此外,与工作流类型一样,可设置在注册活动类型时与其存储在一起的默认选项

我们的基本活动最后得到的是运行它的一致方式。我们将定义一个 do_activity 方法,该方法采用活动任务。如下所示,我们可使用传入的活动任务通过其 input 实例属性接收数据。

def do_activity(task) @results = task.input # may be nil return true end end

这样即封装 BasicActivity 类。现在,我们将用它使我们的活动变得简单一致。

定义 GetContactActivity

工作流执行期间运行的第一个活动是 get_contact_activity,它会检索用户的 Amazon SNS 主题订阅信息。

新建一个名为 get_contact_activity.rb 的文件,并且需要 yaml(我们将使用它准备要传递给 Amazon SWF 的字符串)和 basic_activity.rb(我们将它用作 GetContactActivity 类的基础)。

require 'yaml' require_relative 'basic_activity.rb' # **GetContactActivity** provides a prompt for the user to enter contact # information. When the user successfully enters contact information, the # activity is complete. class GetContactActivity < BasicActivity

由于我们将活动注册代码放入 BasicActivity 中,因此 initialize 方法给 GetContactActivity 是相当简单的。我们仅仅用活动名称 get_contact_activity 调用基类构造函数。只需此项即可注册我们的活动。

# initialize the activity def initialize super('get_contact_activity') end

现在我们将定义 do_activity 方法,它提示输入用户的电子邮件和/或电话号码。

def do_activity(task) puts "" puts "Please enter either an email address or SMS message (mobile phone) number to" puts "receive SNS notifications. You can also enter both to use both address types." puts "" puts "If you enter a phone number, it must be able to receive SMS messages, and must" puts "be 11 digits (such as 12065550101 to represent the number 1-206-555-0101)." input_confirmed = false while !input_confirmed puts "" print "Email: " email = $stdin.gets.strip print "Phone: " phone = $stdin.gets.strip puts "" if (email == '') && (phone == '') print "You provided no subscription information. Quit? (y/n)" confirmation = $stdin.gets.strip.downcase if confirmation == 'y' return false end else puts "You entered:" puts " email: #{email}" puts " phone: #{phone}" print "\nIs this correct? (y/n): " confirmation = $stdin.gets.strip.downcase if confirmation == 'y' input_confirmed = true end end end # make sure that @results is a single string. YAML makes this easy. @results = { :email => email, :sms => phone }.to_yaml return true end end

do_activity 的结尾处,我们获得从用户检索的电子邮件和电话号码,将其放入映射中,然后使用 to_yaml 将整个映射转换为 YAML 字符串。这样做有一个重要原因:当您完成活动后,传递给 Amazon SWF 的任何结果都必须仅为字符串数据。Ruby 可轻松地将对象转换为 YAML 字符串,然后再转换回对象,这一点非常适合此用途。

这是 get_contact_activity 执行的结束。此数据将在接下来的 subscribe_topic_activity 执行中使用。

定义 SubscribeTopicActivity

现在,我们将深入探讨 Amazon SNS 并创建一个活动,该活动使用 get_contact_activity 生成的信息让用户订阅 Amazon SNS 主题。

新建一个名为 subscribe_topic_activity.rb 的文件,添加我们用于 get_contact_activity 的相同要求,声明您的类,然后提供其 initialize 方法。

require 'yaml' require_relative 'basic_activity.rb' # **SubscribeTopicActivity** sends an SMS / email message to the user, asking for # confirmation. When this action has been taken, the activity is complete. class SubscribeTopicActivity < BasicActivity def initialize super('subscribe_topic_activity') end

现在,我们已经有了用于设置和注册活动的代码。下面,我们将添加一些代码来创建 Amazon SNS 主题。为此,我们将使用 AWS::SNS::Client 对象的 create_topic 方法。

create_topic 方法添加到您的类,该方法采用传入的 Amazon SNS 客户端对象。

def create_topic(sns_client) topic_arn = sns_client.create_topic(:name => 'SWF_Sample_Topic')[:topic_arn] if topic_arn != nil # For an SMS notification, setting `DisplayName` is *required*. Note that # only the *first 10 characters* of the DisplayName will be shown on the # SMS message sent to the user, so choose your DisplayName wisely! sns_client.set_topic_attributes( { :topic_arn => topic_arn, :attribute_name => 'DisplayName', :attribute_value => 'SWFSample' } ) else @results = { :reason => "Couldn't create SNS topic", :detail => "" }.to_yaml return nil end return topic_arn end

有了主题的 Amazon 资源名 (ARN) 后,我们就可以将其与 Amazon SNS 客户端的 set_topic_attributes 方法配合使用来设置主题的 DisplayName,使用 Amazon SNS 发送短信时需要该名称。

最后,我们将定义 do_activity 方法。首先将收集在安排该活动时通过 input 选项传递的任何数据。如前所述,必须以字符串(我们使用 to_yaml 创建了它)形式传递此项。在检索它时,我们将使用 YAML.load 将数据转换为 Ruby 对象。

以下是 do_activity 的开头,我们在此处检索输入数据。

def do_activity(task) activity_data = { :topic_arn => nil, :email => { :endpoint => nil, :subscription_arn => nil }, :sms => { :endpoint => nil, :subscription_arn => nil }, } if task.input != nil input = YAML.load(task.input) activity_data[:email][:endpoint] = input[:email] activity_data[:sms][:endpoint] = input[:sms] else @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml puts(" #{@results.inspect}") return false end # Create an SNS client. This is used to interact with the service. Set the # region to $SMS_REGION, which is a region that supports SMS notifications # (defined in the file `utils.rb`). sns_client = AWS::SNS::Client.new( :config => AWS.config.with(:region => $SMS_REGION))

如果我们未收到任何输入,则无计可施,因此我们只好将活动视为失败。

但假如一切正常,我们将继续充实 do_activity 方法,使用Amazon SDK for Ruby 获取 Amazon SNS 客户端,并将其传递给 create_topic 方法以创建 Amazon SNS 主题。

# Create the topic and get the ARN activity_data[:topic_arn] = create_topic(sns_client) if activity_data[:topic_arn].nil? return false end

在此,有几点值得注意:

  • 我们使用 AWS.config.with 为 Amazon SNS 客户端设置区域。由于我们要发送手机短信,因此我们使用在 utils.rb 中声明的支持手机短信的地区。

  • 将该主题的 ARN 保存在 activity_data 映射中。这是将传递给我们的工作流程中下一活动的部分数据。

最后,此活动使用传入的端点(电子邮件和手机短信)让用户订阅 Amazon SNS 主题。我们不要求用户同时输入两个 终端节点,但是,我们至少需要一个。

# Subscribe the user to the topic, using either or both endpoints. [:email, :sms].each do | x | ep = activity_data[x][:endpoint] # don't try to subscribe an empty endpoint if (ep != nil && ep != "") response = sns_client.subscribe( { :topic_arn => activity_data[:topic_arn], :protocol => x.to_s, :endpoint => ep } ) activity_data[x][:subscription_arn] = response[:subscription_arn] end end

AWS::SNS::Client.subscribe 采用主题 ARN 和 protocol(我们巧妙地将其伪装成相应端点的 activity_data 映射键)。

最后,我们以 YAML 格式重新打包下一活动的信息,以便将其发送回 Amazon SWF。

# if at least one subscription arn is set, consider this a success. if (activity_data[:email][:subscription_arn] != nil) or (activity_data[:sms][:subscription_arn] != nil) @results = activity_data.to_yaml else @results = { :reason => "Couldn't subscribe to SNS topic", :detail => "" }.to_yaml puts(" #{@results.inspect}") return false end return true end end

至此,即完成了 subscribe_topic_activity 的执行。接下来,我们将定义 wait_for_confirmation_activity

定义 WaitForConfirmationActivity

用户订阅 Amazon SNS 主题后,仍需确认订阅请求。在这种情况下,我们将等待用户通过电子邮件或手机短信进行确认。

等待用户确认订阅的活动称为 wait_for_confirmation_activity,下面我们将定义它。首先,新建一个名为 wait_for_confirmation_activity.rb 的文件,并像设置以前的活动那样设置它。

require 'yaml' require_relative 'basic_activity.rb' # **WaitForConfirmationActivity** waits for the user to confirm the SNS # subscription. When this action has been taken, the activity is complete. It # might also time out... class WaitForConfirmationActivity < BasicActivity # Initialize the class def initialize super('wait_for_confirmation_activity') end

接下来,我们将开始定义 do_activity 方法,然后检索向名为 subscription_data 的本地变量中输入的任何数据。

def do_activity(task) if task.input.nil? @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml return false end subscription_data = YAML.load(task.input)

现在,我们有了主题的 ARN,可以通过创建 AWS::SNS::Topic 的新实例来检索主题,然向其传递该 ARN。

topic = AWS::SNS::Topic.new(subscription_data[:topic_arn]) if topic.nil? @results = { :reason => "Couldn't get SWF topic ARN", :detail => "Topic ARN: #{topic.arn}" }.to_yaml return false end

现在,我们将检查该主题以了解用户是否已使用某个终端节点确认了订阅。我们只需要一个终端节点得到确认,即将活动视为成功。

Amazon SNS 主题维护该主题的订阅列表,我们可以通过检查订阅的 ARN 是否被设置为 PendingConfirmation 以外的值来检查用户是否已确认特定订阅。

# loop until we get some indication that a subscription was confirmed. subscription_confirmed = false while(!subscription_confirmed) topic.subscriptions.each do | sub | if subscription_data[sub.protocol.to_sym][:endpoint] == sub.endpoint # this is one of the endpoints we're interested in. Is it subscribed? if sub.arn != 'PendingConfirmation' subscription_data[sub.protocol.to_sym][:subscription_arn] = sub.arn puts "Topic subscription confirmed for (#{sub.protocol}: #{sub.endpoint})" @results = subscription_data.to_yaml return true else puts "Topic subscription still pending for (#{sub.protocol}: #{sub.endpoint})" end end end

如果获取订阅的 ARN,则将其保存在活动的结果数据中,将其转换为 YAML,然后从 do_activity 返回 true,这表示活动成功完成。

由于等待确认订阅可能需要一段时间,因此我们将偶尔对活动任务调用 record_heartbeat。这将向 Amazon SWF 发出信号,表明活动仍在处理中,还可用于提供有关活动进度的最新信息(如果您正在进行某些可报告进度的操作,如处理文件)。

task.record_heartbeat!( { :details => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }) # sleep a bit. sleep(4.0) end

至此,我们的 while 循环结束。如果我们由于某种原因未成功即退出 while 循环,则我们将报告失败并结束 do_activity 方法。

if (subscription_confirmed == false) @results = { :reason => "No subscriptions could be confirmed", :detail => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }.to_yaml return false end end end

至此,wait_for_confirmation_activity 的实现结束。我们还剩下一个活动要定义:send_result_activity

定义 SendResultActivity

如果工作流进行到这一步,说明我们已成功让用户订阅 Amazon SNS 主题,且用户已确认订阅。

我们的最后一个活动 send_result_activity,使用用户订阅的主题和用户确认订阅的终端节点,向用户发送成功主题订阅的确认。

新建一个名为 send_result_activity.rb 的文件,并像设置至今为止的所有活动那样设置它。

require 'yaml' require_relative 'basic_activity.rb' # **SendResultActivity** sends the result of the activity to the screen, and, if # the user successfully registered using SNS, to the user using the SNS contact # information collected. class SendResultActivity < BasicActivity def initialize super('send_result_activity') end

do_activity 方法也是以类似的方式开始的,它也是从工作流获取输入数据,从 YAML 转换这些数据,然后使用主题 ARN 创建 AWS::SNS::Topic 实例。

def do_activity(task) if task.input.nil? @results = { :reason => "Didn't receive any input!", :detail => "" } return false end input = YAML.load(task.input) # get the topic, so we publish a message to it. topic = AWS::SNS::Topic.new(input[:topic_arn]) if topic.nil? @results = { :reason => "Couldn't get SWF topic", :detail => "Topic ARN: #{topic.arn}" } return false end

具有主题后,我们将向其发布一条消息(并且将其回显到屏幕上)。

@results = "Thanks, you've successfully confirmed registration, and your workflow is complete!" # send the message via SNS, and also print it on the screen. topic.publish(@results) puts(@results) return true end end

发布到 Amazon SNS 主题会将您提供的消息发送到该主题已存在的所有订阅和确认的端点。因此,如果用户同时 通过电子邮件和手机短信进行确认,则用户将收到两条确认消息,每个终端节点一条。

后续步骤

这将完成 send_result_activity 实现。现在,将在处理活动任务的活动应用程序中将所有这些活动联系在一起,并可启动活动作为回应,如订阅工作流程教程第 4 部分:实现活动任务轮询器所述。