

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 订阅工作流教程第 3 部分：实现活动
<a name="swf-sns-tutorial-implementing-activities"></a>

我们现在将实现工作流中的每个活动，首先是基类，它为活动代码提供某些共有功能。

**Topics**
+ [定义基本活动类型](#defining-a-basic-activity-type)
+ [定义 GetContactActivity](#defining-getcontactactivity)
+ [定义 SubscribeTopicActivity](#defining-subscribetopicactivity)
+ [定义 WaitForConfirmationActivity](#defining-waitforconfirmationactivity)
+ [定义 SendResultActivity](#defining-sendresultactivity)
+ [后续步骤](#implementing-activities-next-steps)

## 定义基本活动类型
<a name="defining-a-basic-activity-type"></a>

设计工作流时，我们指定了以下活动：
+ `get_contact_activity`
+ `subscribe_topic_activity`
+ `wait_for_confirmation_activity`
+ `send_result_activity`

我们现在将实现这些活动中的每个。因为我们的活动将共享一些功能，所以让我们做一些基础工作，创建一些他们可以共享的常用代码。我们将对其进行调用 **BasicActivity**，并在名为的新文件中对其进行定义`basic_activity.rb`。

如同其他源文件一样，我们将加入 `utils.rb` 以使用 `init_domain` 函数设置示例域。

```
   require_relative 'utils.rb' 
```

接下来，我们将声明基本活动类和我们在每项活动中都将感兴趣的一些共有数据。我们将把活动的实[AWS::SimpleWorkflow::ActivityType](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SimpleWorkflow/ActivityType.html)例、*名称*和*结果*保存到类的属性中。

```
   class BasicActivity

     attr_accessor :activity_type
     attr_accessor :name
     attr_accessor :results
```

这些属性可访问在类的 `initialize` 方法中定义的实例数据，该方法需要一个活动*名称*、一个可选*版本*以及向 Amazon SWF 注册活动时使用的*选项*映射。

```
     def initialize(name, version = 'v1', options = nil)

       @activity_type = nil
       @name = name
       @results = nil

       # get the domain to use for activity tasks.
       @domain = init_domain

       # Check to see if this activity type already exists.
       @domain.activity_types.each do | a |
         if (a.name == @name) && (a.version == version)
           @activity_type = a
         end
       end

       if @activity_type.nil?
         # If no options were specified, use some reasonable defaults.
         if options.nil?
           options = {
             # All timeouts are in seconds.
             :default_task_heartbeat_timeout => 900,
             :default_task_schedule_to_start_timeout => 120,
             :default_task_schedule_to_close_timeout => 3800,
             :default_task_start_to_close_timeout => 3600 }
         end
         @activity_type = @domain.activity_types.register(@name, version, options)
       end
     end
```

与工作流类型注册一样，如果已注册某个活动类型，则可通过查看域的 [activity\_types](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#activity_types-instance_method) 集合，检索该类型。如果找不到该活动，则将注册该活动。

此外，与工作流类型一样，可设置在注册活动类型时与其存储在一起的*默认选项*。

我们的基本活动最后得到的是运行它的一致方式。我们将定义一个 `do_activity` 方法，该方法采用活动任务。如下所示，我们可使用传入的活动任务通过其 `input` 实例属性接收数据。

```
     def do_activity(task)
       @results = task.input # may be nil
       return true
     end
   end
```

这结束了这**BasicActivity**堂课。现在，我们将用它使我们的活动变得简单一致。

## 定义 GetContactActivity
<a name="defining-getcontactactivity"></a>

工作流执行期间运行的第一个活动是 `get_contact_activity`，它会检索用户的 Amazon SNS 主题订阅信息。

创建一个名为的新文件`get_contact_activity.rb`，并要求两者兼而有之`yaml`，我们将使用它来准备传递给 Amazon SWF 的字符串`basic_activity.rb`，并使用它作为本**GetContactActivity**类的基础。

```
   require 'yaml'
   require_relative 'basic_activity.rb'

   # **GetContactActivity** provides a prompt for the user to enter contact
   # information. When the user successfully enters contact information, the
   # activity is complete.
   class GetContactActivity < BasicActivity
```

因为我们输入了活动注册码 **BasicActivity**，所以的`initialize`方法**GetContactActivity**非常简单。我们仅仅用活动名称 `get_contact_activity` 调用基类构造函数。只需此项即可注册我们的活动。

```
     # initialize the activity
     def initialize
       super('get_contact_activity')
     end
```

现在，我们将定义该`do_activity`方法，它会提示用户输入电子邮件 and/or 电话号码。

```
     def do_activity(task)
       puts ""
       puts "Please enter either an email address or SMS message (mobile phone) number to"
       puts "receive SNS notifications. You can also enter both to use both address types."
       puts ""
       puts "If you enter a phone number, it must be able to receive SMS messages, and must"
       puts "be 11 digits (such as 12065550101 to represent the number 1-206-555-0101)."

       input_confirmed = false
       while !input_confirmed
         puts ""
         print "Email: "
         email = $stdin.gets.strip

         print "Phone: "
         phone = $stdin.gets.strip

         puts ""
         if (email == '') && (phone == '')
           print "You provided no subscription information. Quit? (y/n)"
            confirmation = $stdin.gets.strip.downcase
            if confirmation == 'y'
              return false
            end
         else
            puts "You entered:"
            puts "  email: #{email}"
            puts "  phone: #{phone}"
            print "\nIs this correct? (y/n): "
            confirmation = $stdin.gets.strip.downcase
            if confirmation == 'y'
              input_confirmed = true
            end
         end
       end

       # make sure that @results is a single string. YAML makes this easy.
       @results = { :email => email, :sms => phone }.to_yaml
       return true
     end
   end
```

在 `do_activity` 的结尾处，我们获得从用户检索的电子邮件和电话号码，将其放入映射中，然后使用 `to_yaml` 将整个映射转换为 YAML 字符串。这样做有一个重要原因：当您完成活动后，传递给 Amazon SWF 的任何结果都必须*仅为字符串数据*。Ruby 可轻松地将对象转换为 YAML 字符串，然后再转换回对象，这一点非常适合此用途。

这是 `get_contact_activity` 执行的结束。此数据将在接下来的 `subscribe_topic_activity` 执行中使用。

## 定义 SubscribeTopicActivity
<a name="defining-subscribetopicactivity"></a>

现在，我们将深入探讨 Amazon SNS 并创建一个活动，该活动使用 `get_contact_activity` 生成的信息让用户订阅 Amazon SNS 主题。

新建一个名为 `subscribe_topic_activity.rb` 的文件，添加我们用于 `get_contact_activity` 的相同要求，声明您的类，然后提供其 `initialize` 方法。

```
   require 'yaml'
   require_relative 'basic_activity.rb'

   # **SubscribeTopicActivity** sends an SMS / email message to the user, asking for
   # confirmation.  When this action has been taken, the activity is complete.
   class SubscribeTopicActivity < BasicActivity

     def initialize
       super('subscribe_topic_activity')
     end
```

现在，我们已经有了用于设置和注册活动的代码。下面，我们将添加一些代码来创建 Amazon SNS 主题。为此，我们将使用[AWS::SNS::Client](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Client.html)对象的 [create\_topic](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Client.html#create_topic-instance_method) 方法。

将 `create_topic` 方法添加到您的类，该方法采用传入的 Amazon SNS 客户端对象。

```
     def create_topic(sns_client)
       topic_arn = sns_client.create_topic(:name => 'SWF_Sample_Topic')[:topic_arn]

       if topic_arn != nil
         # For an SMS notification, setting `DisplayName` is *required*. Note that
         # only the *first 10 characters* of the DisplayName will be shown on the
         # SMS message sent to the user, so choose your DisplayName wisely!
         sns_client.set_topic_attributes( {
           :topic_arn => topic_arn,
           :attribute_name => 'DisplayName',
           :attribute_value => 'SWFSample' } )
       else
         @results = {
           :reason => "Couldn't create SNS topic", :detail => "" }.to_yaml
         return nil
       end

       return topic_arn
     end
```

一旦我们有了主题的亚马逊资源名称 (ARN)，我们就可以将其与亚马逊 SNS 客户端的 set\_topic\_attributes 方法一起使用来设置主题 *DisplayName*，[这是](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Client.html#set_topic_attributes-instance_method)使用亚马逊 SNS 发送短信所必需的。

最后，我们将定义 `do_activity` 方法。首先将收集在安排该活动时通过 `input` 选项传递的任何数据。如前所述，必须以字符串（我们使用 `to_yaml` 创建了它）形式传递此项。在检索它时，我们将使用 `YAML.load` 将数据转换为 Ruby 对象。

以下是 `do_activity` 的开头，我们在此处检索输入数据。

```
     def do_activity(task)
       activity_data = {
         :topic_arn => nil,
         :email => { :endpoint => nil, :subscription_arn => nil },
         :sms => { :endpoint => nil, :subscription_arn => nil },
       }

       if task.input != nil
         input = YAML.load(task.input)
         activity_data[:email][:endpoint] = input[:email]
         activity_data[:sms][:endpoint] = input[:sms]
       else
         @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml
         puts("  #{@results.inspect}")
         return false
       end

       # Create an SNS client. This is used to interact with the service. Set the
       # region to $SMS_REGION, which is a region that supports SMS notifications
       # (defined in the file `utils.rb`).
       sns_client = AWS::SNS::Client.new(
         :config => AWS.config.with(:region => $SMS_REGION))
```

如果我们未收到任何输入，则无计可施，因此我们只好将活动视为失败。

但是，假设一切正常，我们将继续填写`do_activity`方法，获取带有的 Amazon SNS 客户端 适用于 Ruby 的 Amazon SDK，然后将其传递给我们创建 Amazon SNS 主题`create_topic`的方法。

```
       # Create the topic and get the ARN
       activity_data[:topic_arn] = create_topic(sns_client)

       if activity_data[:topic_arn].nil?
         return false
       end
```

在此，有几点值得注意：
+ 我们使用 [https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/Core/Configuration.html#with-instance_method](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/Core/Configuration.html#with-instance_method) 为 Amazon SNS 客户端设置区域。由于我们要发送手机短信，因此我们使用在 `utils.rb` 中声明的支持手机短信的地区。
+ 将该主题的 ARN 保存在 `activity_data` 映射中。这是将传递给我们的工作流程中*下一*活动的部分数据。

最后，此活动使用传入的端点（电子邮件和手机短信）让用户订阅 Amazon SNS 主题。我们不要求用户同时输入*两个* 终端节点，但是，我们至少需要一个。

```
       # Subscribe the user to the topic, using either or both endpoints.
       [:email, :sms].each do | x |
         ep = activity_data[x][:endpoint]
         # don't try to subscribe an empty endpoint
         if (ep != nil && ep != "")
           response = sns_client.subscribe( {
             :topic_arn => activity_data[:topic_arn],
             :protocol => x.to_s, :endpoint => ep } )
           activity_data[x][:subscription_arn] = response[:subscription_arn]
         end
       end
```

[AWS::SNS::Client.subsc](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Client.html#subscribe-instance_method) *ribe 采用主题 ARN，即协议（我们巧妙地将其`activity_data`伪装成相应端点的地图密钥）。*

最后，我们以 YAML 格式重新打包下一活动的信息，以便将其发送回 Amazon SWF。

```
       # if at least one subscription arn is set, consider this a success.
       if (activity_data[:email][:subscription_arn] != nil) or (activity_data[:sms][:subscription_arn] != nil)
         @results = activity_data.to_yaml
       else
         @results = { :reason => "Couldn't subscribe to SNS topic", :detail => "" }.to_yaml
         puts("  #{@results.inspect}")
         return false
       end
       return true
     end
   end
```

至此，即完成了 `subscribe_topic_activity` 的执行。接下来，我们将定义 `wait_for_confirmation_activity`。

## 定义 WaitForConfirmationActivity
<a name="defining-waitforconfirmationactivity"></a>

用户订阅 Amazon SNS 主题后，仍需确认订阅请求。在这种情况下，我们将等待用户通过电子邮件或手机短信进行确认。

等待用户确认订阅的活动称为 `wait_for_confirmation_activity`，下面我们将定义它。首先，新建一个名为 `wait_for_confirmation_activity.rb` 的文件，并像设置以前的活动那样设置它。

```
   require 'yaml'
   require_relative 'basic_activity.rb'

   # **WaitForConfirmationActivity** waits for the user to confirm the SNS
   # subscription.  When this action has been taken, the activity is complete. It
   # might also time out...
   class WaitForConfirmationActivity < BasicActivity

     # Initialize the class
     def initialize
       super('wait_for_confirmation_activity')
     end
```

接下来，我们将开始定义 `do_activity` 方法，然后检索向名为 `subscription_data` 的本地变量中输入的任何数据。

```
     def do_activity(task)
       if task.input.nil?
         @results = { :reason => "Didn't receive any input!", :detail => "" }.to_yaml
         return false
       end

       subscription_data = YAML.load(task.input)
```

现在我们有了主题 ARN，我们可以通过创建新的实例来检索主题，[AWS::SNS::Topic](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Topic.html)然后将 ARN 传递给它。

```
       topic = AWS::SNS::Topic.new(subscription_data[:topic_arn])

       if topic.nil?
         @results = {
           :reason => "Couldn't get SWF topic ARN",
           :detail => "Topic ARN: #{topic.arn}" }.to_yaml
         return false
       end
```

现在，我们将检查该主题以了解用户是否已使用某个终端节点确认了订阅。我们只需要一个终端节点得到确认，即将活动视为成功。

Amazon SNS 主题维护该主题的[订阅](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Topic.html#subscriptions-instance_method)列表，我们可以通过检查订阅的 ARN 是否被设置为 `PendingConfirmation` 以外的值来检查用户是否已确认特定订阅。

```
       # loop until we get some indication that a subscription was confirmed.
       subscription_confirmed = false
       while(!subscription_confirmed)
         topic.subscriptions.each do | sub |
           if subscription_data[sub.protocol.to_sym][:endpoint] == sub.endpoint
             # this is one of the endpoints we're interested in. Is it subscribed?
             if sub.arn != 'PendingConfirmation'
               subscription_data[sub.protocol.to_sym][:subscription_arn] = sub.arn
               puts "Topic subscription confirmed for (#{sub.protocol}: #{sub.endpoint})"
               @results = subscription_data.to_yaml
               return true
             else
               puts "Topic subscription still pending for (#{sub.protocol}: #{sub.endpoint})"
             end
           end
         end
```

如果获取订阅的 ARN，则将其保存在活动的结果数据中，将其转换为 YAML，然后从 `do_activity` 返回 true，这表示活动成功完成。

由于等待订阅确认可能需要一段时间，因此我们偶尔会调用`record_heartbeat`活动任务。这将向 Amazon SWF 发出信号，表明活动仍在处理中，还可用于提供有关活动进度的最新信息（如果您正在进行某些可报告进度的操作，如处理文件）。

```
         task.record_heartbeat!(
           { :details => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" })
         # sleep a bit.
         sleep(4.0)
       end
```

至此，我们的 `while` 循环结束。如果我们由于某种原因未成功即退出 while 循环，则我们将报告失败并结束 `do_activity` 方法。

```
       if (subscription_confirmed == false)
         @results = {
           :reason => "No subscriptions could be confirmed",
           :detail => "#{topic.num_subscriptions_confirmed} confirmed, #{topic.num_subscriptions_pending} pending" }.to_yaml
         return false
       end
     end
   end
```

至此，`wait_for_confirmation_activity` 的实现结束。我们还剩下一个活动要定义：`send_result_activity`。

## 定义 SendResultActivity
<a name="defining-sendresultactivity"></a>

如果工作流进行到这一步，说明我们已成功让用户订阅 Amazon SNS 主题，且用户已确认订阅。

我们的最后一个活动 `send_result_activity`，使用用户订阅的主题和用户确认订阅的终端节点，向用户发送成功主题订阅的确认。

新建一个名为 `send_result_activity.rb` 的文件，并像设置至今为止的所有活动那样设置它。

```
   require 'yaml'
   require_relative 'basic_activity.rb'

   # **SendResultActivity** sends the result of the activity to the screen, and, if
   # the user successfully registered using SNS, to the user using the SNS contact
   # information collected.
   class SendResultActivity < BasicActivity

     def initialize
       super('send_result_activity')
     end
```

我们的`do_activity`方法也以类似的方式开始，即从工作流程中获取输入数据，从 YAML 中进行转换，然后使用主题 ARN 创建[AWS::SNS::Topic](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Topic.html)实例。

```
     def do_activity(task)
       if task.input.nil?
         @results = { :reason => "Didn't receive any input!", :detail => "" }
         return false
       end

       input = YAML.load(task.input)

       # get the topic, so we publish a message to it.
       topic = AWS::SNS::Topic.new(input[:topic_arn])

       if topic.nil?
         @results = {
           :reason => "Couldn't get SWF topic",
           :detail => "Topic ARN: #{topic.arn}" }
         return false
       end
```

具有主题后，我们将向其[发布](https://docs.amazonaws.cn/AWSRubySDK/latest/AWS/SNS/Topic.html#publish-instance_method)一条消息（并且将其回显到屏幕上）。

```
       @results = "Thanks, you've successfully confirmed registration, and your workflow is complete!"

       # send the message via SNS, and also print it on the screen.
       topic.publish(@results)
       puts(@results)

       return true
     end
   end
```

发布到 Amazon SNS 主题会将您提供的消息发送到该主题已存在的*所有*订阅和确认的端点。因此，如果用户*同时* 通过电子邮件和手机短信进行确认，则用户将收到两条确认消息，每个终端节点一条。

## 后续步骤
<a name="implementing-activities-next-steps"></a>

这将完成 `send_result_activity` 实现。现在，将在处理活动任务的活动应用程序中将所有这些活动联系在一起，并可启动活动作为回应，如[订阅工作流程教程第 4 部分：实现活动任务轮询器](swf-sns-tutorial-implementing-activities-poller.md)所述。