loading
Generated 2020-03-13T07:07:50+00:00

All Files ( 100.0% covered at 2.61 hits/line )

9 files in total.
171 relevant lines, 171 lines covered and 0 lines missed. ( 100.0% )
File % covered Lines Relevant Lines Lines covered Lines missed Avg. Hits / Line
lib/context_request_subscriber.rb 100.00 % 79 33 33 0 1.12
lib/context_request_subscriber/error_handler.rb 100.00 % 26 12 12 0 2.00
lib/context_request_subscriber/handler.rb 100.00 % 7 2 2 0 1.00
lib/context_request_subscriber/handler/json_api_handler.rb 100.00 % 30 14 14 0 1.21
lib/context_request_subscriber/processor.rb 100.00 % 59 28 28 0 3.75
lib/context_request_subscriber/processor/base.rb 100.00 % 28 13 13 0 3.77
lib/context_request_subscriber/processor/context.rb 100.00 % 8 3 3 0 1.00
lib/context_request_subscriber/processor/request.rb 100.00 % 8 3 3 0 1.00
lib/context_request_subscriber/rabbitmq_subscriber.rb 100.00 % 152 63 63 0 3.27

lib/context_request_subscriber.rb

100.0% lines covered

33 relevant lines. 33 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'json'
  3. 1 require 'active_support'
  4. 1 require 'active_support/inflector'
  5. 1 require 'context_request_subscriber/handler'
  6. 1 require 'context_request_subscriber/processor'
  7. 1 require 'context_request_subscriber/error_handler'
  8. 1 require 'context_request_subscriber/rabbitmq_subscriber'
  9. # Base module for the context request subscriber.
  10. # This module provides all the configuration options for the
  11. # subscriber and handler logic of the request context.
  12. 1 module ContextRequestSubscriber
  13. 1 include ActiveSupport::Configurable
  14. 1 config_accessor(:logger, instance_accessor: false) do
  15. 1 ActiveSupport::TaggedLogging.new(Logger.new(STDOUT))
  16. end
  17. 1 config_accessor(:queue_name, instance_accessor: false) do
  18. 1 'fos.context_request'
  19. end
  20. 1 config_accessor(:exchange_name, instance_accessor: false) do
  21. 1 'fos.context_request'
  22. end
  23. # queue_durable: if the queue is durable. Default: true
  24. 1 config_accessor(:queue_durable, instance_accessor: false)
  25. # queue_auto_delete: if the queue gets autodeleted.
  26. 1 config_accessor(:queue_auto_delete, instance_accessor: false)
  27. # queue_exclusive: default false.
  28. 1 config_accessor(:queue_exclusive, instance_accessor: false)
  29. # Callable that is called instead of the default setup logic. It is passed
  30. # the subscriber and returns the exchange and the queue to be used.
  31. 1 config_accessor(:fetch_queue_callback, instance_accessor: false)
  32. # routing_key: the routing_key used. Default: '#'.
  33. 2 config_accessor(:routing_key, instance_accessor: false) { '#' }
  34. # Hash mapping the lower-cased message type (as a symbol) to the handler
  35. # class that is instantiated and called in order to handle the message.
  36. # Defaults to JsonApiHandler for request and context types.
  37. 1 config_accessor(:handlers, instance_accessor: false) do
  38. 1 {
  39. context: ContextRequestSubscriber::Handler::JsonApiHandler::Context,
  40. request: ContextRequestSubscriber::Handler::JsonApiHandler::Request
  41. }
  42. end
  43. # Handler URL is the url to reach out for handling.
  44. 1 config_accessor(:handler_url, instance_accessor: false)
  45. # Set of session parameters
  46. 2 config_accessor(:session_params, instance_accessor: false) { {} }
  47. # url to connect to the RabbitMQ Server
  48. 1 config_accessor(:url, instance_accessor: false)
  49. # heartbeat: heartbeat for the connection. Defaults to nil.
  50. 1 config_accessor(:heartbeat, instance_accessor: false)
  51. # on_error: callable object that handles errors during processing the
  52. # message.
  53. 1 config_accessor(:on_error, instance_accessor: false) do
  54. 1 ErrorHandler::LogErrorHandler
  55. end
  56. # Instruct the subscriber to keep the subscriber alive even if the
  57. # queue is empty.
  58. # Default: false
  59. 2 config_accessor(:subscriber_keep_alive, instance_accessor: false) { false }
  60. 2 config_accessor(:handler_params, instance_accessor: false) { {} }
  61. 1 def self.run
  62. 1 subscriber = RabbitMQSubscriber.new(**config)
  63. 1 subscriber.run
  64. end
  65. end

