Merge pull request #7 from arikarim/dev

Refines logging levels for shutdown, stop, and message handling in Rabbit Carrots service
This commit is contained in:
Brusk Awat 2025-05-25 11:54:53 +03:00 committed by GitHub
commit a0c5f80b58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 40 additions and 20 deletions

View File

@ -1,7 +1,7 @@
PATH PATH
remote: . remote: .
specs: specs:
rabbit_carrots (1.0.2) rabbit_carrots (1.0.3)
bunny (>= 2.22) bunny (>= 2.22)
connection_pool (~> 2.4) connection_pool (~> 2.4)
@ -18,12 +18,12 @@ GEM
minitest (>= 5.1) minitest (>= 5.1)
mutex_m mutex_m
tzinfo (~> 2.0) tzinfo (~> 2.0)
amq-protocol (2.3.2) amq-protocol (2.3.4)
ast (2.4.2) ast (2.4.2)
base64 (0.2.0) base64 (0.2.0)
bigdecimal (3.1.4) bigdecimal (3.1.4)
bunny (2.22.0) bunny (2.24.0)
amq-protocol (~> 2.3, >= 2.3.1) amq-protocol (~> 2.3)
sorted_set (~> 1, >= 1.0.2) sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.2.2) concurrent-ruby (1.2.2)
connection_pool (2.4.1) connection_pool (2.4.1)
@ -80,7 +80,7 @@ GEM
rubocop-ast (>= 1.30.0, < 2.0) rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (1.13.0) ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5) ruby2_keywords (0.0.5)
set (1.1.0) set (1.1.2)
sorted_set (1.0.3) sorted_set (1.0.3)
rbtree rbtree
set (~> 1.0) set (~> 1.0)

View File

@ -8,7 +8,7 @@ Puma::Plugin.create do
def start(launcher) def start(launcher)
@log_writer = launcher.log_writer @log_writer = launcher.log_writer
@puma_pid = $$ @puma_pid = $PROCESS_ID
@core_service = RabbitCarrots::Core.new(logger: log_writer) @core_service = RabbitCarrots::Core.new(logger: log_writer)
@ -43,6 +43,7 @@ Puma::Plugin.create do
Process.kill('TERM', rabbit_carrots_pid) Process.kill('TERM', rabbit_carrots_pid)
Process.wait(rabbit_carrots_pid) Process.wait(rabbit_carrots_pid)
rescue Errno::ECHILD, Errno::ESRCH rescue Errno::ECHILD, Errno::ESRCH
log 'Rabbit Carrots already stopped'
end end
def monitor_puma def monitor_puma
@ -57,7 +58,7 @@ Puma::Plugin.create do
loop do loop do
if send(process_dead) if send(process_dead)
log message log message
Process.kill('TERM', $$) Process.kill('TERM', $PROCESS_ID)
break break
end end
sleep 2 sleep 2

View File

@ -18,8 +18,8 @@ module RabbitCarrots
:rabbitmq_exchange_name, :rabbitmq_exchange_name,
:automatically_recover, :automatically_recover,
:network_recovery_interval, :network_recovery_interval,
:recovery_attempts, :recovery_attempts
:orm
def orm def orm
@orm ||= :activerecord @orm ||= :activerecord
end end

View File

@ -11,7 +11,7 @@ module RabbitCarrots
end end
def initialize(logger: nil) def initialize(logger: nil)
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout) @logger = create_logger_adapter(logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout))
@threads = [] @threads = []
@running = true @running = true
@shutdown_requested = false @shutdown_requested = false
@ -61,7 +61,7 @@ module RabbitCarrots
def request_shutdown def request_shutdown
# Workaround to a known issue with Signal Traps and logs # Workaround to a known issue with Signal Traps and logs
Thread.start do Thread.start do
logger.log 'Shutting down Rabbit Carrots service...' logger.error 'Shutting down Rabbit Carrots service...'
end end
@shutdown_requested = true @shutdown_requested = true
@threads.each(&:kill) @threads.each(&:kill)
@ -71,7 +71,7 @@ module RabbitCarrots
def stop def stop
# Workaround to a known issue with Signal Traps and logs # Workaround to a known issue with Signal Traps and logs
Thread.start do Thread.start do
logger.log 'Stoppig the Rabbit Carrots service...' logger.error 'Stoppig the Rabbit Carrots service...'
end end
@running = false @running = false
end end
@ -80,7 +80,7 @@ module RabbitCarrots
RabbitCarrots::Connection.instance.channel.with do |channel| RabbitCarrots::Connection.instance.channel.with do |channel|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true) exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}" logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments) queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) } routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
@ -88,24 +88,24 @@ module RabbitCarrots
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload| queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
break if @shutdown_requested break if @shutdown_requested
logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}" logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
handler_class.handle!(channel, delivery_info, properties, payload) handler_class.handle!(channel, delivery_info, properties, payload)
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
logger.log "Nacked message: #{payload}" logger.warn "Nacked message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, false) channel.nack(delivery_info.delivery_tag, false, false)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
logger.log "Nacked and Requeued message: #{payload}" logger.warn "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}" logger.warn "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue self.class.database_agnostic_connection_not_established => e rescue self.class.database_agnostic_connection_not_established => e
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}" logger.warn "Error connection not established to the database: #{payload}. Error: #{e.message}"
sleep 3 sleep 3
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue StandardError => e rescue StandardError => e
logger.log "Error handling message: #{payload}. Error: #{e.message}" logger.error "Error handling message: #{payload}. Error: #{e.message}"
sleep 3 sleep 3
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
@ -115,5 +115,24 @@ module RabbitCarrots
logger.error "Bunny session error: #{e.message}" logger.error "Bunny session error: #{e.message}"
request_shutdown request_shutdown
end end
private
def create_logger_adapter(logger)
return logger if logger.respond_to?(:info) && logger.respond_to?(:error) && logger.respond_to?(:warn)
adapter = Object.new
def adapter.info(msg)
@logger.write("[INFO] #{msg}\n")
end
def adapter.error(msg)
@logger.write("[ERROR] #{msg}\n")
end
def adapter.warn(msg)
@logger.write("[WARN] #{msg}\n")
end
adapter.instance_variable_set(:@logger, logger)
adapter
end
end end
end end

View File

@ -1,5 +1,5 @@
# frozen_string_literal: true # frozen_string_literal: true
module RabbitCarrots module RabbitCarrots
VERSION = '1.0.2' VERSION = '1.0.3'
end end