AWS Step Functions
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

使用 Ruby 编写的示例活动工作线程

下面是使用适用于 Ruby 的 AWS 开发工具包的示例活动工作线程,介绍如何使用最佳实践实现您自己的活动工作线程。

这段代码实现了消费者/生产者模型,并且允许对轮询器和活动工作线程的线程数进行配置。轮询器线程不断长轮询活动任务。一旦检索到活动任务,则将其传入一个有界的阻塞队列,供活动线程领取任务。

以下 Ruby 代码是此示例 Ruby 活动工作线程的主入口点。

require_relative '../lib/step_functions/activity' credentials = Aws::SharedCredentials.new region = 'us-west-2' activity_arn = 'ACTIVITY_ARN' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # The start method takes as argument the block that is the actual logic of your custom activity. activity.start do |input| { result: :SUCCESS, echo: input['value'] }

这段代码包含默认设置,您可以更改为引用您的活动并使其适应您的特定实现。这段代码将实际实现逻辑作为输入,允许您引用特定的活动和凭证,并允许您配置线程数量和检测信号延迟。要获取更多信息和下载代码,请参阅 Step Functions Ruby 活动工作线程

项目 描述

require_relative

下面示例活动工作线程代码的相对路径。

region

活动的 AWS 区域。

workers_count

活动工作线程的线程数。对于大多数实现来说,10 到 20 个线程就足够了。活动的处理时间越长,可能需要的线程就越多。您可以使用以下方式进行估算:将每秒处理活动数乘以第 99 个百分点的活动处理延迟 (以秒为单位)。

pollers_count

轮询器的线程数。对于大多数实现来说,10 到 20 个线程就足够了。

heartbeat_delay

检测信号间的延迟 (以秒为单位)。

input 活动的实现逻辑。

下面是 Ruby 活动工作线程,在代码中由 ../lib/step_functions/activity 引用。

require 'set' require 'json' require 'thread' require 'logger' require 'aws-sdk' module Validate def self.positive(value) raise ArgumentError, 'Argument has to be positive' if value <= 0 value end def self.required(value) raise ArgumentError, 'Argument is required' if value.nil? value end end module StepFunctions class RetryError < StandardError def initialize(message) super(message) end end def self.with_retries(options = {}, &block) retries = 0 base_delay_seconds = options[:base_delay_seconds] || 2 max_retries = options[:max_retries] || 3 begin block.call rescue => e puts e if retries < max_retries retries += 1 sleep base_delay_seconds**retries retry end raise RetryError, 'All retries of operation had failed' end end class Activity def initialize(options = {}) @states = Aws::States::Client.new( credentials: Validate.required(options[:credentials]), region: Validate.required(options[:region]), http_read_timeout: Validate.positive(options[:http_read_timeout] || 60) ) @activity_arn = Validate.required(options[:activity_arn]) @heartbeat_delay = Validate.positive(options[:heartbeat_delay] || 60) @queue_max = Validate.positive(options[:queue_max] || 5) @pollers_count = Validate.positive(options[:pollers_count] || 1) @workers_count = Validate.positive(options[:workers_count] || 1) @max_retry = Validate.positive(options[:workers_count] || 3) @logger = Logger.new(STDOUT) end def start(&block) @sink = SizedQueue.new(@queue_max) @activities = Set.new start_heartbeat_worker(@activities) start_workers(@activities, block, @sink) start_pollers(@activities, @sink) wait end def queue_size return 0 if @sink.nil? @sink.size end def activities_count return 0 if @activities.nil? @activities.size end private def start_pollers(activities, sink) @pollers = Array.new(@pollers_count) do PollerWorker.new( states: @states, activity_arn: @activity_arn, sink: sink, activities: activities, max_retry: @max_retry ) end @pollers.each(&:start) end def start_workers(activities, block, sink) @workers = Array.new(@workers_count) do ActivityWorker.new( states: @states, block: block, sink: sink, activities: activities, max_retry: @max_retry ) end @workers.each(&:start) end def start_heartbeat_worker(activities) @heartbeat_worker = HeartbeatWorker.new( states: @states, activities: activities, heartbeat_delay: @heartbeat_delay, max_retry: @max_retry ) @heartbeat_worker.start end def wait sleep rescue Interrupt shutdown ensure Thread.current.exit end def shutdown stop_workers(@pollers) wait_workers(@pollers) wait_activities_drained stop_workers(@workers) wait_activities_completed shutdown_workers(@workers) shutdown_worker(@heartbeat_worker) end def shutdown_workers(workers) workers.each do |worker| shutdown_worker(worker) end end def shutdown_worker(worker) worker.kill end def wait_workers(workers) workers.each(&:wait) end def wait_activities_drained wait_condition { @sink.empty? } end def wait_activities_completed wait_condition { @activities.empty? } end def wait_condition(&block) loop do break if block.call sleep(1) end end def stop_workers(workers) workers.each(&:stop) end class Worker def initialize @logger = Logger.new(STDOUT) @running = false end def run raise 'Method run hasn\'t been implemented' end def process loop do begin break unless @running run rescue => e puts e @logger.error('Unexpected error has occurred') @logger.error(e) end end end def start return unless @thread.nil? @running = true @thread = Thread.new do process end end def stop @running = false end def kill return if @thread.nil? @thread.kill @thread = nil end def wait @thread.join end end class PollerWorker < Worker def initialize(options = {}) @states = options[:states] @activity_arn = options[:activity_arn] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = StepFunctions.with_retries(max_retry: @max_retry) do begin @states.get_activity_task(activity_arn: @activity_arn) rescue => e @logger.error('Failed to retrieve activity task') @logger.error(e) end end return if activity_task.nil? || activity_task.task_token.nil? @activities.add(activity_task.task_token) @sink.push(activity_task) end end class ActivityWorker < Worker def initialize(options = {}) @states = options[:states] @block = options[:block] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = @sink.pop result = @block.call(JSON.parse(activity_task.input)) send_task_success(activity_task, result) rescue => e send_task_failure(activity_task, e) ensure @activities.delete(activity_task.task_token) unless activity_task.nil? end def send_task_success(activity_task, result) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_success( task_token: activity_task.task_token, output: JSON.dump(result) ) rescue => e @logger.error('Failed to send task success') @logger.error(e) end end end def send_task_failure(activity_task, error) StepFunctions.with_retries do begin @states.send_task_failure( task_token: activity_task.task_token, cause: error.message ) rescue => e @logger.error('Failed to send task failure') @logger.error(e) end end end end class HeartbeatWorker < Worker def initialize(options = {}) @states = options[:states] @activities = options[:activities] @heartbeat_delay = options[:heartbeat_delay] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run sleep(@heartbeat_delay) @activities.each do |token| send_heartbeat(token) end end def send_heartbeat(token) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_heartbeat(token) rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end end end