|
|
|
|
@@ -5,6 +5,10 @@ namespace :rabbit_carrots do
|
|
|
|
|
task eat: :environment do
|
|
|
|
|
Rails.application.eager_load!
|
|
|
|
|
|
|
|
|
|
DatabaseAgonsticNotNullViolation = defined?(ActiveRecord) ? ActiveRecord::NotNullViolation : RabbitCarrots::EventHandlers::Errors::PlaceholderError
|
|
|
|
|
DatabaseAgonsticConnectionNotEstablished = defined?(ActiveRecord) ? ActiveRecord::ConnectionNotEstablished : Mongo::Error::SocketError
|
|
|
|
|
DatabaseAgnosticRecordInvalid = defined?(ActiveRecord) ? ActiveRecord::RecordInvalid : Mongoid::Errors::Validations
|
|
|
|
|
|
|
|
|
|
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
|
|
|
|
|
# This will be supplied in initializer. At that time, the Handler will not be available to be loaded and will throw Uninitialized Constant
|
|
|
|
|
{ **mapping, handler: mapping[:handler].constantize }
|
|
|
|
|
@@ -18,7 +22,7 @@ namespace :rabbit_carrots do
|
|
|
|
|
|
|
|
|
|
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
|
|
|
|
|
|
|
|
|
|
run_task(queue_name: channel[:queue], handler_class:, routing_keys: channel[:routing_keys])
|
|
|
|
|
run_task(queue_name: channel[:queue], handler_class:, routing_keys: channel[:routing_keys], queue_arguments: channel[:arguments])
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Infinite loop to keep the process running
|
|
|
|
|
@@ -28,12 +32,12 @@ namespace :rabbit_carrots do
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def run_task(queue_name:, handler_class:, routing_keys:)
|
|
|
|
|
def run_task(queue_name:, queue_arguments: {}, handler_class:, routing_keys:)
|
|
|
|
|
RabbitCarrots::Connection.instance.channel.with do |channel|
|
|
|
|
|
exchange = channel.topic(RabbitCarrots.configuration.event_bus_exchange_name, durable: true)
|
|
|
|
|
|
|
|
|
|
Rails.logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
|
|
|
|
|
queue = channel.queue(queue_name, durable: true)
|
|
|
|
|
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
|
|
|
|
|
|
|
|
|
|
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
|
|
|
|
|
|
|
|
|
|
@@ -47,11 +51,11 @@ def run_task(queue_name:, handler_class:, routing_keys:)
|
|
|
|
|
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
|
|
|
|
|
Rails.logger.info "Nacked and Requeued message: #{payload}"
|
|
|
|
|
channel.nack(delivery_info.delivery_tag, false, true)
|
|
|
|
|
rescue ActiveRecord::NotNullViolation, ActiveRecord::RecordInvalid => e
|
|
|
|
|
rescue DatabaseAgonsticNotNullViolation, DatabaseAgnosticRecordInvalid => e
|
|
|
|
|
# on null constraint violation, we want to ack the message
|
|
|
|
|
Rails.logger.error "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
|
|
|
|
|
channel.ack(delivery_info.delivery_tag, false)
|
|
|
|
|
rescue ActiveRecord::ConnectionNotEstablished => e
|
|
|
|
|
rescue DatabaseAgonsticConnectionNotEstablished => e
|
|
|
|
|
# on connection not established, we want to requeue the message and sleep for 3 seconds
|
|
|
|
|
Rails.logger.error "Error connection not established to the database: #{payload}. Error: #{e.message}"
|
|
|
|
|
# delay for 3 seconds before requeuing
|
|
|
|
|
|