1 Commits

Author SHA1 Message Date
Brusk Awat
a2dd059194 Update README.md 2024-08-16 00:48:01 +03:00
12 changed files with 89 additions and 151 deletions

View File

@@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
ruby:
- '4.0.1'
- '3.1.2'
steps:
- uses: actions/checkout@v3

View File

@@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1
with:
ruby-version: 4.0.1
ruby-version: 3.1.2
- name: publish gem
run: |

View File

@@ -1,5 +1,4 @@
plugins:
- rubocop-rails
require: rubocop-rails
AllCops:
NewCops: enable
@@ -112,5 +111,3 @@ Metrics/CyclomaticComplexity:
Max: 15
Metrics/PerceivedComplexity:
Max: 15
Metrics/ParameterLists:
Max: 6

View File

@@ -5,10 +5,10 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in rabbit_carrots.gemspec
gemspec
gem 'rake', '~> 13.3.1'
gem 'rake', '~> 13.1'
gem 'rspec', '~> 3.13.2'
gem 'rspec', '~> 3.12'
gem 'rubocop', '~> 1.82.1'
gem 'rubocop', '~> 1.58'
gem 'rubocop-rails', '~> 2.34.3'
gem 'rubocop-rails', '~> 2.22'

View File

@@ -1,112 +1,102 @@
PATH
remote: .
specs:
rabbit_carrots (1.1.1)
rabbit_carrots (1.0.2)
bunny (>= 2.22)
connection_pool (>= 2.4)
connection_pool (~> 2.4)
GEM
remote: https://rubygems.org/
specs:
activesupport (8.1.2)
activesupport (7.1.2)
base64
bigdecimal
concurrent-ruby (~> 1.0, >= 1.3.1)
concurrent-ruby (~> 1.0, >= 1.0.2)
connection_pool (>= 2.2.5)
drb
i18n (>= 1.6, < 2)
json
logger (>= 1.4.2)
minitest (>= 5.1)
securerandom (>= 0.3)
tzinfo (~> 2.0, >= 2.0.5)
uri (>= 0.13.1)
amq-protocol (2.5.0)
ast (2.4.3)
base64 (0.3.0)
bigdecimal (4.0.1)
bunny (2.24.0)
amq-protocol (~> 2.3)
mutex_m
tzinfo (~> 2.0)
amq-protocol (2.3.2)
ast (2.4.2)
base64 (0.2.0)
bigdecimal (3.1.4)
bunny (2.22.0)
amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.3.6)
connection_pool (3.0.2)
diff-lcs (1.6.2)
drb (2.2.3)
i18n (1.14.8)
concurrent-ruby (1.2.2)
connection_pool (2.4.1)
diff-lcs (1.5.0)
drb (2.2.0)
ruby2_keywords
i18n (1.14.1)
concurrent-ruby (~> 1.0)
json (2.18.0)
language_server-protocol (3.17.0.5)
lint_roller (1.1.0)
logger (1.7.0)
minitest (6.0.1)
prism (~> 1.5)
parallel (1.27.0)
parser (3.3.10.1)
json (2.7.1)
language_server-protocol (3.17.0.3)
minitest (5.20.0)
mutex_m (0.2.0)
parallel (1.23.0)
parser (3.2.2.4)
ast (~> 2.4.1)
racc
prism (1.8.0)
racc (1.8.1)
rack (3.2.4)
racc (1.7.3)
rack (3.0.8)
rainbow (3.1.1)
rake (13.3.1)
rake (13.1.0)
rbtree (0.4.6)
regexp_parser (2.11.3)
rspec (3.13.2)
rspec-core (~> 3.13.0)
rspec-expectations (~> 3.13.0)
rspec-mocks (~> 3.13.0)
rspec-core (3.13.6)
rspec-support (~> 3.13.0)
rspec-expectations (3.13.5)
regexp_parser (2.8.3)
rexml (3.2.6)
rspec (3.12.0)
rspec-core (~> 3.12.0)
rspec-expectations (~> 3.12.0)
rspec-mocks (~> 3.12.0)
rspec-core (3.12.0)
rspec-support (~> 3.12.0)
rspec-expectations (3.12.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0)
rspec-mocks (3.13.7)
rspec-support (~> 3.12.0)
rspec-mocks (3.12.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0)
rspec-support (3.13.6)
rubocop (1.82.1)
rspec-support (~> 3.12.0)
rspec-support (3.12.0)
rubocop (1.58.0)
json (~> 2.3)
language_server-protocol (~> 3.17.0.2)
lint_roller (~> 1.1.0)
language_server-protocol (>= 3.17.0)
parallel (~> 1.10)
parser (>= 3.3.0.2)
parser (>= 3.2.2.4)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 2.9.3, < 3.0)
rubocop-ast (>= 1.48.0, < 2.0)
regexp_parser (>= 1.8, < 3.0)
rexml (>= 3.2.5, < 4.0)
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 2.4.0, < 4.0)
rubocop-ast (1.49.0)
parser (>= 3.3.7.2)
prism (~> 1.7)
rubocop-rails (2.34.3)
unicode-display_width (>= 2.4.0, < 3.0)
rubocop-ast (1.30.0)
parser (>= 3.2.1.0)
rubocop-rails (2.22.2)
activesupport (>= 4.2.0)
lint_roller (~> 1.1)
rack (>= 1.1)
rubocop (>= 1.75.0, < 2.0)
rubocop-ast (>= 1.44.0, < 2.0)
rubocop (>= 1.33.0, < 2.0)
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (1.13.0)
securerandom (0.4.1)
set (1.1.2)
ruby2_keywords (0.0.5)
set (1.1.0)
sorted_set (1.0.3)
rbtree
set (~> 1.0)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (3.2.0)
unicode-emoji (~> 4.1)
unicode-emoji (4.2.0)
uri (1.1.1)
unicode-display_width (2.5.0)
PLATFORMS
arm64-darwin-25
x86_64-linux
DEPENDENCIES
rabbit_carrots!
rake (~> 13.3.1)
rspec (~> 3.13.2)
rubocop (~> 1.82.1)
rubocop-rails (~> 2.34.3)
rake (~> 13.1)
rspec (~> 3.12)
rubocop (~> 1.58)
rubocop-rails (~> 2.22)
BUNDLED WITH
2.3.26

