23 Commits

Author SHA1 Message Date
Brusk Awat
41074d617f Merge pull request #15 from arikarim/multi-exchange-
fix: Enhance run_task method to handle existing RabbitMQ exchanges by…
2026-01-18 15:14:57 +03:00
Ari Karim
0bcaee2805 chore: Bump rabbit_carrots gem version to 1.1.1 in Gemfile.lock and version.rb. 2026-01-18 14:31:20 +03:00
Ari Karim
d553042d1c fix: Enhance run_task method to handle existing RabbitMQ exchanges by attempting passive read before declaring new ones. 2026-01-18 14:30:07 +03:00
Brusk Awat
5d2e2df8d2 Merge pull request #14 from arikarim/multi-exchange-
feat: Add exchange_name parameter to run_task method and update Ruboc…
2026-01-18 12:04:10 +03:00
Ari Karim
8f8882c428 chore: Bump rabbit_carrots gem version to 1.1.0 in Gemfile.lock and version.rb. 2026-01-18 11:06:19 +03:00
Ari Karim
d7c7415438 feat: Add exchange_name parameter to run_task method and update Rubocop configuration for parameter lists. 2026-01-18 11:04:42 +03:00
Brusk Awat
4ac772d12c Merge pull request #11 from muhammadnawzad/main
Update gem dependencies, refactor minor code patterns, and adjust Rubocop configuration.
2026-01-18 10:38:40 +03:00
Muhammad Nawzad
cdf3e42f35 chore: Update Ruby version to 4.0.1 in GitHub Actions workflows. 2026-01-18 09:52:07 +03:00
Muhammad Nawzad
c6e22d8cd1 Update gem dependencies, refactor minor code patterns, and adjust Rubocop configuration. 2026-01-18 09:50:02 +03:00
Brusk Awat
b526ef58ad Merge pull request #10 from arikarim/orm-set
fix: Ensure message payloads are UTF-8 encoded before logging and upd…
2025-12-18 16:13:06 +03:00
Ari Karim
0a0ef429c0 fix: Ensure message payloads are UTF-8 encoded before logging and update gem version. 2025-12-18 15:47:52 +03:00
Brusk Awat
c5e10bd9e2 Merge pull request #9 from arikarim/orm-set
feat: Add setter for ORM configuration.
2025-12-18 14:10:06 +03:00
Ari Karim
c673d7e5a1 chore: Bump gem version to 1.0.5. 2025-12-18 14:08:31 +03:00
Ari Karim
7ec227174b feat: Add setter for ORM configuration. 2025-12-18 14:08:01 +03:00
Brusk Awat
35586d81b4 Merge pull request #8 from arikarim/dev
Updates rabbit_carrots to version 1.0.4 and adds 'English' module req…
2025-05-25 15:15:59 +03:00
Ari Karim
ac06deb86a Refactors process ID handling in Rabbit Carrots plugin to use $$ instead of $PROCESS_ID for improved compatibility 2025-05-25 15:14:30 +03:00
Ari Karim
cb41e257fc Enhances logging functionality by adding additional log level methods (info, warn) to the adapter in Rabbit Carrots core module 2025-05-25 14:10:40 +03:00
Ari Karim
38ca7d3927 Updates rabbit_carrots to version 1.0.4 and adds 'English' module requirement in the Puma plugin 2025-05-25 14:08:25 +03:00
Brusk Awat
a0c5f80b58 Merge pull request #7 from arikarim/dev
Refines logging levels for shutdown, stop, and message handling in Rabbit Carrots service
2025-05-25 11:54:53 +03:00
Ari Karim
7cfe2a5a45 Updates dependencies: bumps rabbit_carrots to 1.0.3, amq-protocol to 2.3.4, bunny to 2.24.0, and set to 1.1.2 2025-05-25 11:44:21 +03:00
Ari Karim
076e1f52ce Enhances logger initialization by introducing a logger adapter for improved compatibility and flexibility 2025-05-25 11:43:41 +03:00
Ari Karim
aee5a1e3cb Refactors process ID handling in Rabbit Carrots plugin and enhances logging for shutdown scenarios 2025-05-25 10:56:35 +03:00
Ari Karim
f68ab83afc Refines logging levels for shutdown, stop, and message handling in Rabbit Carrots service 2025-05-25 10:37:25 +03:00
12 changed files with 151 additions and 89 deletions

