
Class: Aws::Rails::SqsActiveJob::Executor

Inherits:
Object
Defined in:
lib/aws/rails/sqs_active_job/executor.rb

Overview

CLI runner for polling for SQS ActiveJobs

Constant Summary

DEFAULTS =
{
   min_threads:     0,
   max_threads:     Concurrent.processor_count,
   auto_terminate:  true,
   idletime:        60, # 1 minute
   fallback_policy: :caller_runs # slow down the producer thread
}.freeze

Instance Method Summary

#execute(message) ⇒ Object
#initialize(options = {}) ⇒ Executor
#shutdown(timeout = nil) ⇒ Object

Constructor Details

#initialize(options = {}) ⇒ Executor

Returns a new instance of Executor.



# File 'lib/aws/rails/sqs_active_job/executor.rb', line 19

def initialize(options = {})
  @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
  @logger = options[:logger] || ActiveSupport::Logger.new(STDOUT)
end
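
As the constructor shows, caller-supplied options are merged over DEFAULTS, so any key passed in replaces the default and every other default is kept. The following is a minimal sketch of that merge using plain hashes. It assumes `Etc.nprocessors` as a stand-in for `Concurrent.processor_count` so that it runs without concurrent-ruby, and the option values are purely illustrative.

```ruby
require 'etc'

# Stand-in copy of DEFAULTS. Etc.nprocessors replaces
# Concurrent.processor_count (an assumption, so that the sketch
# needs only the standard library).
DEFAULTS = {
  min_threads:     0,
  max_threads:     Etc.nprocessors,
  auto_terminate:  true,
  idletime:        60, # 1 minute
  fallback_policy: :caller_runs
}.freeze

# Hypothetical caller options: cap the pool at 4 threads and
# release idle threads after 30 seconds.
options = { max_threads: 4, idletime: 30 }

# Same expression the constructor hands to Concurrent::ThreadPoolExecutor.new.
pool_options = DEFAULTS.merge(options)

puts pool_options[:max_threads]     # 4
puts pool_options[:idletime]        # 30
puts pool_options[:fallback_policy] # caller_runs
```

Because `merge` returns a new hash, the frozen DEFAULTS constant is never modified.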

Instance Method Details

#execute(message) ⇒ Object

TODO: Consider catching the exception and sleeping instead of using :caller_runs



# File 'lib/aws/rails/sqs_active_job/executor.rb', line 25

def execute(message)
  @executor.post(message) do |message|
    begin
      job = JobRunner.new(message)
      @logger.info("Running job: #{job.id}[#{job.class_name}]")
      job.run
      message.delete
    rescue Aws::Json::ParseError => e
      @logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}."
    rescue StandardError => e
      # message will not be deleted and will be retried
      job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
      @logger.info "Error processing job #{job_msg}: #{e}"
      @logger.debug e.backtrace.join("\n")
    end
  end
end
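
The listing above deletes the message only after the job has run without raising; on a StandardError the message stays on the queue and is retried. The following sketch isolates that control flow without the thread pool or JobRunner. `FakeMessage` and `process` are hypothetical stand-ins introduced for illustration and are not part of the library.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for an SQS message; it only records
# whether delete was called.
class FakeMessage
  attr_reader :body

  def initialize(body)
    @body = body
    @deleted = false
  end

  def delete
    @deleted = true
  end

  def deleted?
    @deleted
  end
end

# Mirrors the rescue structure of #execute: delete on success,
# log and keep the message on failure.
def process(message, logger)
  yield message   # stands in for JobRunner.new(message).run
  message.delete  # reached only when the job did not raise
rescue StandardError => e
  # The message is not deleted, so it can be retried later.
  logger.info "Error processing job: #{e}"
end

log = StringIO.new
logger = Logger.new(log)

ok = FakeMessage.new('ok')
process(ok, logger) { |m| m.body.upcase }

failed = FakeMessage.new('boom')
process(failed, logger) { |_m| raise ArgumentError, 'bad input' }

puts ok.deleted?     # true
puts failed.deleted? # false
```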

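#shutdown(timeout = nil) ⇒ Object

Stops accepting new jobs, then waits up to timeout seconds (indefinitely when timeout is nil) for queued and running jobs to finish.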



# File 'lib/aws/rails/sqs_active_job/executor.rb', line 43

def shutdown(timeout=nil)
  @executor.shutdown
  clean_shutdown = @executor.wait_for_termination(timeout)
  if clean_shutdown
    @logger.info 'Clean shutdown complete.  All executing jobs finished.'
  else
    @logger.info "Timeout (#{timeout}) exceeded.  Some jobs may not have"\
      " finished cleanly.  Unfinished jobs will not be removed from"\
      " the queue and can be re-run once their visibility timeout"\
      " passes."
  end
end
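
The branch in #shutdown depends on wait_for_termination returning true when all work finished within the timeout and false otherwise. The following is a standard-library analogue of that timed wait, built on Thread#join with a deadline. It is not concurrent-ruby's implementation, and `wait_for_all` is a hypothetical helper used only to illustrate the true/false outcome.

```ruby
# Hypothetical helper: wait up to `timeout` seconds for all threads.
# Returns true if every thread finished in time, false otherwise.
# A nil timeout waits indefinitely.
def wait_for_all(threads, timeout = nil)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && now.call + timeout

  threads.all? do |thread|
    if deadline
      remaining = [deadline - now.call, 0].max
      thread.join(remaining) # nil when the wait timed out
    else
      thread.join
    end
  end
end

fast = [Thread.new { sleep 0.01 }]
slow = [Thread.new { sleep 5 }]

clean   = wait_for_all(fast, 1)    # work finishes well inside the timeout
unclean = wait_for_all(slow, 0.05) # timeout expires first

slow.each(&:kill) # tidy up the demo thread

puts clean   # true
puts unclean # false
```

In the real executor, a false result means some jobs were still running; their messages were never deleted, so SQS makes them visible again once the visibility timeout passes.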