mirror of
https://github.com/ditkrg/rabbit_carrots.git
synced 2026-01-22 22:06:40 +00:00
120 lines
4.8 KiB
Ruby
120 lines
4.8 KiB
Ruby
module RabbitCarrots
|
|
class Core
|
|
attr_reader :logger
|
|
|
|
@database_agnostic_not_null_violation = nil
|
|
@database_agnostic_connection_not_established = nil
|
|
@database_agnostic_record_invalid = nil
|
|
|
|
class << self
|
|
attr_accessor :database_agnostic_not_null_violation, :database_agnostic_connection_not_established, :database_agnostic_record_invalid
|
|
end
|
|
|
|
def initialize(logger: nil)
|
|
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
|
@threads = []
|
|
@running = true
|
|
@shutdown_requested = false
|
|
end
|
|
|
|
def start(kill_to_restart_on_standard_error: false)
|
|
self.class.database_agnostic_not_null_violation = RabbitCarrots.configuration.orm == :activerecord ? ActiveRecord::NotNullViolation : RabbitCarrots::EventHandlers::Errors::PlaceholderError
|
|
self.class.database_agnostic_connection_not_established = RabbitCarrots.configuration.orm == :activerecord ? ActiveRecord::ConnectionNotEstablished : ::Mongo::Error::SocketError
|
|
self.class.database_agnostic_record_invalid = RabbitCarrots.configuration.orm == :activerecord ? ActiveRecord::RecordInvalid : ::Mongoid::Errors::Validations
|
|
|
|
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
|
|
{ **mapping, handler: mapping[:handler].constantize }
|
|
end
|
|
|
|
channels.each do |channel|
|
|
handler_class = channel[:handler]
|
|
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
|
|
|
|
@threads << Thread.new do
|
|
run_task(
|
|
queue_name: channel[:queue],
|
|
handler_class:,
|
|
routing_keys: channel[:routing_keys],
|
|
queue_arguments: channel[:arguments],
|
|
kill_to_restart_on_standard_error:
|
|
)
|
|
end
|
|
end
|
|
|
|
Signal.trap('INT') { request_shutdown }
|
|
Signal.trap('TERM') { request_shutdown }
|
|
|
|
while @running
|
|
if @shutdown_requested
|
|
request_shutdown
|
|
sleep 1
|
|
break
|
|
end
|
|
sleep 1
|
|
end
|
|
|
|
@threads.each(&:join)
|
|
rescue StandardError => e
|
|
logger.error "Error starting Rabbit Carrots: #{e.message}"
|
|
end
|
|
|
|
def request_shutdown
|
|
# Workaround to a known issue with Signal Traps and logs
|
|
Thread.start do
|
|
logger.log 'Shutting down Rabbit Carrots service...'
|
|
end
|
|
@shutdown_requested = true
|
|
@threads.each(&:kill)
|
|
stop
|
|
end
|
|
|
|
def stop
|
|
# Workaround to a known issue with Signal Traps and logs
|
|
Thread.start do
|
|
logger.log 'Stoppig the Rabbit Carrots service...'
|
|
end
|
|
@running = false
|
|
end
|
|
|
|
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, kill_to_restart_on_standard_error: false)
|
|
RabbitCarrots::Connection.instance.channel.with do |channel|
|
|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
|
|
|
|
logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
|
|
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
|
|
|
|
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
|
|
|
|
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
|
|
break if @shutdown_requested
|
|
|
|
logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
|
|
handler_class.handle!(channel, delivery_info, properties, payload)
|
|
channel.ack(delivery_info.delivery_tag, false)
|
|
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
|
|
logger.log "Nacked message: #{payload}"
|
|
channel.nack(delivery_info.delivery_tag, false, false)
|
|
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
|
|
logger.log "Nacked and Requeued message: #{payload}"
|
|
channel.nack(delivery_info.delivery_tag, false, true)
|
|
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
|
|
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
|
|
channel.ack(delivery_info.delivery_tag, false)
|
|
rescue self.class.database_agnostic_connection_not_established => e
|
|
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}"
|
|
sleep 3
|
|
channel.nack(delivery_info.delivery_tag, false, true)
|
|
rescue StandardError => e
|
|
logger.log "Error handling message: #{payload}. Error: #{e.message}"
|
|
sleep 3
|
|
channel.nack(delivery_info.delivery_tag, false, true)
|
|
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
|
|
end
|
|
end
|
|
rescue StandardError => e
|
|
logger.error "Bunny session error: #{e.message}"
|
|
request_shutdown
|
|
end
|
|
end
|
|
end
|