View File

@@ -14,7 +14,7 @@ jobs:
strategy: strategy:
matrix: matrix:
ruby: ruby:
- '3.1.2' - '4.0.1'
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3

View File

@@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1 - uses: ruby/setup-ruby@v1
with: with:
ruby-version: 3.1.2 ruby-version: 4.0.1
- name: publish gem - name: publish gem
run: | run: |

View File

@@ -1,4 +1,5 @@
require: rubocop-rails plugins:
- rubocop-rails
AllCops: AllCops:
NewCops: enable NewCops: enable
@@ -111,3 +112,5 @@ Metrics/CyclomaticComplexity:
Max: 15 Max: 15
Metrics/PerceivedComplexity: Metrics/PerceivedComplexity:
Max: 15 Max: 15
Metrics/ParameterLists:
Max: 6

View File

@@ -5,10 +5,10 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in rabbit_carrots.gemspec # Specify your gem's dependencies in rabbit_carrots.gemspec
gemspec gemspec
gem 'rake', '~> 13.1' gem 'rake', '~> 13.3.1'
gem 'rspec', '~> 3.12' gem 'rspec', '~> 3.13.2'
gem 'rubocop', '~> 1.58' gem 'rubocop', '~> 1.82.1'
gem 'rubocop-rails', '~> 2.22' gem 'rubocop-rails', '~> 2.34.3'

View File

