Updated

lib/context_request_middleware/push_handler / rabbitmq_push_handler.rb

B
105 lines of codes
10 methods
5.2 complexity/method
4 churn
51.74 complexity
0 duplications
# frozen_string_literal: true require 'bunny' require 'connection_pool' require 'context_request_middleware/push_handler/base' module ContextRequestMiddleware module PushHandler # PushHandler that pusblishes the data given to a RabbitMQ exchange. # If the exchange is not existant it will be created. The session is # taken from the session_pool. class RabbitmqPushHandler < Base # :nodoc: class ConfirmationFailed < StandardError
  1. ContextRequestMiddleware::PushHandler::RabbitmqPushHandler::ConfirmationFailed has no descriptive comment
def initialize(channel, nacked, unconfirmed) super("Message confirmation on the exchange #{channel} has failed\ (#{nacked}/#{unconfirmed}).") end end # Setup the rublisher with configuring via the config options. The # following config options are supported: # @rabbit_mq_url url to connect to RabbitMQ # @pool_size size of the connection pool to be used. Defaults to 1 # @session_params a hash definiting the params passed to the session. # @heartbeat heartbeat interval used to communicate with # RabbitMQ. # @exchange_name name of the exchange defaults to 'fos.context_request' # @exchange_type type of the exchange defaults to '' # @exchange_options options passed to the exchange if it has to be # created. # rubocop:disable Metrics/MethodLength def initialize(**config) @config = config.dup @session_params = config.fetch(:session_params, {}) .merge(threaded: false, automatically_recover: false, heartbeat: config[:heartbeat]) pool_size = @session_params.delete(:session_pool) || 1 @session_params.freeze @session_pool = ConnectionPool.new(size: pool_size) do Bunny.new(config[:rabbit_mq_url], @session_params) end config_clean end # rubocop:enable Metrics/MethodLength # Publishes the given data on the exchange. The exchange is created if # it does not exist. # @data a hash representing the data to be published as json. # @options options to be passed to the publish to the exchange. def push(data, options)
  1. ContextRequestMiddleware::PushHandler::RabbitmqPushHandler#push has approx 8 statements
@session_pool.with do |session| session.start channel = session.create_channel channel.confirm_select exchange = fetch_exchange(session, channel) exchange.publish(data.to_json, **options) wait_for_confirms(channel) channel.close end end private def wait_for_confirms(channel) return if channel.wait_for_confirms
  1. ContextRequestMiddleware::PushHandler::RabbitmqPushHandler#wait_for_confirms refers to 'channel' more than self (maybe move it to another class?) Locations: 0 1 2
raise ConfirmationFailed.new(exchange_name, channel.nacked_set,
  1. ContextRequestMiddleware::PushHandler::RabbitmqPushHandler#wait_for_confirms refers to 'channel' more than self (maybe move it to another class?) Locations: 0 1 2
channel.unconfirmed_set)
  1. ContextRequestMiddleware::PushHandler::RabbitmqPushHandler#wait_for_confirms refers to 'channel' more than self (maybe move it to another class?) Locations: 0 1 2
end def config_clean @config.delete(:rabbit_mq_url) @config.delete(:session_params) @config.delete(:heartbeat) end # return the channel if a channel is already there otherwise create a new # exchange with the predefined settings. # Can be overwriten by ContextRequestMiddleware.fetch_exchange_callback def fetch_exchange(_session, channel) channel.exchanges[exchange_name] || bunny_exchange(channel) end def bunny_exchange(channel) Bunny::Exchange.new(channel, exchange_type, exchange_name, exchange_options) end def exchange_name @exchange_name ||= @config.fetch(:exchange_name, 'fos.context_request') end def exchange_type @config.fetch(:exchange_type, 'topic') end def exchange_options @config.fetch(:exchange_options, {}) end end end end