Class: Aws::SQS::QueuePoller

Inherits:
Object
  • Object
show all
Defined in:
gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb

Overview

A utility class for long polling messages in a loop. Messages are automatically deleted from the queue at the end of the given block.

poller = Aws::SQS::QueuePoller.new(queue_url)

poller.poll do |msg|
  puts msg.body
end

Long Polling

By default, messages are received using long polling. This method will force a default :wait_time_seconds of 20 seconds. If you prefer to use the queue default wait time, then pass a nil value for :wait_time_seconds.

# disables 20 second default, use queue ReceiveMessageWaitTimeSeconds
poller.poll(wait_time_seconds:nil) do |msg|
  # ...
end

When disabling :wait_time_seconds by passing nil, you must ensure the queue ReceiveMessageWaitTimeSeconds attribute is set to a non-zero value, or you will be short-polling. This will trigger significantly more API calls.

Batch Receiving Messages

You can specify a maximum number of messages to receive with each polling attempt via :max_number_of_messages. When this is set to a positive value, greater than 1, the block will receive an array of messages, instead of a single message.

# receives and yields 1 message at a time
poller.poll do |msg|
  # ...
end

# receives and yields up to 10 messages at a time
poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

The maximum value for :max_number_of_messages is enforced by Amazon SQS.

Visibility Timeouts

When receiving messages, you have a fixed amount of time to process and delete the message before it is added back into the queue. This is the visibility timeout. By default, the queue's VisibilityTimeout attribute is used. You can provide an alternative visibility timeout when polling.

# queue default VisibilityTimeout
poller.poll do |msg|
end

# custom visibility timeout
poller.poll(visibility_timeout:10) do |msg|
end

You can reset the visibility timeout of a single message by calling #change_message_visibility_timeout. This is useful when you need more time to finish processing the message.

poller.poll do |msg|

  # do work ...

  # need more time for processing
  poller.change_message_visibility_timeout(msg, 60)

  # finish work ...

end

If you change the visibility timeout of a message to zero, it will return to the queue immediately.

Deleting Messages

Messages are deleted from the queue when the block returns normally.

poller.poll do |msg|
  # do work
end # messages deleted here

You can skip message deletion by passing skip_delete: true. This allows you to manually delete the messages using #delete_message, or #delete_messages.

# single message
poller.poll(skip_delete: true) do |msg|
  poller.delete_message(msg) # if successful
end

# batch delete messages
poller.poll(skip_delete: true, max_number_of_messages:10) do |messages|
  poller.delete_messages(messages)
end

Another way to manage message deletion is to throw :skip_delete from the poll block. You can use this to choose when a message, or message batch is deleted on an individual basis. This can be very useful when you are capturing temporal errors and wish for the message to timeout.

poller.poll do |msg|
  begin
    # do work
  rescue
    # unexpected error occurred while processing messages,
    # log it, and skip delete so it can be re-processed later
    throw :skip_delete
  end
end

Terminating the Polling Loop

By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing :stop_polling from the #before_request callback.

:idle_timeout Option

This is a configurable, maximum number of seconds to wait for a new message before the polling loop exists. By default, there is no idle timeout.

# stops polling after a minute of no received messages
poller.poll(idle_timeout: 60) do |msg|
  # ...
end

Throw :stop_polling

If you want more fine grained control, you can configure a before request callback to trigger before each long poll. Throwing :stop_polling from this callback will cause the poller to exit normally without making the next request.

# stop after processing 100 messages
poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end

poller.poll do |msg|
  # do work ...
end

Tracking Progress

The poller will automatically track a few statistics client-side in a PollerStats object. You can access the poller stats three ways:

Here are examples of accessing the statistics.

  poller.before_request do |stats|
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
  end
  poller.after_empty_receive do |stats|
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
  end
  • Accept a 2nd argument in the poll block, for example:
  poller.poll do |msg, stats|
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
  end
  • Return value:
  stats = poller.poll(idle_timeout:10) do |msg|
    # do work ...
  end
  logger.info("requests: #{stats.request_count}")
  logger.info("messages: #{stats.received_message_count}")
  logger.info("last-timestamp: #{stats.last_message_received_at}")