@@ -1,102 +1,112 @@
PATH PATH
remote: . remote: .
specs: specs:
rabbit_carrots (1.0.2) rabbit_carrots (1.1.1)
bunny (>= 2.22) bunny (>= 2.22)
connection_pool (~> 2.4) connection_pool (>= 2.4)
GEM GEM
remote: https://rubygems.org/ remote: https://rubygems.org/
specs: specs:
activesupport (7.1.2) activesupport (8.1.2)
base64 base64
bigdecimal bigdecimal
concurrent-ruby (~> 1.0, >= 1.0.2) concurrent-ruby (~> 1.0, >= 1.3.1)
connection_pool (>= 2.2.5) connection_pool (>= 2.2.5)
drb drb
i18n (>= 1.6, < 2) i18n (>= 1.6, < 2)
json
logger (>= 1.4.2)
minitest (>= 5.1) minitest (>= 5.1)
mutex_m securerandom (>= 0.3)
tzinfo (~> 2.0) tzinfo (~> 2.0, >= 2.0.5)
amq-protocol (2.3.2) uri (>= 0.13.1)
ast (2.4.2) amq-protocol (2.5.0)
base64 (0.2.0) ast (2.4.3)
bigdecimal (3.1.4) base64 (0.3.0)
bunny (2.22.0) bigdecimal (4.0.1)
amq-protocol (~> 2.3, >= 2.3.1) bunny (2.24.0)
amq-protocol (~> 2.3)
sorted_set (~> 1, >= 1.0.2) sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.2.2) concurrent-ruby (1.3.6)
connection_pool (2.4.1) connection_pool (3.0.2)
diff-lcs (1.5.0) diff-lcs (1.6.2)
drb (2.2.0) drb (2.2.3)
ruby2_keywords i18n (1.14.8)
i18n (1.14.1)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
json (2.7.1) json (2.18.0)
language_server-protocol (3.17.0.3) language_server-protocol (3.17.0.5)
minitest (5.20.0) lint_roller (1.1.0)
mutex_m (0.2.0) logger (1.7.0)
parallel (1.23.0) minitest (6.0.1)
parser (3.2.2.4) prism (~> 1.5)
parallel (1.27.0)
parser (3.3.10.1)
ast (~> 2.4.1) ast (~> 2.4.1)
racc racc
racc (1.7.3) prism (1.8.0)
rack (3.0.8) racc (1.8.1)
rack (3.2.4)
rainbow (3.1.1) rainbow (3.1.1)
rake (13.1.0) rake (13.3.1)
rbtree (0.4.6) rbtree (0.4.6)
regexp_parser (2.8.3) regexp_parser (2.11.3)
rexml (3.2.6) rspec (3.13.2)
rspec (3.12.0) rspec-core (~> 3.13.0)
rspec-core (~> 3.12.0) rspec-expectations (~> 3.13.0)
rspec-expectations (~> 3.12.0) rspec-mocks (~> 3.13.0)
rspec-mocks (~> 3.12.0) rspec-core (3.13.6)
rspec-core (3.12.0) rspec-support (~> 3.13.0)
rspec-support (~> 3.12.0) rspec-expectations (3.13.5)
rspec-expectations (3.12.0)
diff-lcs (>= 1.2.0, < 2.0) diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0) rspec-support (~> 3.13.0)
rspec-mocks (3.12.0) rspec-mocks (3.13.7)
diff-lcs (>= 1.2.0, < 2.0) diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0) rspec-support (~> 3.13.0)
rspec-support (3.12.0) rspec-support (3.13.6)
rubocop (1.58.0) rubocop (1.82.1)
json (~> 2.3) json (~> 2.3)
language_server-protocol (>= 3.17.0) language_server-protocol (~> 3.17.0.2)
lint_roller (~> 1.1.0)
parallel (~> 1.10) parallel (~> 1.10)
parser (>= 3.2.2.4) parser (>= 3.3.0.2)
rainbow (>= 2.2.2, < 4.0) rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.8, < 3.0) regexp_parser (>= 2.9.3, < 3.0)
rexml (>= 3.2.5, < 4.0) rubocop-ast (>= 1.48.0, < 2.0)
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (~> 1.7) ruby-progressbar (~> 1.7)
unicode-display_width (>= 2.4.0, < 3.0) unicode-display_width (>= 2.4.0, < 4.0)
rubocop-ast (1.30.0) rubocop-ast (1.49.0)
parser (>= 3.2.1.0) parser (>= 3.3.7.2)
rubocop-rails (2.22.2) prism (~> 1.7)
rubocop-rails (2.34.3)
activesupport (>= 4.2.0) activesupport (>= 4.2.0)
lint_roller (~> 1.1)
rack (>= 1.1) rack (>= 1.1)
rubocop (>= 1.33.0, < 2.0) rubocop (>= 1.75.0, < 2.0)
rubocop-ast (>= 1.30.0, < 2.0) rubocop-ast (>= 1.44.0, < 2.0)
ruby-progressbar (1.13.0) ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5) securerandom (0.4.1)
set (1.1.0) set (1.1.2)
sorted_set (1.0.3) sorted_set (1.0.3)
rbtree rbtree
set (~> 1.0) set (~> 1.0)
tzinfo (2.0.6) tzinfo (2.0.6)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
unicode-display_width (2.5.0) unicode-display_width (3.2.0)
unicode-emoji (~> 4.1)
unicode-emoji (4.2.0)
uri (1.1.1)
PLATFORMS PLATFORMS
arm64-darwin-25
x86_64-linux x86_64-linux
DEPENDENCIES DEPENDENCIES
rabbit_carrots! rabbit_carrots!
rake (~> 13.1) rake (~> 13.3.1)
rspec (~> 3.12) rspec (~> 3.13.2)
rubocop (~> 1.58) rubocop (~> 1.82.1)
rubocop-rails (~> 2.22) rubocop-rails (~> 2.34.3)
BUNDLED WITH BUNDLED WITH
2.3.26 2.3.26

