1 Commits

Author SHA1 Message Date
Brusk Awat
a2dd059194 Update README.md 2024-08-16 00:48:01 +03:00
12 changed files with 89 additions and 151 deletions

View File

@@ -14,7 +14,7 @@ jobs:
strategy: strategy:
matrix: matrix:
ruby: ruby:
- '4.0.1' - '3.1.2'
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3

View File

@@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1 - uses: ruby/setup-ruby@v1
with: with:
ruby-version: 4.0.1 ruby-version: 3.1.2
- name: publish gem - name: publish gem
run: | run: |

View File

@@ -1,5 +1,4 @@
plugins: require: rubocop-rails
- rubocop-rails
AllCops: AllCops:
NewCops: enable NewCops: enable
@@ -112,5 +111,3 @@ Metrics/CyclomaticComplexity:
Max: 15 Max: 15
Metrics/PerceivedComplexity: Metrics/PerceivedComplexity:
Max: 15 Max: 15
Metrics/ParameterLists:
Max: 6

View File

@@ -5,10 +5,10 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in rabbit_carrots.gemspec # Specify your gem's dependencies in rabbit_carrots.gemspec
gemspec gemspec
gem 'rake', '~> 13.3.1' gem 'rake', '~> 13.1'
gem 'rspec', '~> 3.13.2' gem 'rspec', '~> 3.12'
gem 'rubocop', '~> 1.82.1' gem 'rubocop', '~> 1.58'
gem 'rubocop-rails', '~> 2.34.3' gem 'rubocop-rails', '~> 2.22'

View File