View File

@@ -1,6 +1,6 @@
# RabbitCarrots
RabbitCarrots is a simple background task based on rake to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions:
RabbitCarrots is a simple background worker to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions:
1. RabbitMQ is used as an event bus for communication.
2. Messages are routed using a single exchange, multiple routing keys.

View File

@@ -1,5 +1,5 @@
# rabbit_carrots.rb
require 'English'
require 'puma/plugin'
require 'rabbit_carrots'
@@ -8,7 +8,7 @@ Puma::Plugin.create do
def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $PROCESS_ID
@puma_pid = $$
@core_service = RabbitCarrots::Core.new(logger: log_writer)
@@ -43,7 +43,6 @@ Puma::Plugin.create do
Process.kill('TERM', rabbit_carrots_pid)
Process.wait(rabbit_carrots_pid)
rescue Errno::ECHILD, Errno::ESRCH
log 'Rabbit Carrots already stopped'
end
def monitor_puma
@@ -58,7 +57,7 @@ Puma::Plugin.create do
loop do
if send(process_dead)
log message
Process.kill('TERM', $PROCESS_ID)
Process.kill('TERM', $$)
break
end
sleep 2

View File

@@ -18,12 +18,10 @@ module RabbitCarrots
:rabbitmq_exchange_name,
:automatically_recover,
:network_recovery_interval,
:recovery_attempts
:recovery_attempts,
:orm
def orm
@orm ||= :activerecord
end
attr_writer :orm
end
end

View File

@@ -3,7 +3,6 @@ require 'singleton'
module RabbitCarrots
class Connection
include ::Singleton
attr_reader :connection
def initialize

View File

