Updated

lib/context_request_middleware/push_handler / rabbitmq_push_handler_async.rb

A
61 lines of codes
6 methods
3.6 complexity/method
2 churn
21.33 complexity
0 duplications
# frozen_string_literal: true require 'rabbitmq_client' require 'context_request_middleware/push_handler/base' module ContextRequestMiddleware module PushHandler # PushHandler that publishes the data given to a RabbitMQ exchange. # If the exchange is not existent it will be created. The session is # taken from the session_pool. class RabbitmqPushHandlerAsync < Base # Setup the publisher with configuring via the config options. The # following config options are supported: # @rabbitmq_url url to connect to RabbitMQ # @pool_size size of the connection pool to be used. Defaults to 1 # @session_params a hash defining the params passed to the session. # @heartbeat_publisher heartbeat interval used to communicate with # RabbitMQ. # @exchange_name name of the exchange defaults to 'fos.context_request' # @exchange_type type of the exchange defaults to '' # @exchange_options options passed to the exchange if it has to be # created. def initialize(**config) @config = config.dup exchange = RabbitmqClient::ExchangeRegistry.new exchange.add(exchange_name, exchange_type, exchange_options) @config[:exchange_registry] = exchange @publisher = RabbitmqClient::Publisher.new(@config) config_clean end # Publishes the given data on the exchange. # @data a hash representing the data to be published. # @options options to be passed to the publish to the exchange. def push(data, options) @publisher.publish(data, options.merge(exchange_name: exchange_name)) end private def config_clean @config.delete(:rabbitmq_url) @config.delete(:session_params) @config.delete(:heartbeat) end def exchange_name @exchange_name ||= @config.fetch(:exchange_name, 'fos.context_request') end def exchange_type @config.fetch(:exchange_type, 'topic') end def exchange_options @config.fetch(:exchange_options, {}) end end end end