lib/context_request_subscriber/error_handler.rb

100.0% lines covered

12 relevant lines. 12 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module ContextRequestSubscriber
  3. 1 module ErrorHandler
  4. # :nodoc:
  5. 1 class LogErrorHandler
  6. 1 def initialize(logger = nil)
  7. 7 @logger = (logger || ContextRequestSubscriber.logger)
  8. end
  9. 1 def call(error, _info, _properties, _payload)
  10. 3 @logger.error("Error recieved during processing the message. \
  11. Error #{error}.")
  12. 3 @logger.error(error.backtrace.join("\n")) if error.backtrace
  13. end
  14. end
  15. # :nodoc:
  16. 1 class LogAndRaiseErrorHandler < LogErrorHandler
  17. 1 def call(error, info, properties, payload)
  18. 2 super(error, info, properties, payload)
  19. 2 raise error
  20. end
  21. end
  22. end
  23. end

lib/context_request_subscriber/handler.rb

100.0% lines covered

2 relevant lines. 2 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'context_request_subscriber/handler/json_api_handler'
  3. # :nodoc:
  4. 1 module Handler
  5. end

lib/context_request_subscriber/handler/json_api_handler.rb

100.0% lines covered

14 relevant lines. 14 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'json_api_client'
  3. 1 module ContextRequestSubscriber
  4. 1 module Handler
  5. 1 module JsonApiHandler
  6. # :nodoc:
  7. 1 class Base < JsonApiClient::Resource
  8. 1 def initialize(params, **keys)
  9. 2 @headers = keys.fetch(:handler_headers, {})
  10. 2 self.class.site = keys.fetch(:site, nil)
  11. 2 super(params)
  12. end
  13. 1 def call
  14. 1 self.class.with_headers(@headers) do
  15. 1 save
  16. end
  17. end
  18. end
  19. # :nodoc:
  20. 1 class Context < Base; end
  21. # :nodoc:
  22. 1 class Request < Base; end
  23. end
  24. end
  25. end

lib/context_request_subscriber/processor.rb

100.0% lines covered

28 relevant lines. 28 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module ContextRequestSubscriber
  3. # :nodoc:
  4. 1 module Processor
  5. 1 autoload :Base, 'context_request_subscriber/processor/base'
  6. 1 autoload :Request, 'context_request_subscriber/processor/request'
  7. 1 autoload :Context, 'context_request_subscriber/processor/context'
  8. # Class to execute the processing of the messages.
  9. # Based on the message type, a processor is instantiated to perform the
  10. # processing.
  11. 1 class Executor
  12. 1 attr_accessor :handler_params
  13. 1 def initialize(logger, **keys)
  14. 8 @logger = logger
  15. 8 @handler_params = keys[:handler_params] || {}
  16. end
  17. 1 def run(_delivery_info, properties, payload)
  18. 8 @logger.debug("#{properties[:type]} processing message.")
  19. 8 @logger.debug(" payload: #{payload}")
  20. 8 @logger.debug(" properties: #{properties}")
  21. 8 payload = JSON.parse(payload)
  22. 7 process(properties[:type], properties[:headers]&.dig('version'),
  23. payload)
  24. rescue JSON::ParserError
  25. 1 @logger.error("Could not parse message payload \
  26. with payload #{payload}. Ignoring.")
  27. 1 nil
  28. end
  29. 1 def process(name, version = nil, payload = {})
  30. 7 processor = Processor.processor_class(name, version)
  31. &.new(@logger, handler_params: handler_params)
  32. 7 if processor
  33. 6 processor.call(payload)
  34. else
  35. 1 @logger.error("Invalid processor type or \
  36. processor type with name #{name} cannot be found")
  37. end
  38. end
  39. # Method can be overridden to communicate with the MQ and ack the message
  40. # if processing was successful.
  41. # Default is to do nothing because the setup is auto_ack.
  42. 1 def ack(_channel); end
  43. end
  44. 1 def self.processor_class(name, version = nil)
  45. 7 version = "V#{version}" if version
  46. [self.name, version, name.tr('.', '_').camelize].compact.join('::')
  47. 7 .constantize
  48. rescue NameError
  49. 1 nil
  50. end
  51. end
  52. end