@@ -1,5 +1,5 @@
module RabbitCarrots
class Core # rubocop:disable Metrics/ClassLength
class Core
attr_reader :logger
@database_agnostic_not_null_violation = nil
@@ -11,7 +11,7 @@ module RabbitCarrots
end
def initialize(logger: nil)
@logger = create_logger_adapter(logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout))
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
@threads = []
@running = true
@shutdown_requested = false
@@ -36,7 +36,6 @@ module RabbitCarrots
handler_class:,
routing_keys: channel[:routing_keys],
queue_arguments: channel[:arguments],
exchange_name: channel[:exchange_name],
kill_to_restart_on_standard_error:
)
end
@@ -62,7 +61,7 @@ module RabbitCarrots
def request_shutdown
# Workaround to a known issue with Signal Traps and logs
Thread.start do
logger.error 'Shutting down Rabbit Carrots service...'
logger.log 'Shutting down Rabbit Carrots service...'
end
@shutdown_requested = true
@threads.each(&:kill)
@@ -72,24 +71,16 @@ module RabbitCarrots
def stop
# Workaround to a known issue with Signal Traps and logs
Thread.start do
logger.error 'Stoppig the Rabbit Carrots service...'
logger.log 'Stoppig the Rabbit Carrots service...'
end
@running = false
end
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, exchange_name: nil, kill_to_restart_on_standard_error: false)
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, kill_to_restart_on_standard_error: false)
RabbitCarrots::Connection.instance.channel.with do |channel|
exchange_name ||= RabbitCarrots.configuration.rabbitmq_exchange_name
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
begin
# Try to passively read an existing exchange without declaring it
exchange = channel.topic(exchange_name, passive: true)
rescue Bunny::NotFound
# If the exchange does not exist, declare it
exchange = channel.topic(exchange_name, durable: true)
end
logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
@@ -97,29 +88,24 @@ module RabbitCarrots
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
break if @shutdown_requested
logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
handler_class.handle!(channel, delivery_info, properties, payload)
channel.ack(delivery_info.delivery_tag, false)
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
payload = encode_payload(payload)
logger.warn "Nacked message: #{payload}"
logger.log "Nacked message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, false)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
payload = encode_payload(payload)
logger.warn "Nacked and Requeued message: #{payload}"
logger.log "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true)
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
payload = encode_payload(payload)
logger.warn "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false)
rescue self.class.database_agnostic_connection_not_established => e
payload = encode_payload(payload)
logger.warn "Error connection not established to the database: #{payload}. Error: #{e.message}"
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}"
sleep 3
channel.nack(delivery_info.delivery_tag, false, true)
rescue StandardError => e
payload = encode_payload(payload)
logger.error "Error handling message: #{payload}. Error: #{e.message}"
logger.log "Error handling message: #{payload}. Error: #{e.message}"
sleep 3
channel.nack(delivery_info.delivery_tag, false, true)
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
@@ -129,36 +115,5 @@ module RabbitCarrots
logger.error "Bunny session error: #{e.message}"
request_shutdown
end
private
def create_logger_adapter(logger)
return logger if logger.respond_to?(:info) && logger.respond_to?(:error) && logger.respond_to?(:warn)
adapter = Object.new
def adapter.info(msg)
@logger.write("[INFO] #{msg}\n")
end
def adapter.error(msg)
@logger.write("[ERROR] #{msg}\n")
end
def adapter.warn(msg)
@logger.write("[WARN] #{msg}\n")
end
adapter.instance_variable_set(:@logger, logger)
adapter
end
def encode_payload(payload)
payload.encode(
'UTF-8',
invalid: :replace,
undef: :replace,
replace: ''
)
end
end
end

View File

@@ -1,5 +1,5 @@
# frozen_string_literal: true
module RabbitCarrots
VERSION = '1.1.1'
VERSION = '1.0.2'
end

View File

@@ -33,7 +33,7 @@ Gem::Specification.new do |spec|
# Uncomment to register a new dependency of your gem
spec.add_dependency 'bunny', '>= 2.22'
spec.add_dependency 'connection_pool', '>= 2.4'
spec.add_dependency 'connection_pool', '~> 2.4'
# For more information and examples about making a new gem, check out our
# guide at: https://bundler.io/guides/creating_gem.html