-
# frozen_string_literal: true
-
-
1
require 'bunny'
-
-
1
module ContextRequestSubscriber
  # The subscriber to attach to the exchange to process the
  # ContextRequest data.
  #
  # ErrorHandler is a class passed through the +on_error+ option. It is
  # instantiated once with the configured logger and the resulting object's
  # +call+ method is invoked for any StandardError raised while a message
  # is processed.
  # The call method gets passed the following information:
  #   @exception the exception caught
  #   @delivery_info the delivery_info object
  #   @properties properties passed in the message (headers, ..)
  #   @payload the payload of the message
  # The ErrorHandler has to decide what to do with the message. If none
  # is given the error will be ignored!
  class RabbitMQSubscriber
    DEFAULT_QUEUE_OPTS = {
      exclusive: false, durable: true
    }.freeze

    DEFAULT_SESSION_PARAMS = {
      threaded: true, automatically_recover: true
    }.freeze

    # @config the configurables supported as follows
    #   logger: logger handed to the error handler and to the
    #           Processor::Executor.
    #   handler_params: forwarded (as a hash with that single key) to the
    #           Processor::Executor.
    #   session_params: set of session params. They are merged on top of
    #           DEFAULT_SESSION_PARAMS, so given values win.
    #   url: url to connect to the RabbitMQ Server
    #   heartbeat: NOTE(review): listed historically but not read by this
    #           class; presumably it has to be passed inside
    #           session_params - confirm.
    #   exchange_name: the exchange connected to the queue. Defaults to
    #           ContextRequestSubscriber.exchange_name.
    #   exchange_options: the options used to create the exchange if not
    #           existing.
    #   exchange_type: type of the exchange, defaults to topic.
    #   queue_name: the queue where the data is received from.
    #   queue_durable: if the queue is durable. Default: true
    #   queue_auto_delete: if the queue gets autodeleted.
    #   queue_exclusive: default false.
    #   routing_key: the routing_key used. Default nil.
    #   subscriber_join: if #run joins the channel work pool after
    #           subscribing. Default: true
    #   on_error: class whose instances handle errors raised during
    #           processing the message (see the class documentation).
    def initialize(**config)
      # The handler is optional: without one errors are silently ignored,
      # so a missing :on_error must not blow up the constructor.
      @error_handler = config[:on_error]&.new(config[:logger])
      @logger = config[:logger]
      @executor = Processor::Executor.new(config[:logger],
                                          config.slice(:handler_params))
      config_connection(config)

      config_exchange(config)
      config_queue(config)
    end

    # Sets up exchange and queue, subscribes to the queue without blocking
    # and finally hands over to #join.
    # Returns whatever #join returns.
    def run
      exchange, queue = setup_queue
      @consumer = queue.subscribe(manual_ack: true,
                                  block: false) do |info, properties, payload|
        consume(info, properties, payload)
      end
      join(exchange)
    end

    # Joins the work pool of the exchange's channel unless joining was
    # disabled with subscriber_join: false.
    def join(exchange)
      @join_workpool && exchange.channel.work_pool.join
    end

    private

    def config_connection(config)
      # Defaults first so that explicitly configured session params win,
      # in the same way queue_opts treats DEFAULT_QUEUE_OPTS.
      @session_params =
        DEFAULT_SESSION_PARAMS.merge(config[:session_params] || {})
      @url = config[:url]
    end

    def config_queue(config)
      @queue_name = config[:queue_name]
      @queue_bindings = config.slice(:routing_key)
      @queue_opts = queue_opts(config)
    end

    def config_exchange(config)
      @exchange_type = config.fetch(:exchange_type, 'topic')
      # Block form: the module level default is only looked up when no
      # exchange name was configured.
      @exchange_name = config.fetch(:exchange_name) do
        ContextRequestSubscriber.exchange_name
      end
      @exchange_opts = config.fetch(:exchange_options, {})
      @join_workpool = config.fetch(:subscriber_join, true)
    end

    # setup the whole message queue settings and return the exchange and
    # the queue object as [exchange, queue].
    # Can be overridden by setting ContextRequestSubscriber.fetch_queue_callback:
    # the callback is called with this subscriber and has to return
    # [exchange, queue] itself.
    def setup_queue
      callback = ContextRequestSubscriber.fetch_queue_callback
      if callback
        exchange, queue = callback.call(self)
      else
        channel = create_channel(@session_params)
        exchange = fetch_exchange(channel, @exchange_type, @exchange_name,
                                  @exchange_opts)
        queue = bind_queue(channel, exchange)
      end
      # Remember the channel so that consume can hand it to the executor
      # for acking. For callback provided exchanges it is taken from the
      # exchange when available.
      channel ||= exchange.channel if exchange.respond_to?(:channel)
      @channel = channel
      [exchange, queue]
    end

    # Default behaviour is that all messages are consumed by just
    # passing the information to the processor. After a successful run the
    # executor is asked to ack on the subscriber's channel. If any part
    # raises a StandardError the ack is skipped and the error handler (if
    # any) is called, so the application needs to handle that gracefully.
    # To change that behaviour overwrite the fetch_queue_callback
    # and extend the Processor to handle the ack method.
    def consume(info, properties, payload)
      @executor.run(info, properties, payload)
      @executor.ack(@channel)
    rescue StandardError => e
      handle_error(e, info, properties, payload)
    end

    # Opens a connection to @url with the given session options, starts it
    # and returns a new channel on it.
    def create_channel(options = {})
      connection = Bunny.new(@url, options)
      connection.start

      connection.create_channel
    end

    # return the exchange Bunny object and do the whole setup around it.
    # An exchange already known to the channel is reused.
    def fetch_exchange(channel, exchange_type, exchange_name, exchange_opts)
      channel.exchanges[exchange_name] ||
        bunny_exchange(channel, exchange_type, exchange_name, exchange_opts)
    end

    # Declares the configured queue on the channel, binds it to the
    # exchange using the configured bindings and returns the queue.
    def bind_queue(channel, exchange)
      queue = channel.queue(@queue_name, @queue_opts)
      queue.bind(exchange, @queue_bindings)
      queue
    end

    # Delegates to the configured error handler; a no-op without one.
    def handle_error(error, info, properties, payload)
      @error_handler&.call(error, info, properties, payload)
    end

    def bunny_exchange(channel, type, name, opts)
      Bunny::Exchange.new(channel, type, name, opts)
    end

    # Picks the queue_* settings out of the config, strips the queue_
    # prefix from their keys and merges them on top of DEFAULT_QUEUE_OPTS.
    def queue_opts(config)
      opts = config.slice(:queue_durable, :queue_auto_delete, :queue_exclusive)
      renamed = opts.transform_keys do |key|
        key.to_s.delete_prefix('queue_').to_sym
      end
      DEFAULT_QUEUE_OPTS.merge(renamed)
    end
  end
end