lib/context_request_subscriber/processor/base.rb

100.0% lines covered

13 relevant lines. 13 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module ContextRequestSubscriber
  3. 1 module Processor
  4. # :nodoc:
  5. 1 class Base
  6. 1 def initialize(logger, **keys)
  7. 8 @logger = logger
  8. 8 @handler_params = keys[:handler_params] || {}
  9. end
  10. 1 def call(payload)
  11. 7 if (handler = ContextRequestSubscriber.handlers[type_name])
  12. 2 handler.new(payload, **@handler_params).call
  13. else
  14. 5 @logger.error("Could not find handler for \
  15. message type #{type_name}")
  16. end
  17. end
  18. 1 private
  19. 1 def type_name
  20. 12 self.class.name.split('::').last.underscore.downcase.to_sym
  21. end
  22. end
  23. end
  24. end

lib/context_request_subscriber/processor/context.rb

100.0% lines covered

3 relevant lines. 3 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module ContextRequestSubscriber
  3. 1 module Processor
  4. 1 class Context < Base
  5. end
  6. end
  7. end

lib/context_request_subscriber/processor/request.rb

100.0% lines covered

3 relevant lines. 3 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module ContextRequestSubscriber
  3. 1 module Processor
  4. 1 class Request < Base
  5. end
  6. end
  7. end

lib/context_request_subscriber/rabbitmq_subscriber.rb

100.0% lines covered

