Example Activity Worker in Ruby - Amazon Step Functions
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Example Activity Worker in Ruby

The following is an example activity worker that uses the Amazon SDK for Ruby to show you how to use best practices and implement your own activity worker.

The code implements a consumer-producer pattern with a configurable number of threads for pollers and activity workers. The poller threads are constantly long polling the activity task. Once an activity task is retrieved, it's passed through a bounded blocking queue for the activity thread to pick it up.

The following Ruby code is the main entry point for this example Ruby activity worker.

require_relative '../lib/step_functions/activity' credentials = Aws::SharedCredentials.new region = 'us-west-2' activity_arn = 'ACTIVITY_ARN' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # The start method takes as argument the block that is the actual logic of your custom activity. activity.start do |input| { result: :SUCCESS, echo: input['value'] } end

The code includes defaults you can change to reference your activity, and to adapt it to your specific implementation. This code takes as input the actual implementation logic, allows you to reference your specific activity and credentials, and enables you to configure the number of threads and heartbeat delay. For more information and to download the code, see Step Functions Ruby Activity Worker.

Item Description

require_relative

Relative path to the following example activity worker code.

region

Amazon Region of your activity.

workers_count

The number of threads for your activity worker. For most implementations, between 10 and 20 threads should be sufficient. The longer the activity takes to process, the more threads it might need. As an estimate, multiply the number of process activities per second by the 99th percentile activity processing latency, in seconds.

pollers_count

The number of threads for your pollers. Between 10 and 20 threads should be sufficient for most implementations.

heartbeat_delay

The delay in seconds between heartbeats.

input Implementation logic of your activity.

The following is the Ruby activity worker, referenced with ../lib/step_functions/activity in your code.

require 'set' require 'json' require 'thread' require 'logger' require 'aws-sdk' module Validate def self.positive(value) raise ArgumentError, 'Argument has to be positive' if value <= 0 value end def self.required(value) raise ArgumentError, 'Argument is required' if value.nil? value end end module StepFunctions class RetryError < StandardError def initialize(message) super(message) end end def self.with_retries(options = {}, &block) retries = 0 base_delay_seconds = options[:base_delay_seconds] || 2 max_retries = options[:max_retries] || 3 begin block.call rescue => e puts e if retries < max_retries retries += 1 sleep base_delay_seconds**retries retry end raise RetryError, 'All retries of operation had failed' end end class Activity def initialize(options = {}) @states = Aws::States::Client.new( credentials: Validate.required(options[:credentials]), region: Validate.required(options[:region]), http_read_timeout: Validate.positive(options[:http_read_timeout] || 60) ) @activity_arn = Validate.required(options[:activity_arn]) @heartbeat_delay = Validate.positive(options[:heartbeat_delay] || 60) @queue_max = Validate.positive(options[:queue_max] || 5) @pollers_count = Validate.positive(options[:pollers_count] || 1) @workers_count = Validate.positive(options[:workers_count] || 1) @max_retry = Validate.positive(options[:workers_count] || 3) @logger = Logger.new(STDOUT) end def start(&block) @sink = SizedQueue.new(@queue_max) @activities = Set.new start_heartbeat_worker(@activities) start_workers(@activities, block, @sink) start_pollers(@activities, @sink) wait end def queue_size return 0 if @sink.nil? @sink.size end def activities_count return 0 if @activities.nil? @activities.size end private def start_pollers(activities, sink) @pollers = Array.new(@pollers_count) do PollerWorker.new( states: @states, activity_arn: @activity_arn, sink: sink, activities: activities, max_retry: @max_retry ) end @pollers.each(&:start) end def start_workers(activities, block, sink) @workers = Array.new(@workers_count) do ActivityWorker.new( states: @states, block: block, sink: sink, activities: activities, max_retry: @max_retry ) end @workers.each(&:start) end def start_heartbeat_worker(activities) @heartbeat_worker = HeartbeatWorker.new( states: @states, activities: activities, heartbeat_delay: @heartbeat_delay, max_retry: @max_retry ) @heartbeat_worker.start end def wait sleep rescue Interrupt shutdown ensure Thread.current.exit end def shutdown stop_workers(@pollers) wait_workers(@pollers) wait_activities_drained stop_workers(@workers) wait_activities_completed shutdown_workers(@workers) shutdown_worker(@heartbeat_worker) end def shutdown_workers(workers) workers.each do |worker| shutdown_worker(worker) end end def shutdown_worker(worker) worker.kill end def wait_workers(workers) workers.each(&:wait) end def wait_activities_drained wait_condition { @sink.empty? } end def wait_activities_completed wait_condition { @activities.empty? } end def wait_condition(&block) loop do break if block.call sleep(1) end end def stop_workers(workers) workers.each(&:stop) end class Worker def initialize @logger = Logger.new(STDOUT) @running = false end def run raise 'Method run hasn\'t been implemented' end def process loop do begin break unless @running run rescue => e puts e @logger.error('Unexpected error has occurred') @logger.error(e) end end end def start return unless @thread.nil? @running = true @thread = Thread.new do process end end def stop @running = false end def kill return if @thread.nil? @thread.kill @thread = nil end def wait @thread.join end end class PollerWorker < Worker def initialize(options = {}) @states = options[:states] @activity_arn = options[:activity_arn] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = StepFunctions.with_retries(max_retry: @max_retry) do begin @states.get_activity_task(activity_arn: @activity_arn) rescue => e @logger.error('Failed to retrieve activity task') @logger.error(e) end end return if activity_task.nil? || activity_task.task_token.nil? @activities.add(activity_task.task_token) @sink.push(activity_task) end end class ActivityWorker < Worker def initialize(options = {}) @states = options[:states] @block = options[:block] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = @sink.pop result = @block.call(JSON.parse(activity_task.input)) send_task_success(activity_task, result) rescue => e send_task_failure(activity_task, e) ensure @activities.delete(activity_task.task_token) unless activity_task.nil? end def send_task_success(activity_task, result) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_success( task_token: activity_task.task_token, output: JSON.dump(result) ) rescue => e @logger.error('Failed to send task success') @logger.error(e) end end end def send_task_failure(activity_task, error) StepFunctions.with_retries do begin @states.send_task_failure( task_token: activity_task.task_token, cause: error.message ) rescue => e @logger.error('Failed to send task failure') @logger.error(e) end end end end class HeartbeatWorker < Worker def initialize(options = {}) @states = options[:states] @activities = options[:activities] @heartbeat_delay = options[:heartbeat_delay] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run sleep(@heartbeat_delay) @activities.each do |token| send_heartbeat(token) end end def send_heartbeat(token) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_heartbeat(token) rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end end end