@@ -1,112 +1,102 @@
PATH PATH
remote: . remote: .
specs: specs:
rabbit_carrots (1.1.1) rabbit_carrots (1.0.2)
bunny (>= 2.22) bunny (>= 2.22)
connection_pool (>= 2.4) connection_pool (~> 2.4)
GEM GEM
remote: https://rubygems.org/ remote: https://rubygems.org/
specs: specs:
activesupport (8.1.2) activesupport (7.1.2)
base64 base64
bigdecimal bigdecimal
concurrent-ruby (~> 1.0, >= 1.3.1) concurrent-ruby (~> 1.0, >= 1.0.2)
connection_pool (>= 2.2.5) connection_pool (>= 2.2.5)
drb drb
i18n (>= 1.6, < 2) i18n (>= 1.6, < 2)
json
logger (>= 1.4.2)
minitest (>= 5.1) minitest (>= 5.1)
securerandom (>= 0.3) mutex_m
tzinfo (~> 2.0, >= 2.0.5) tzinfo (~> 2.0)
uri (>= 0.13.1) amq-protocol (2.3.2)
amq-protocol (2.5.0) ast (2.4.2)
ast (2.4.3) base64 (0.2.0)
base64 (0.3.0) bigdecimal (3.1.4)
bigdecimal (4.0.1) bunny (2.22.0)
bunny (2.24.0) amq-protocol (~> 2.3, >= 2.3.1)
amq-protocol (~> 2.3)
sorted_set (~> 1, >= 1.0.2) sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.3.6) concurrent-ruby (1.2.2)
connection_pool (3.0.2) connection_pool (2.4.1)
diff-lcs (1.6.2) diff-lcs (1.5.0)
drb (2.2.3) drb (2.2.0)
i18n (1.14.8) ruby2_keywords
i18n (1.14.1)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
json (2.18.0) json (2.7.1)
language_server-protocol (3.17.0.5) language_server-protocol (3.17.0.3)
lint_roller (1.1.0) minitest (5.20.0)
logger (1.7.0) mutex_m (0.2.0)
minitest (6.0.1) parallel (1.23.0)
prism (~> 1.5) parser (3.2.2.4)
parallel (1.27.0)
parser (3.3.10.1)
ast (~> 2.4.1) ast (~> 2.4.1)
racc racc
prism (1.8.0) racc (1.7.3)
racc (1.8.1) rack (3.0.8)
rack (3.2.4)
rainbow (3.1.1) rainbow (3.1.1)
rake (13.3.1) rake (13.1.0)
rbtree (0.4.6) rbtree (0.4.6)
regexp_parser (2.11.3) regexp_parser (2.8.3)
rspec (3.13.2) rexml (3.2.6)
rspec-core (~> 3.13.0) rspec (3.12.0)
rspec-expectations (~> 3.13.0) rspec-core (~> 3.12.0)
rspec-mocks (~> 3.13.0) rspec-expectations (~> 3.12.0)
rspec-core (3.13.6) rspec-mocks (~> 3.12.0)
rspec-support (~> 3.13.0) rspec-core (3.12.0)
rspec-expectations (3.13.5) rspec-support (~> 3.12.0)
rspec-expectations (3.12.0)
diff-lcs (>= 1.2.0, < 2.0) diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0) rspec-support (~> 3.12.0)
rspec-mocks (3.13.7) rspec-mocks (3.12.0)
diff-lcs (>= 1.2.0, < 2.0) diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0) rspec-support (~> 3.12.0)
rspec-support (3.13.6) rspec-support (3.12.0)
rubocop (1.82.1) rubocop (1.58.0)
json (~> 2.3) json (~> 2.3)
language_server-protocol (~> 3.17.0.2) language_server-protocol (>= 3.17.0)
lint_roller (~> 1.1.0)
parallel (~> 1.10) parallel (~> 1.10)
parser (>= 3.3.0.2) parser (>= 3.2.2.4)
rainbow (>= 2.2.2, < 4.0) rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 2.9.3, < 3.0) regexp_parser (>= 1.8, < 3.0)
rubocop-ast (>= 1.48.0, < 2.0) rexml (>= 3.2.5, < 4.0)
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (~> 1.7) ruby-progressbar (~> 1.7)
unicode-display_width (>= 2.4.0, < 4.0) unicode-display_width (>= 2.4.0, < 3.0)
rubocop-ast (1.49.0) rubocop-ast (1.30.0)
parser (>= 3.3.7.2) parser (>= 3.2.1.0)
prism (~> 1.7) rubocop-rails (2.22.2)
rubocop-rails (2.34.3)
activesupport (>= 4.2.0) activesupport (>= 4.2.0)
lint_roller (~> 1.1)
rack (>= 1.1) rack (>= 1.1)
rubocop (>= 1.75.0, < 2.0) rubocop (>= 1.33.0, < 2.0)
rubocop-ast (>= 1.44.0, < 2.0) rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (1.13.0) ruby-progressbar (1.13.0)
securerandom (0.4.1) ruby2_keywords (0.0.5)
set (1.1.2) set (1.1.0)
sorted_set (1.0.3) sorted_set (1.0.3)
rbtree rbtree
set (~> 1.0) set (~> 1.0)
tzinfo (2.0.6) tzinfo (2.0.6)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
unicode-display_width (3.2.0) unicode-display_width (2.5.0)
unicode-emoji (~> 4.1)
unicode-emoji (4.2.0)
uri (1.1.1)
PLATFORMS PLATFORMS
arm64-darwin-25
x86_64-linux x86_64-linux
DEPENDENCIES DEPENDENCIES
rabbit_carrots! rabbit_carrots!
rake (~> 13.3.1) rake (~> 13.1)
rspec (~> 3.13.2) rspec (~> 3.12)
rubocop (~> 1.82.1) rubocop (~> 1.58)
rubocop-rails (~> 2.34.3) rubocop-rails (~> 2.22)
BUNDLED WITH BUNDLED WITH
2.3.26 2.3.26

View File