63 relevant lines. 63 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'bunny'
  3. 1 module ContextRequestSubscriber
  4. # The subscriber to attach to the exchange to process the
  5. # ContextRequest data.
  6. #
  7. # ErrorHandler is a Callable object that gets called for any exception
  8. # during the execution of the processor.
  9. # The call method gets passed the following information:
  10. # @exception the exception caught
  11. # @delivery_info the delivery_info object
  12. # @properties properties passed in the message (headers, ..)
  13. # @payload the payload of the message
  14. # The ErrorHandler has to decide what to do with the message. If none
  15. # is given the error will be ignored!
  16. 1 class RabbitMQSubscriber
  17. 1 DEFAULT_QUEUE_OPTS = {
  18. exclusive: false, durable: true
  19. }.freeze
  20. 1 DEFAULT_SESSION_PARAMS = {
  21. threaded: true, automatically_recover: true
  22. }.freeze
  23. # @processor the processor to be used to handle the data.
  24. # @config the configurables supported as follows
  25. # session_params: set of session params
  26. # url: url to connect to the RabbitMQ Server
  27. # heartbeat: heartbeat for the connection. Defaults to nil.
  28. # exchange_name: the exchange connected to the queue
  29. # exchange_options: the options used to create the exchange if not
  30. # existing.
  31. # exchange_type: type of the exchange, defaults to topic.
  32. # queue_name: the queue where the data is received from.
  33. # queue_durable: if the queue is durable. Default: true
  34. # queue_auto_delete: if the queue gets autodeleted.
  35. # queue_exclusive: default false.
  36. # routing_key: the routing_key used. Default nil.
  37. # on_error: callable object that handles errors during processing the
  38. # message.
  39. 1 def initialize(**config)
  40. 5 @error_handler = config[:on_error].new(config[:logger])
  41. 5 @logger = config[:logger]
  42. 5 @executor = Processor::Executor.new(config[:logger],
  43. config.slice(:handler_params))
  44. 5 config_connection(config)
  45. 5 config_exchange(config)
  46. 5 config_queue(config)
  47. end
  48. 1 def run
  49. 5 exchange, queue = setup_queue
  50. 5 @consumer = queue.subscribe(manual_ack: true,
  51. block: false) do |info, properties, payload|
  52. 5 consume(info, properties, payload)
  53. end
  54. 4 join(exchange)
  55. end
  56. 1 def join(exchange)
  57. 4 @join_workpool && exchange.channel.work_pool.join
  58. end
  59. 1 private
  60. 1 def config_connection(config)
  61. 5 @session_params = config.fetch(:session_params, {})
  62. .merge(DEFAULT_SESSION_PARAMS)
  63. 5 @url = config[:url]
  64. end
  65. 1 def config_queue(config)
  66. 5 @queue_name = config[:queue_name]
  67. 5 @queue_bindings = config.slice(:routing_key)
  68. 5 @queue_opts = queue_opts(config)
  69. end
  70. 1 def config_exchange(config)
  71. 5 @exchange_type = config.fetch(:exchange_type, 'topic')
  72. 5 @exchange_name = config.fetch(:exchange_name,
  73. ContextRequestSubscriber.exchange_name)
  74. 5 @exchange_opts = config.fetch(:exchange_options, {})
  75. 5 @join_workpool = config.fetch(:subscriber_join, true)
  76. end
  77. # Set up the whole message queue settings and return the exchange and queue
  78. # objects. Can be overridden via ContextRequestSubscriber.fetch_queue_callback.
  79. 1 def setup_queue
  80. 5 callback = ContextRequestSubscriber.fetch_queue_callback
  81. 5 if callback
  82. 1 exchange, queue = callback.call(self)
  83. else
  84. 4 channel = create_channel(@session_params)
  85. 4 exchange = fetch_exchange(channel, @exchange_type, @exchange_name,
  86. @exchange_opts)
  87. 4 queue = bind_queue(channel, exchange)
  88. end
  89. 5 [exchange, queue]
  90. end
  91. # Default behaviour is that all messages are consumed by just
  92. # passing the information to the processor. If any part fails
  93. # the application needs to handle it gracefully because the message
  94. # is acked automatically.
  95. # To change that behaviour overwrite the fetch_queue_callback
  96. # and extend the Processor to handle the ack method.
  97. 1 def consume(info, properties, payload)
  98. 5 @executor.run(info, properties, payload)
  99. 4 @executor.ack(@channel)
  100. rescue StandardError => e
  101. 1 handle_error(e, info, properties, payload)
  102. end
  103. 1 def create_channel(options = {})
  104. 4 connection = Bunny.new(@url, options)
  105. 4 connection.start
  106. 4 connection.create_channel
  107. end
  108. # return the exchange Bunny object and do the whole setup around it.
  109. 1 def fetch_exchange(channel, exchange_type, exchange_name, exchange_opts)
  110. 4 channel.exchanges[exchange_name] ||
  111. bunny_exchange(channel, exchange_type, exchange_name, exchange_opts)
  112. end
  113. 1 def bind_queue(channel, exchange)
  114. 4 queue = channel.queue(@queue_name, @queue_opts)
  115. 4 queue.bind(exchange, @queue_bindings)
  116. 4 queue
  117. end
  118. 1 def handle_error(error, info, properties, payload)
  119. 1 @error_handler&.call(error, info, properties, payload)
  120. end
  121. 1 def bunny_exchange(channel, type, name, opts)
  122. 1 Bunny::Exchange.new(channel, type, name, opts)
  123. end
  124. 1 def queue_opts(config)
  125. 5 opts = config.slice(:queue_durable, :queue_auto_delete, :queue_exclusive)
  126. 5 opts.dup.each do |k, _v|
  127. 5 opts[k.to_s.sub(/^queue_/, '').to_sym] = opts.delete(k)
  128. end
  129. 5 DEFAULT_QUEUE_OPTS.merge(opts)
  130. end
  131. end
  132. end