View File

@@ -1,6 +1,6 @@
# RabbitCarrots # RabbitCarrots
RabbitCarrots is a simple background worker to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions: RabbitCarrots is a simple background task based on rake to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions:
1. RabbitMQ is used as an event bus for communication. 1. RabbitMQ is used as an event bus for communication.
2. Messages are routed using a single exchange, multiple routing keys. 2. Messages are routed using a single exchange, multiple routing keys.

View File

@@ -1,5 +1,5 @@
# rabbit_carrots.rb # rabbit_carrots.rb
require 'English'
require 'puma/plugin' require 'puma/plugin'
require 'rabbit_carrots' require 'rabbit_carrots'
@@ -8,7 +8,7 @@ Puma::Plugin.create do
def start(launcher) def start(launcher)
@log_writer = launcher.log_writer @log_writer = launcher.log_writer
@puma_pid = $$ @puma_pid = $PROCESS_ID
@core_service = RabbitCarrots::Core.new(logger: log_writer) @core_service = RabbitCarrots::Core.new(logger: log_writer)
@@ -43,6 +43,7 @@ Puma::Plugin.create do
Process.kill('TERM', rabbit_carrots_pid) Process.kill('TERM', rabbit_carrots_pid)
Process.wait(rabbit_carrots_pid) Process.wait(rabbit_carrots_pid)
rescue Errno::ECHILD, Errno::ESRCH rescue Errno::ECHILD, Errno::ESRCH
log 'Rabbit Carrots already stopped'
end end
def monitor_puma def monitor_puma
@@ -57,7 +58,7 @@ Puma::Plugin.create do
loop do loop do
if send(process_dead) if send(process_dead)
log message log message
Process.kill('TERM', $$) Process.kill('TERM', $PROCESS_ID)
break break
end end
sleep 2 sleep 2

View File

@@ -18,10 +18,12 @@ module RabbitCarrots
:rabbitmq_exchange_name, :rabbitmq_exchange_name,
:automatically_recover, :automatically_recover,
:network_recovery_interval, :network_recovery_interval,
:recovery_attempts, :recovery_attempts
:orm
def orm def orm
@orm ||= :activerecord @orm ||= :activerecord
end end
attr_writer :orm
end end
end end

View File

@@ -3,6 +3,7 @@ require 'singleton'
module RabbitCarrots module RabbitCarrots
class Connection class Connection
include ::Singleton include ::Singleton
attr_reader :connection attr_reader :connection
def initialize def initialize

View File