@@ -1,6 +1,6 @@
# RabbitCarrots # RabbitCarrots
RabbitCarrots is a simple background task based on rake to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions: RabbitCarrots is a simple background worker to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions:
1. RabbitMQ is used as an event bus for communication. 1. RabbitMQ is used as an event bus for communication.
2. Messages are routed using a single exchange, multiple routing keys. 2. Messages are routed using a single exchange, multiple routing keys.

View File

@@ -1,5 +1,5 @@
# rabbit_carrots.rb # rabbit_carrots.rb
require 'English'
require 'puma/plugin' require 'puma/plugin'
require 'rabbit_carrots' require 'rabbit_carrots'
@@ -8,7 +8,7 @@ Puma::Plugin.create do
def start(launcher) def start(launcher)
@log_writer = launcher.log_writer @log_writer = launcher.log_writer
@puma_pid = $PROCESS_ID @puma_pid = $$
@core_service = RabbitCarrots::Core.new(logger: log_writer) @core_service = RabbitCarrots::Core.new(logger: log_writer)
@@ -43,7 +43,6 @@ Puma::Plugin.create do
Process.kill('TERM', rabbit_carrots_pid) Process.kill('TERM', rabbit_carrots_pid)
Process.wait(rabbit_carrots_pid) Process.wait(rabbit_carrots_pid)
rescue Errno::ECHILD, Errno::ESRCH rescue Errno::ECHILD, Errno::ESRCH
log 'Rabbit Carrots already stopped'
end end
def monitor_puma def monitor_puma
@@ -58,7 +57,7 @@ Puma::Plugin.create do
loop do loop do
if send(process_dead) if send(process_dead)
log message log message
Process.kill('TERM', $PROCESS_ID) Process.kill('TERM', $$)
break break
end end
sleep 2 sleep 2

View File

@@ -18,12 +18,10 @@ module RabbitCarrots
:rabbitmq_exchange_name, :rabbitmq_exchange_name,
:automatically_recover, :automatically_recover,
:network_recovery_interval, :network_recovery_interval,
:recovery_attempts :recovery_attempts,
:orm
def orm def orm
@orm ||= :activerecord @orm ||= :activerecord
end end
attr_writer :orm
end end
end end

View File

@@ -3,7 +3,6 @@ require 'singleton'
module RabbitCarrots module RabbitCarrots
class Connection class Connection
include ::Singleton include ::Singleton
attr_reader :connection attr_reader :connection
def initialize def initialize

View File

