3 Commits

Author SHA1 Message Date
3bb19c1850 Version bump 2023-11-12 12:25:58 +03:00
185f0aade8 Works better with Mongoid 2023-11-12 12:25:52 +03:00
b90ef2752b Accepts queue argument 2023-03-29 11:53:48 +03:00
4 changed files with 14 additions and 8 deletions

View File

@@ -1,7 +1,7 @@
PATH PATH
remote: . remote: .
specs: specs:
rabbit_carrots (0.1.17) rabbit_carrots (0.1.19)
bunny (>= 2.19.0) bunny (>= 2.19.0)
connection_pool (~> 2.3.0) connection_pool (~> 2.3.0)
@@ -15,7 +15,7 @@ GEM
tzinfo (~> 2.0) tzinfo (~> 2.0)
amq-protocol (2.3.2) amq-protocol (2.3.2)
ast (2.4.2) ast (2.4.2)
bunny (2.20.3) bunny (2.22.0)
amq-protocol (~> 2.3, >= 2.3.1) amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2) sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.1.10) concurrent-ruby (1.1.10)

View File

@@ -18,6 +18,8 @@ module RabbitCarrots
class NackAndRequeueMessage < StandardError class NackAndRequeueMessage < StandardError
end end
class PlaceholderError < Error; end
end end
end end
end end

View File

@@ -5,6 +5,10 @@ namespace :rabbit_carrots do
task eat: :environment do task eat: :environment do
Rails.application.eager_load! Rails.application.eager_load!
DatabaseAgonsticNotNullViolation = defined?(ActiveRecord) ? ActiveRecord::NotNullViolation : RabbitCarrots::EventHandlers::Errors::PlaceholderError
DatabaseAgonsticConnectionNotEstablished = defined?(ActiveRecord) ? ActiveRecord::ConnectionNotEstablished : Mongo::Error::SocketError
DatabaseAgnosticRecordInvalid = defined?(ActiveRecord) ? ActiveRecord::RecordInvalid : Mongoid::Errors::Validations
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping| channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
# This will be supplied in initializer. At that time, the Handler will not be available to be loaded and will throw Uninitialized Constant # This will be supplied in initializer. At that time, the Handler will not be available to be loaded and will throw Uninitialized Constant
{ **mapping, handler: mapping[:handler].constantize } { **mapping, handler: mapping[:handler].constantize }
@@ -18,7 +22,7 @@ namespace :rabbit_carrots do
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!) raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
run_task(queue_name: channel[:queue], handler_class:, routing_keys: channel[:routing_keys]) run_task(queue_name: channel[:queue], handler_class:, routing_keys: channel[:routing_keys], queue_arguments: channel[:arguments])
end end
# Infinite loop to keep the process running # Infinite loop to keep the process running
@@ -28,12 +32,12 @@ namespace :rabbit_carrots do
end end
end end
def run_task(queue_name:, handler_class:, routing_keys:) def run_task(queue_name:, queue_arguments: {}, handler_class:, routing_keys:)
RabbitCarrots::Connection.instance.channel.with do |channel| RabbitCarrots::Connection.instance.channel.with do |channel|
exchange = channel.topic(RabbitCarrots.configuration.event_bus_exchange_name, durable: true) exchange = channel.topic(RabbitCarrots.configuration.event_bus_exchange_name, durable: true)
Rails.logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}" Rails.logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true) queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) } routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
@@ -47,11 +51,11 @@ def run_task(queue_name:, handler_class:, routing_keys:)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
Rails.logger.info "Nacked and Requeued message: #{payload}" Rails.logger.info "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue ActiveRecord::NotNullViolation, ActiveRecord::RecordInvalid => e rescue DatabaseAgonsticNotNullViolation, DatabaseAgnosticRecordInvalid => e
# on null constraint violation, we want to ack the message # on null constraint violation, we want to ack the message
Rails.logger.error "Null constraint or Invalid violation: #{payload}. Error: #{e.message}" Rails.logger.error "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue ActiveRecord::ConnectionNotEstablished => e rescue DatabaseAgonsticConnectionNotEstablished => e
# on connection not established, we want to requeue the message and sleep for 3 seconds # on connection not established, we want to requeue the message and sleep for 3 seconds
Rails.logger.error "Error connection not established to the database: #{payload}. Error: #{e.message}" Rails.logger.error "Error connection not established to the database: #{payload}. Error: #{e.message}"
# delay for 3 seconds before requeuing # delay for 3 seconds before requeuing

View File

@@ -1,5 +1,5 @@
# frozen_string_literal: true # frozen_string_literal: true
module RabbitCarrots module RabbitCarrots
VERSION = '0.1.17' VERSION = '0.1.19'
end end