Defined Under Namespace

Classes: PollerConfig, PollerStats

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url, options = {}) ⇒ QueuePoller

Returns a new instance of QueuePoller.

Parameters:

  • queue_url (String)
  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :client (Client)
  • :wait_time_seconds (Integer) — default: 20

    The long polling interval. Messages are yielded as soon as they are received. The :wait_time_seconds option specifies the max duration for each polling attempt before a new request is sent to receive messages.

  • :max_number_of_messages (Integer) — default: 1

    The maximum number of messages to yield from each polling attempt. Values can be from 1 to 10.

  • :visibility_timeout (Integer) — default: nil

    The number of seconds you have to process a message before it is put back into the queue and can be received again. By default, the queue's visibility timeout is not set.

  • :attribute_names (Array<String>) — default: []

    The list of attributes that need to be returned along with each message. Valid attribute names include:

    • All - All attributes.
    • ApproximateFirstReceiveTimestamp - The time when the message was first received from the queue (epoch time in milliseconds).
    • ApproximateReceiveCount - The number of times a message has been received from the queue but not deleted.
    • SenderId - The AWS account number (or the IP address, if anonymous access is allowed) of the sender.
    • SentTimestamp - The time when the message was sent to the queue (epoch time in milliseconds).
  • :message_attribute_names (Array<String>) — default: []

    A list of message attributes to receive. You can receive all messages by using All or .*. You can also use foo.* to return all message attributes starting with the foo prefix.

  • :idle_timeout (Integer) — default: nil

    Polling terminates gracefully when :idle_timeout seconds have passed without receiving any messages.

  • :skip_delete (Boolean) — default: false

    When true, messages are not deleted after polling block. If you wish to delete received messages, you will need to call #delete_message or #delete_messages manually.

  • :before_request (Proc) — default: nil

    Called before each polling attempt. This proc receives a single argument, an instance of PollerStats.



218
219
220
221
222
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 218

def initialize(queue_url, options = {})
  @queue_url = queue_url
  @client = options.delete(:client) || Client.new
  @default_config = PollerConfig.new(options)
end

Instance Attribute Details

#clientClient (readonly)

Returns:



228
229
230
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 228

def client
  @client
end

#default_configPollerConfig (readonly)

Returns:



231
232
233
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 231

def default_config
  @default_config
end

#queue_urlString (readonly)

Returns:

  • (String)


225
226
227
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 225

def queue_url
  @queue_url
end

Instance Method Details

#after_empty_receive {|stats| ... } ⇒ void

This method returns an undefined value.

Registers a callback that is invoked when the poll requests returns with no messages. This callback is invoked after the idle timeout is checked.

poller.after_empty_receive do |stats|
  # Handle empty receive
end

Yield Parameters:

  • stats (PollerStats)

    An object that tracks a few client-side statistics about the queue polling.



280
281
282
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 280

def after_empty_receive(&block)
  @default_config = @default_config.with(after_empty_receive: block) if block_given?
end

#before_request {|stats| ... } ⇒ void

This method returns an undefined value.

Registers a callback that is invoked once before every polling attempt.

poller.before_request do |stats|
  logger.info("requests: #{stats.request_count}")
  logger.info("messages: #{stats.received_message_count}")
  logger.info("last-timestamp: #{stats.last_message_received_at}")
end

poller.poll do |msg|
  # do work ...
end

:stop_polling

If you throw :stop_polling from the #before_request callback, then the poller will exit normally before making the next long poll request.

poller.before_request do |stats|
  throw :stop_polling if stats.received_messages >= 100
end

# at most 100 messages will be yielded
poller.poll do |msg|
  # do work ...
end