@@ -1,5 +1,5 @@
module RabbitCarrots module RabbitCarrots
class Core # rubocop:disable Metrics/ClassLength class Core
attr_reader :logger attr_reader :logger
@database_agnostic_not_null_violation = nil @database_agnostic_not_null_violation = nil
@@ -11,7 +11,7 @@ module RabbitCarrots
end end
def initialize(logger: nil) def initialize(logger: nil)
@logger = create_logger_adapter(logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)) @logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
@threads = [] @threads = []
@running = true @running = true
@shutdown_requested = false @shutdown_requested = false
@@ -36,7 +36,6 @@ module RabbitCarrots
handler_class:, handler_class:,
routing_keys: channel[:routing_keys], routing_keys: channel[:routing_keys],
queue_arguments: channel[:arguments], queue_arguments: channel[:arguments],
exchange_name: channel[:exchange_name],
kill_to_restart_on_standard_error: kill_to_restart_on_standard_error:
) )
end end
@@ -62,7 +61,7 @@ module RabbitCarrots
def request_shutdown def request_shutdown
# Workaround to a known issue with Signal Traps and logs # Workaround to a known issue with Signal Traps and logs
Thread.start do Thread.start do
logger.error 'Shutting down Rabbit Carrots service...' logger.log 'Shutting down Rabbit Carrots service...'
end end
@shutdown_requested = true @shutdown_requested = true
@threads.each(&:kill) @threads.each(&:kill)
@@ -72,24 +71,16 @@ module RabbitCarrots
def stop def stop
# Workaround to a known issue with Signal Traps and logs # Workaround to a known issue with Signal Traps and logs
Thread.start do Thread.start do
logger.error 'Stoppig the Rabbit Carrots service...' logger.log 'Stoppig the Rabbit Carrots service...'
end end
@running = false @running = false
end end
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, exchange_name: nil, kill_to_restart_on_standard_error: false) def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, kill_to_restart_on_standard_error: false)
RabbitCarrots::Connection.instance.channel.with do |channel| RabbitCarrots::Connection.instance.channel.with do |channel|
exchange_name ||= RabbitCarrots.configuration.rabbitmq_exchange_name exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
begin logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
# Try to passively read an existing exchange without declaring it
exchange = channel.topic(exchange_name, passive: true)
rescue Bunny::NotFound
# If the exchange does not exist, declare it
exchange = channel.topic(exchange_name, durable: true)
end
logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments) queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) } routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
@@ -97,29 +88,24 @@ module RabbitCarrots
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload| queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
break if @shutdown_requested break if @shutdown_requested
logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}" logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
handler_class.handle!(channel, delivery_info, properties, payload) handler_class.handle!(channel, delivery_info, properties, payload)
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
payload = encode_payload(payload) logger.log "Nacked message: #{payload}"
logger.warn "Nacked message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, false) channel.nack(delivery_info.delivery_tag, false, false)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
payload = encode_payload(payload) logger.log "Nacked and Requeued message: #{payload}"
logger.warn "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
payload = encode_payload(payload) logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
logger.warn "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false) channel.ack(delivery_info.delivery_tag, false)
rescue self.class.database_agnostic_connection_not_established => e rescue self.class.database_agnostic_connection_not_established => e
payload = encode_payload(payload) logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}"
logger.warn "Error connection not established to the database: #{payload}. Error: #{e.message}"
sleep 3 sleep 3
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
rescue StandardError => e rescue StandardError => e
payload = encode_payload(payload) logger.log "Error handling message: #{payload}. Error: #{e.message}"
logger.error "Error handling message: #{payload}. Error: #{e.message}"
sleep 3 sleep 3
channel.nack(delivery_info.delivery_tag, false, true) channel.nack(delivery_info.delivery_tag, false, true)
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
@@ -129,36 +115,5 @@ module RabbitCarrots
logger.error "Bunny session error: #{e.message}" logger.error "Bunny session error: #{e.message}"
request_shutdown request_shutdown
end end
private
def create_logger_adapter(logger)
return logger if logger.respond_to?(:info) && logger.respond_to?(:error) && logger.respond_to?(:warn)
adapter = Object.new
def adapter.info(msg)
@logger.write("[INFO] #{msg}\n")
end
def adapter.error(msg)
@logger.write("[ERROR] #{msg}\n")
end
def adapter.warn(msg)
@logger.write("[WARN] #{msg}\n")
end
adapter.instance_variable_set(:@logger, logger)
adapter
end
def encode_payload(payload)
payload.encode(
'UTF-8',
invalid: :replace,
undef: :replace,
replace: ''
)
end
end end
end end

View File

@@ -1,5 +1,5 @@
# frozen_string_literal: true # frozen_string_literal: true
module RabbitCarrots module RabbitCarrots
VERSION = '1.1.1' VERSION = '1.0.2'
end end

View File

@@ -33,7 +33,7 @@ Gem::Specification.new do |spec|
# Uncomment to register a new dependency of your gem # Uncomment to register a new dependency of your gem
spec.add_dependency 'bunny', '>= 2.22' spec.add_dependency 'bunny', '>= 2.22'
spec.add_dependency 'connection_pool', '>= 2.4' spec.add_dependency 'connection_pool', '~> 2.4'
# For more information and examples about making a new gem, check out our # For more information and examples about making a new gem, check out our
# guide at: https://bundler.io/guides/creating_gem.html # guide at: https://bundler.io/guides/creating_gem.html