本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
订阅工作流教程第 3 部分:实现活动
我们现在将实现工作流中的每个活动,首先是基类,它为活动代码提供某些共有功能。
主题
定义基本活动类型
设计工作流时,我们指定了以下活动:
-
get_contact_activity
-
subscribe_topic_activity
-
wait_for_confirmation_activity
-
send_result_activity
我们现在将实现这些活动中的每个。由于这些活动将共用某些功能,因此我们来做一点基础工作,并创建这些活动可共用的一些共有代码。我们将它称为 BasicActivity,并在一个名为 basic_activity.rb
的新文件中定义它。
如同其他源文件一样,我们将加入 utils.rb
以使用 init_domain
函数设置示例域。
require_relative 'utils.rb'
接下来,我们将声明基本活动类和我们在每项活动中都将感兴趣的一些共有数据。我们将保存活动Amazon። SimpleWorkflow። Activity 类型实例,名称, 和结果在班级的属性中。
class BasicActivity attr_accessor :activity_type attr_accessor :name attr_accessor :results
这些属性访问在类中定义的实例数据 'initialize
方法,它需要一个活动名称,还有可选的版本和地图选项在 Amazon SWF 注册活动时使用。
def initialize(name, version = 'v1', options = nil) @activity_type = nil @name = name @results = nil # get the domain to use for activity tasks. @domain = init_domain # Check to see if this activity type already exists. @domain.activity_types.each do | a | if (a.name == @name) && (a.version == version) @activity_type = a end end if @activity_type.nil? # If no options were specified, use some reasonable defaults. if options.nil? options = { # All timeouts are in seconds. :default_task_heartbeat_timeout => 900, :default_task_schedule_to_start_timeout => 120, :default_task_schedule_to_close_timeout => 3800, :default_task_start_to_close_timeout => 3600 } end @activity_type = @domain.activity_types.register(@name, version, options) end end
与工作流类型注册一样,如果已注册某个活动类型,则可通过查看域的 activity_types 集合,检索该类型。如果找不到该活动,则将注册该活动。
此外,与工作流类型一样,可设置在注册活动类型时与其存储在一起的默认选项。
我们的基本活动最后得到的是运行它的一致方式。我们将定义一个 do_activity
方法,该方法采用活动任务。如下所示,我们可使用传入的活动任务通过其 input
实例属性接收数据。
def do_activity(task) @results = task.input # may be nil return true end end
这样即封装 BasicActivity 类。现在,我们将用它使我们的活动变得简单一致。
定义 GetContactActivity
在工作流程执行期间运行的第一个活动是get_contact_activity
,该项将检索用户的 Amazon SNS 主题订阅信息。
创建名为的新文件get_contact_activity.rb
,并且两者都要求yaml
,我们将用它来准备传递给 Amazon SWF 的字符串,以及basic_activity.rb
,我们将用它作为此的基础获取联系人活动类。
require 'yaml' require_relative 'basic_activity.rb' # **GetContactActivity** provides a prompt for the user to enter contact # information. When the user successfully enters contact information, the # activity is complete. class GetContactActivity < BasicActivity
由于我们将活动注册代码放入 BasicActivity 中,因此 initialize
方法给 GetContactActivity 是相当简单的。我们仅仅用活动名称 get_contact_activity
调用基类构造函数。只需此项即可注册我们的活动。
# initialize the activity def initialize super('get_contact_activity') end
现在我们将定义 do_activity
方法,它提示输入用户的电子邮件和/或电话号码。
def do_activity(task) puts "" puts "Please enter either an email address or SMS message (mobile phone) number to" puts "receive SNS notifications. You can also enter both to use both address types." puts "" puts "If you enter a phone number, it must be able to receive SMS messages, and must" puts "be 11 digits (such as 12065550101 to represent the number 1-206-555-0101)." input_confirmed = false while !input_confirmed puts "" print "Email: " email = $stdin.gets.strip print "Phone: " phone = $stdin.gets.strip puts "" if (email == '') && (phone == '') print "You provided no subscription information. Quit? (y/n)" confirmation = $stdin.gets.strip.downcase if confirmation == 'y' return false end else puts "You entered:" puts " email: #{email}" puts " phone: #{phone}" print "\nIs this correct? (y/n): " confirmation = $stdin.gets.strip.downcase if confirmation == 'y' input_confirmed = true end end end # make sure that @results is a single string. YAML makes this easy. @results = { :email => email, :sms => phone }.to_yaml return true end end
在 do_activity
的结尾处,我们获得从用户检索的电子邮件和电话号码,将其放入映射中,然后使用 to_yaml
将整个映射转换为 YAML 字符串。这样做有一个重要原因:完成活动后传递给 Amazon SWF 的任何结果都必须为仅限字符串数据. Ruby 可轻松地将对象转换为 YAML 字符串,然后再转换回对象,这一点非常适合此用途。
这是 get_contact_activity
执行的结束。此数据将在接下来的 subscribe_topic_activity
执行中使用。
定义 SubscribeTopicActivity
我们现在将深入研究 Amazon SNS 并创建一个活动,该活动使用get_contact_activity
为用户订阅 Amazon SNS 主题。
新建一个名为 subscribe_topic_activity.rb
的文件,添加我们用于 get_contact_activity
的相同要求,声明您的类,然后提供其 initialize
方法。
require 'yaml' require_relative 'basic_activity.rb' # **SubscribeTopicActivity** sends an SMS / email message to the user, asking for # confirmation. When this action has been taken, the activity is complete. class SubscribeTopicActivity < BasicActivity def initialize super('subscribe_topic_activity') end
既然我们具有用于设置和注册活动的代码就位,则我们将要添加一些代码以创建 Amazon SNS 主题。为此,我们使用Amazon። SNS። 客户端对象的创建 _topic方法。
添加create_topic
方法转换到您的类,该方法采用传入的 Amazon SNS 客户端对象。
def create_topic(sns_client) topic_arn = sns_client.create_topic(:name => 'SWF_Sample_Topic')[:topic_arn] if topic_arn != nil # For an SMS notification, setting `DisplayName` is *required*. Note that # only the *first 10 characters* of the DisplayName will be shown on the # SMS message sent to the user, so choose your DisplayName wisely! sns_client.set_topic_attributes( { :topic_arn => topic_arn, :attribute_name => 'DisplayName', :attribute_value => 'SWFSample' } ) else @results = { :reason => "Couldn't create SNS topic", :detail => "" }.to_yaml return nil end return topic_arn end
一旦我们具有该主题的 Amazon 资源名称 (ARN),即可将其与 Amazon SNS 客户端配合使用set_topic_属性设置主题的方法DisplayName,用 Amazon SNS 发送手机短信时需要该项。
最后,我们将定义 do_activity
方法。首先将收集在安排该活动时通过 input
选项传递的任何数据。如前所述,必须以字符串(我们使用 to_yaml
创建了它)形式传递此项。在检索它时,我们将使用 YAML.load
将数据转换为 Ruby 对象。
以下是 do_activity
的开头,我们在此处检索输入数据。
def do_activity(task) activity_data = { :topic_arn => nil, :email => { :endpoint => nil, :subscription_arn => nil }, :sms => { :endpoint => nil, :subscription_arn => nil }, } if task.input != nil input = YAML.load(task.input) activity_data[:email][:endpoint] = input[:email] activity_data[:sms][:endpoint] = input[:sms] else @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml puts(" #{@results.inspect}") return false end # Create an SNS client. This is used to interact with the service. Set the # region to $SMS_REGION, which is a region that supports SMS notifications # (defined in the file `utils.rb`). sns_client = Amazon::SNS::Client.new( :config => AWS.config.with(:region => $SMS_REGION))
如果我们未收到任何输入,则无计可施,因此我们只好将活动视为失败。
但是,假如一切正常,则我们将继续充实do_activity
方法 Amazon SNS 使用Amazon SDK for Ruby,然后将其传递给我们create_topic
方法创建 Amazon SNS 主题。
# Create the topic and get the ARN activity_data[:topic_arn] = create_topic(sns_client) if activity_data[:topic_arn].nil? return false end
在此,有几点值得注意:
-
我们使用
AWS.config.with
为 Amazon SNS 客户端设置地区。由于我们要发送手机短信,因此我们使用在utils.rb
中声明的支持手机短信的地区。 -
将该主题的 ARN 保存在
activity_data
映射中。这是将传递给我们的工作流程中下一活动的部分数据。
最后,此活动使用传入的终端节点(电子邮件和 SMS)让用户订阅 Amazon SNS 主题。我们不要求用户同时输入两个 终端节点,但是,我们至少需要一个。
# Subscribe the user to the topic, using either or both endpoints. [:email, :sms].each do | x | ep = activity_data[x][:endpoint] # don't try to subscribe an empty endpoint if (ep != nil && ep != "") response = sns_client.subscribe( { :topic_arn => activity_data[:topic_arn], :protocol => x.to_s, :endpoint => ep } ) activity_data[x][:subscription_arn] = response[:subscription_arn] end end
Amazon። SNS። client.Subscription采用主题 ARN,protocol(巧妙地,我们伪装成activity_data
相应终端节点的映射密钥)。
最后,我们以 YAML 格式重新打包下一活动的信息,以便我们可将其发回 Amazon SWF。
# if at least one subscription arn is set, consider this a success. if (activity_data[:email][:subscription_arn] != nil) or (activity_data[:sms][:subscription_arn] != nil) @results = activity_data.to_yaml else @results = { :reason => "Couldn't subscribe to SNS topic", :detail => "" }.to_yaml puts(" #{@results.inspect}") return false end return true end end
至此,即完成了 subscribe_topic_activity
的执行。接下来,我们将定义 wait_for_confirmation_activity
。
定义 WaitForConfirmationActivity
用户订阅 Amazon SNS 主题后,仍需要确认订阅请求。在这种情况下,我们将等待用户通过电子邮件或手机短信进行确认。
等待用户确认订阅的活动称为 wait_for_confirmation_activity
,下面我们将定义它。首先,新建一个名为 wait_for_confirmation_activity.rb
的文件,并像设置以前的活动那样设置它。
require 'yaml' require_relative 'basic_activity.rb' # **WaitForConfirmationActivity** waits for the user to confirm the SNS # subscription. When this action has been taken, the activity is complete. It # might also time out... class WaitForConfirmationActivity < BasicActivity # Initialize the class def initialize super('wait_for_confirmation_activity') end
接下来,我们将开始定义 do_activity
方法,然后检索向名为 subscription_data
的本地变量中输入的任何数据。
def do_activity(task) if task.input.nil? @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml return false end subscription_data = YAML.load(task.input)
既然我们已有主题 ARN,即可通过创建新的实例,检索主题AWS::SNS::Topic然后把它传递给 ARN。
topic = Amazon::SNS::Topic.new(subscription_data[:topic_arn]) if topic.nil? @results = { :reason => "Couldn't get SWF topic ARN", :detail => "Topic ARN: #{topic.arn}" }.to_yaml return false end
现在,我们将检查该主题以了解用户是否已使用某个终端节点确认了订阅。我们只需要一个终端节点得到确认,即将活动视为成功。
Amazon SNS 主题保留了订阅对于该主题,并且我们可通过了解订阅的 ARN 是否设置为以外的内容,检查用户是否已确认某个特定的订阅。PendingConfirmation
.
# loop until we get some indication that a subscription was confirmed. subscription_confirmed = false while(!subscription_confirmed) topic.subscriptions.each do | sub | if subscription_data[sub.protocol.to_sym][:endpoint] == sub.endpoint # this is one of the endpoints we're interested in. Is it subscribed? if sub.arn != 'PendingConfirmation' subscription_data[sub.protocol.to_sym][:subscription_arn] = sub.arn puts "Topic subscription confirmed for (#{sub.protocol}: #{sub.endpoint})" @results = subscription_data.to_yaml return true else puts "Topic subscription still pending for (#{sub.protocol}: #{sub.endpoint})" end end end
如果获取订阅的 ARN,则将其保存在活动的结果数据中,将其转换为 YAML,然后从 do_activity
返回 true,这表示活动成功完成。
由于等待确认订阅可能需要一段时间,因此我们将偶尔对活动任务调用 record_heartbeat
。这样可告知 Amazon SWF,该活动仍在进行处理,并可用于提供有关活动进度的最新消息(如果正在进行某些可报告进度的操作,如处理文件)。
task.record_heartbeat!( { :details => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }) # sleep a bit. sleep(4.0) end
至此,我们的 while
循环结束。如果我们由于某种原因未成功即退出 while 循环,则我们将报告失败并结束 do_activity
方法。
if (subscription_confirmed == false) @results = { :reason => "No subscriptions could be confirmed", :detail => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }.to_yaml return false end end end
至此,wait_for_confirmation_activity
的实现结束。我们还剩下一个活动要定义:send_result_activity
。
定义 SendResultActivity
如果工作流程已进展到这里,则我们已成功地让用户订阅 Amazon SNS 主题,并且用户已确认该订阅。
我们的最后一个活动 send_result_activity
,使用用户订阅的主题和用户确认订阅的终端节点,向用户发送成功主题订阅的确认。
新建一个名为 send_result_activity.rb
的文件,并像设置至今为止的所有活动那样设置它。
require 'yaml' require_relative 'basic_activity.rb' # **SendResultActivity** sends the result of the activity to the screen, and, if # the user successfully registered using SNS, to the user using the SNS contact # information collected. class SendResultActivity < BasicActivity def initialize super('send_result_activity') end
我们的do_activity
方法的开头相似,也从工作流获取输入数据,从 YAML 转换这些数据,然后使用主题 ARN 创建AWS::SNS::Topic实例。
def do_activity(task) if task.input.nil? @results = { :reason => "Didn't receive any input!", :detail => "" } return false end input = YAML.load(task.input) # get the topic, so we publish a message to it. topic = Amazon::SNS::Topic.new(input[:topic_arn]) if topic.nil? @results = { :reason => "Couldn't get SWF topic", :detail => "Topic ARN: #{topic.arn}" } return false end
具有主题后,我们将向其发布一条消息(并且将其回显到屏幕上)。
@results = "Thanks, you've successfully confirmed registration, and your workflow is complete!" # send the message via SNS, and also print it on the screen. topic.publish(@results) puts(@results) return true end end
发布到 Amazon SNS 主题将所提供的消息发送到所有已订阅并确认该主题存在的终端节点。因此,如果用户同时 通过电子邮件和手机短信进行确认,则用户将收到两条确认消息,每个终端节点一条。
后续步骤
这将完成 send_result_activity
实现。现在,将在处理活动任务的活动应用程序中将所有这些活动联系在一起,并可启动活动作为回应,如订阅工作流程教程第 4 部分:实现活动任务轮询器所述。