mirror of
https://github.com/ditkrg/rabbit_carrots.git
synced 2026-01-22 22:06:40 +00:00
Improves error handling and shut down. Moves code for reusability
This commit is contained in:
parent
ff838c50ad
commit
876a48a1ab
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
require_relative 'rabbit_carrots/version'
|
require_relative 'rabbit_carrots/version'
|
||||||
require 'rabbit_carrots/connection'
|
require 'rabbit_carrots/connection'
|
||||||
|
require 'rabbit_carrots/core'
|
||||||
require 'rabbit_carrots/configuration'
|
require 'rabbit_carrots/configuration'
|
||||||
require 'rabbit_carrots/railtie' if defined?(Rails)
|
require 'rabbit_carrots/railtie' if defined?(Rails)
|
||||||
|
|
||||||
|
|||||||
113
lib/rabbit_carrots/core.rb
Normal file
113
lib/rabbit_carrots/core.rb
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
module RabbitCarrots
|
||||||
|
class Core
|
||||||
|
attr_reader :logger
|
||||||
|
|
||||||
|
DatabaseAgonsticNotNullViolation = defined?(ActiveRecord) ? ActiveRecord::NotNullViolation : RabbitCarrots::EventHandlers::Errors::PlaceholderError
|
||||||
|
DatabaseAgonsticConnectionNotEstablished = defined?(ActiveRecord) ? ActiveRecord::ConnectionNotEstablished : Mongo::Error::SocketError
|
||||||
|
DatabaseAgnosticRecordInvalid = defined?(ActiveRecord) ? ActiveRecord::RecordInvalid : Mongoid::Errors::Validations
|
||||||
|
|
||||||
|
def initialize(logger: nil)
|
||||||
|
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
||||||
|
@threads = []
|
||||||
|
@running = true
|
||||||
|
@shutdown_requested = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def start(kill_to_restart_on_standard_error: false)
|
||||||
|
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
|
||||||
|
{ **mapping, handler: mapping[:handler].constantize }
|
||||||
|
end
|
||||||
|
|
||||||
|
channels.each do |channel|
|
||||||
|
handler_class = channel[:handler]
|
||||||
|
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
|
||||||
|
|
||||||
|
@threads << Thread.new do
|
||||||
|
run_task(
|
||||||
|
queue_name: channel[:queue],
|
||||||
|
handler_class:,
|
||||||
|
routing_keys: channel[:routing_keys],
|
||||||
|
queue_arguments: channel[:arguments],
|
||||||
|
kill_to_restart_on_standard_error:
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Signal.trap('INT') { request_shutdown }
|
||||||
|
Signal.trap('TERM') { request_shutdown }
|
||||||
|
|
||||||
|
while @running
|
||||||
|
if @shutdown_requested
|
||||||
|
request_shutdown
|
||||||
|
sleep 1
|
||||||
|
break
|
||||||
|
end
|
||||||
|
sleep 1
|
||||||
|
end
|
||||||
|
|
||||||
|
@threads.each(&:join)
|
||||||
|
rescue StandardError => e
|
||||||
|
logger.error "Error starting Rabbit Carrots: #{e.message}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def request_shutdown
|
||||||
|
# Workaround to a known issue with Signal Traps and logs
|
||||||
|
Thread.start do
|
||||||
|
logger.info 'Shutting down Rabbit Carrots service...'
|
||||||
|
end
|
||||||
|
@shutdown_requested = true
|
||||||
|
@threads.each(&:kill)
|
||||||
|
stop
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop
|
||||||
|
# Workaround to a known issue with Signal Traps and logs
|
||||||
|
Thread.start do
|
||||||
|
logger.info 'Stoppig the Rabbit Carrots service...'
|
||||||
|
end
|
||||||
|
@running = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, kill_to_restart_on_standard_error: false)
|
||||||
|
RabbitCarrots::Connection.instance.channel.with do |channel|
|
||||||
|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
|
||||||
|
|
||||||
|
logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
|
||||||
|
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
|
||||||
|
|
||||||
|
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
|
||||||
|
|
||||||
|
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
|
||||||
|
break if @shutdown_requested
|
||||||
|
|
||||||
|
logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
|
||||||
|
handler_class.handle!(channel, delivery_info, properties, payload)
|
||||||
|
channel.ack(delivery_info.delivery_tag, false)
|
||||||
|
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
|
||||||
|
logger.info "Nacked message: #{payload}"
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, false)
|
||||||
|
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
|
||||||
|
logger.info "Nacked and Requeued message: #{payload}"
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, true)
|
||||||
|
rescue DatabaseAgonsticNotNullViolation, DatabaseAgnosticRecordInvalid => e
|
||||||
|
logger.info "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
|
||||||
|
channel.ack(delivery_info.delivery_tag, false)
|
||||||
|
rescue DatabaseAgonsticConnectionNotEstablished => e
|
||||||
|
logger.info "Error connection not established to the database: #{payload}. Error: #{e.message}"
|
||||||
|
sleep 3
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, true)
|
||||||
|
rescue StandardError => e
|
||||||
|
logger.info "Error handling message: #{payload}. Error: #{e.message}"
|
||||||
|
sleep 3
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, true)
|
||||||
|
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
|
||||||
|
end
|
||||||
|
|
||||||
|
logger.info "Ending task for queue: #{queue_name}"
|
||||||
|
end
|
||||||
|
rescue StandardError => e
|
||||||
|
logger.error "Bunny session error: #{e.message}"
|
||||||
|
request_shutdown
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@ -1,77 +1,13 @@
|
|||||||
require 'bunny'
|
|
||||||
|
|
||||||
namespace :rabbit_carrots do
|
namespace :rabbit_carrots do
|
||||||
desc 'Listener for Queue'
|
desc 'Rake task for standalone RabbitCarrots mode'
|
||||||
task eat: :environment do
|
task eat: :environment do
|
||||||
Rails.application.eager_load!
|
Rails.application.eager_load!
|
||||||
|
|
||||||
# rubocop:disable Lint/ConstantDefinitionInBlock
|
logger = Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
||||||
DatabaseAgonsticNotNullViolation = defined?(ActiveRecord) ? ActiveRecord::NotNullViolation : RabbitCarrots::EventHandlers::Errors::PlaceholderError
|
logger.level = Logger::INFO
|
||||||
DatabaseAgonsticConnectionNotEstablished = defined?(ActiveRecord) ? ActiveRecord::ConnectionNotEstablished : Mongo::Error::SocketError
|
|
||||||
DatabaseAgnosticRecordInvalid = defined?(ActiveRecord) ? ActiveRecord::RecordInvalid : Mongoid::Errors::Validations
|
|
||||||
# rubocop:enable Lint/ConstantDefinitionInBlock
|
|
||||||
|
|
||||||
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
|
core_service = RabbitCarrots::Core.new(logger:)
|
||||||
# This will be supplied in initializer. At that time, the Handler will not be available to be loaded and will throw Uninitialized Constant
|
|
||||||
{ **mapping, handler: mapping[:handler].constantize }
|
|
||||||
end
|
|
||||||
|
|
||||||
Rails.logger = Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
core_service.start(kill_to_restart_on_standard_error: true)
|
||||||
|
|
||||||
# Run RMQ Subscriber for each channel
|
|
||||||
channels.each do |channel|
|
|
||||||
handler_class = channel[:handler]
|
|
||||||
|
|
||||||
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
|
|
||||||
|
|
||||||
run_task(queue_name: channel[:queue], handler_class:, routing_keys: channel[:routing_keys], queue_arguments: channel[:arguments])
|
|
||||||
end
|
|
||||||
|
|
||||||
# Infinite loop to keep the process running
|
|
||||||
loop do
|
|
||||||
sleep 1
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {})
|
|
||||||
RabbitCarrots::Connection.instance.channel.with do |channel|
|
|
||||||
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
|
|
||||||
|
|
||||||
Rails.logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
|
|
||||||
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
|
|
||||||
|
|
||||||
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
|
|
||||||
|
|
||||||
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
|
|
||||||
Rails.logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
|
|
||||||
handler_class.handle!(channel, delivery_info, properties, payload)
|
|
||||||
channel.ack(delivery_info.delivery_tag, false)
|
|
||||||
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
|
|
||||||
Rails.logger.info "Nacked message: #{payload}"
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, false)
|
|
||||||
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
|
|
||||||
Rails.logger.info "Nacked and Requeued message: #{payload}"
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, true)
|
|
||||||
rescue DatabaseAgonsticNotNullViolation, DatabaseAgnosticRecordInvalid => e
|
|
||||||
# on null constraint violation, we want to ack the message
|
|
||||||
Rails.logger.error "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
|
|
||||||
channel.ack(delivery_info.delivery_tag, false)
|
|
||||||
rescue DatabaseAgonsticConnectionNotEstablished => e
|
|
||||||
# on connection not established, we want to requeue the message and sleep for 3 seconds
|
|
||||||
Rails.logger.error "Error connection not established to the database: #{payload}. Error: #{e.message}"
|
|
||||||
# delay for 3 seconds before requeuing
|
|
||||||
sleep 3
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, true)
|
|
||||||
rescue StandardError => e
|
|
||||||
Rails.logger.error "Error handling message: #{payload}. Error: #{e.message}"
|
|
||||||
# requeue the message then kill the container
|
|
||||||
sleep 3
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, true)
|
|
||||||
# kill the container with sigterm
|
|
||||||
Process.kill('SIGTERM', Process.pid)
|
|
||||||
end
|
|
||||||
|
|
||||||
Rails.logger.info 'RUN TASK ENDED'
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user