Yield Parameters:

  • stats (PollerStats)

    An object that tracks a few client-side statistics about the queue polling.



265
266
267
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 265

def before_request(&block)
  @default_config = @default_config.with(before_request: block) if block_given?
end

#change_message_visibility_timeout(message, seconds) ⇒ Object

Note:

This method should be called from inside a #poll block.

Parameters:

  • message (#receipt_handle)

    An object that responds to #receipt_handle.

  • seconds (Integer)


374
375
376
377
378
379
380
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 374

def change_message_visibility_timeout(message, seconds)
  @client.change_message_visibility(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle,
    visibility_timeout: seconds
  )
end

#delete_message(message) ⇒ Object

Note:

This method should be called from inside a #poll block.

Parameters:

  • message (#receipt_handle)

    An object that responds to #receipt_handle.



385
386
387
388
389
390
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 385

def delete_message(message)
  @client.delete_message(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle
  )
end

#delete_messages(messages) ⇒ Object

Note:

This method should be called from inside a #poll block.

Parameters:

  • messages (Array<#message_id, #receipt_handle>)

    An array of received messages. Each object must respond to #message_id and #receipt_handle.



396
397
398
399
400
401
402
403
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 396

def delete_messages(messages)
  @client.delete_message_batch(
    queue_url: @queue_url,
    entries: messages.map do |msg|
      { id: msg.message_id, receipt_handle: msg.receipt_handle }
    end
  )
end

#poll(options = {}, &block) ⇒ PollerStats

Polls the queue, yielded a message, or an array of messages. Messages are automatically deleted from the queue at the end of the given block. See the class documentation on Aws::SQS::QueuePoller for more examples.

Examples:

Basic example, loops indefinitely


poller.poll do |msg|
  # ...
end

Receives and deletes messages as a batch


poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :wait_time_seconds (Integer) — default: 20

    The long polling interval. Messages are yielded as soon as they are received. The :wait_time_seconds option specifies the max duration for each polling attempt before a new request is sent to receive messages.

  • :max_number_of_messages (Integer) — default: 1

    The maximum number of messages to yield from each polling attempt. Values can be from 1 to 10.

  • :visibility_timeout (Integer) — default: nil

    The number of seconds you have to process a message before it is put back into the queue and can be received again. By default, the queue's visibility timeout is not set.

  • :attribute_names (Array<String>) — default: []

    The list of attributes that need to be returned along with each message. Valid attribute names include:

    • All - All attributes.
    • ApproximateFirstReceiveTimestamp - The time when the message was first received from the queue (epoch time in milliseconds).
    • ApproximateReceiveCount - The number of times a message has been received from the queue but not deleted.
    • SenderId - The AWS account number (or the IP address, if anonymous access is allowed) of the sender.
    • SentTimestamp - The time when the message was sent to the queue (epoch time in milliseconds).
  • :message_attribute_names (Array<String>) — default: []

    A list of message attributes to receive. You can receive all messages by using All or .*. You can also use foo.* to return all message attributes starting with the foo prefix.

  • :idle_timeout (Integer) — default: nil

    Polling terminates gracefully when :idle_timeout seconds have passed without receiving any messages.

  • :skip_delete (Boolean) — default: false

    When true, messages are not deleted after polling block. If you wish to delete received messages, you will need to call #delete_message or #delete_messages manually.

  • :before_request (Proc) — default: nil

    Called before each polling attempt. This proc receives a single argument, an instance of PollerStats.

Returns:



352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb', line 352

def poll(options = {}, &block)
  config = @default_config.with(options)
  stats = PollerStats.new
  catch(:stop_polling) do
    loop do
      messages = get_messages(config, stats)
      if messages.empty?
        check_idle_timeout(config, stats)
        config.after_empty_receive&.call(stats)
      else
        process_messages(config, stats, messages, &block)
      end
    end
  end
  stats.polling_stopped_at = Time.now
  stats
end