@@ -1,5 +1,5 @@
module RabbitCarrots module RabbitCarrots
class Core class Core # rubocop:disable Metrics/ClassLength
attr_reader :logger attr_reader :logger
@database_agnostic_not_null_violation = nil @database_agnostic_not_null_violation = nil
@@ -11,7 +11,7 @@ module RabbitCarrots
end end
def initialize(logger: nil) def initialize(logger: nil)
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout) @logger = create_logger_adapter(logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout))
@threads = [] @threads = []
@running = true @running = true
@shutdown_requested = false @shutdown_requested = false
@@ -36,6 +36,7 @@ module RabbitCarrots
handler_class:, handler_class:,
routing_keys: channel[:routing_keys], routing_keys: channel[:routing_keys],
queue_arguments: channel[:arguments], queue_arguments: channel[:arguments],
exchange_name: channel[:exchange_name],
kill_to_restart_on_standard_error: kill_to_restart_on_standard_error:
) )
end end
@@ -61,7 +62,7 @@ module RabbitCarrots
def request_shutdown def request_shutdown
# Workaround to a known issue with Signal Traps and logs # Workaround to a known issue with Signal Traps and logs
Thread.start do Thread.start do
logger.log 'Shutting down Rabbit Carrots service...' logger.error 'Shutting down Rabbit Carrots service...'
end end
@shutdown_requested = true @shutdown_requested = true
@threads.each(&:kill) @threads.each(&:kill)
@@ -71,16 +72,24 @@ module RabbitCarrots
def stop def stop
# Workaround to a known issue with Signal Traps and logs # Workaround to a known issue with Signal Traps and logs
Thread.start do Thread.start do
logger.log 'Stoppig the Rabbit Carrots service...' logger.error 'Stoppig the Rabbit Carrots service...'
end end
@running = false @running = false
end end
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, kill_to_restart_on_standard_error: false) def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, exchange_name: nil, kill_to_restart_on_standard_error: false)
RabbitCarrots::Connection.instance.channel.with do |channel| RabbitCarrots::Connection.instance.channel.with do |channel|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true) exchange_name ||= RabbitCarrots.configuration.rabbitmq_exchange_name
logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}" begin
# Try to passively read an existing exchange without declaring it
exchange = channel.topic(exchange_name, passive: true)
rescue Bunny::NotFound
# If the exchange does not exist, declare it
exchange = channel.topic(exchange_name, durable: true)
end
logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments) queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) } routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
@@ -88,24 +97,29 @@ module RabbitCarrots
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload| queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
break if @shutdown_requested break if @shutdown_requested
logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}" logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
handler_class.handle!(channel, delivery_info, properties, payload) handler_class.handle!(channel, delivery_info, properties, payload)
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
logger.log "Nacked message: #{payload}" payload = encode_payload(payload)
logger.warn "Nacked message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, false) channel.nack(delivery_info.delivery_tag, false, false)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
logger.log "Nacked and Requeued message: #{payload}" payload = encode_payload(payload)
logger.warn "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}" payload = encode_payload(payload)
logger.warn "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue self.class.database_agnostic_connection_not_established => e rescue self.class.database_agnostic_connection_not_established => e
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}" payload = encode_payload(payload)
logger.warn "Error connection not established to the database: #{payload}. Error: #{e.message}"
sleep 3 sleep 3
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue StandardError => e rescue StandardError => e
logger.log "Error handling message: #{payload}. Error: #{e.message}" payload = encode_payload(payload)
logger.error "Error handling message: #{payload}. Error: #{e.message}"
sleep 3 sleep 3
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
@@ -115,5 +129,36 @@ module RabbitCarrots
logger.error "Bunny session error: #{e.message}" logger.error "Bunny session error: #{e.message}"
request_shutdown request_shutdown
end end
private
def create_logger_adapter(logger)
return logger if logger.respond_to?(:info) && logger.respond_to?(:error) && logger.respond_to?(:warn)
adapter = Object.new
def adapter.info(msg)
@logger.write("[INFO] #{msg}\n")
end
def adapter.error(msg)
@logger.write("[ERROR] #{msg}\n")
end
def adapter.warn(msg)
@logger.write("[WARN] #{msg}\n")
end
adapter.instance_variable_set(:@logger, logger)
adapter
end
def encode_payload(payload)
payload.encode(
'UTF-8',
invalid: :replace,
undef: :replace,
replace: ''
)
end
end end
end end

View File

@@ -1,5 +1,5 @@
# frozen_string_literal: true # frozen_string_literal: true
module RabbitCarrots module RabbitCarrots
VERSION = '1.0.2' VERSION = '1.1.1'
end end

View File

@@ -33,7 +33,7 @@ Gem::Specification.new do |spec|
# Uncomment to register a new dependency of your gem # Uncomment to register a new dependency of your gem
spec.add_dependency 'bunny', '>= 2.22' spec.add_dependency 'bunny', '>= 2.22'
spec.add_dependency 'connection_pool', '~> 2.4' spec.add_dependency 'connection_pool', '>= 2.4'
# For more information and examples about making a new gem, check out our # For more information and examples about making a new gem, check out our
# guide at: https://bundler.io/guides/creating_gem.html # guide at: https://bundler.io/guides/creating_gem.html