mirror of
https://github.com/ditkrg/rabbit_carrots.git
synced 2026-01-25 23:22:56 +00:00
Compare commits
16 Commits
v0.1.17
...
broosk1993
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2dd059194 | ||
|
59ad012e04
|
|||
|
3950ff5047
|
|||
|
|
edb1fffad1 | ||
| ef60e8342d | |||
| a51eed8101 | |||
| 28b620b7b4 | |||
| 876a48a1ab | |||
| ff838c50ad | |||
|
|
7c4c7ea08f | ||
|
|
6b6da5e5cb | ||
|
|
54025a5aa4 | ||
|
|
8a95f69556 | ||
|
3bb19c1850
|
|||
|
185f0aade8
|
|||
|
b90ef2752b
|
8
Gemfile
8
Gemfile
@@ -5,10 +5,10 @@ source 'https://rubygems.org'
|
|||||||
# Specify your gem's dependencies in rabbit_carrots.gemspec
|
# Specify your gem's dependencies in rabbit_carrots.gemspec
|
||||||
gemspec
|
gemspec
|
||||||
|
|
||||||
gem 'rake', '~> 13.0'
|
gem 'rake', '~> 13.1'
|
||||||
|
|
||||||
gem 'rspec', '~> 3.0'
|
gem 'rspec', '~> 3.12'
|
||||||
|
|
||||||
gem 'rubocop', '~> 1.21'
|
gem 'rubocop', '~> 1.58'
|
||||||
|
|
||||||
gem 'rubocop-rails', '~> 2.17'
|
gem 'rubocop-rails', '~> 2.22'
|
||||||
|
|||||||
78
Gemfile.lock
78
Gemfile.lock
@@ -1,39 +1,52 @@
|
|||||||
PATH
|
PATH
|
||||||
remote: .
|
remote: .
|
||||||
specs:
|
specs:
|
||||||
rabbit_carrots (0.1.17)
|
rabbit_carrots (1.0.2)
|
||||||
bunny (>= 2.19.0)
|
bunny (>= 2.22)
|
||||||
connection_pool (~> 2.3.0)
|
connection_pool (~> 2.4)
|
||||||
|
|
||||||
GEM
|
GEM
|
||||||
remote: https://rubygems.org/
|
remote: https://rubygems.org/
|
||||||
specs:
|
specs:
|
||||||
activesupport (7.0.4)
|
activesupport (7.1.2)
|
||||||
|
base64
|
||||||
|
bigdecimal
|
||||||
concurrent-ruby (~> 1.0, >= 1.0.2)
|
concurrent-ruby (~> 1.0, >= 1.0.2)
|
||||||
|
connection_pool (>= 2.2.5)
|
||||||
|
drb
|
||||||
i18n (>= 1.6, < 2)
|
i18n (>= 1.6, < 2)
|
||||||
minitest (>= 5.1)
|
minitest (>= 5.1)
|
||||||
|
mutex_m
|
||||||
tzinfo (~> 2.0)
|
tzinfo (~> 2.0)
|
||||||
amq-protocol (2.3.2)
|
amq-protocol (2.3.2)
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
bunny (2.20.3)
|
base64 (0.2.0)
|
||||||
|
bigdecimal (3.1.4)
|
||||||
|
bunny (2.22.0)
|
||||||
amq-protocol (~> 2.3, >= 2.3.1)
|
amq-protocol (~> 2.3, >= 2.3.1)
|
||||||
sorted_set (~> 1, >= 1.0.2)
|
sorted_set (~> 1, >= 1.0.2)
|
||||||
concurrent-ruby (1.1.10)
|
concurrent-ruby (1.2.2)
|
||||||
connection_pool (2.3.0)
|
connection_pool (2.4.1)
|
||||||
diff-lcs (1.5.0)
|
diff-lcs (1.5.0)
|
||||||
i18n (1.12.0)
|
drb (2.2.0)
|
||||||
|
ruby2_keywords
|
||||||
|
i18n (1.14.1)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
json (2.6.2)
|
json (2.7.1)
|
||||||
minitest (5.16.3)
|
language_server-protocol (3.17.0.3)
|
||||||
parallel (1.22.1)
|
minitest (5.20.0)
|
||||||
parser (3.1.2.1)
|
mutex_m (0.2.0)
|
||||||
|
parallel (1.23.0)
|
||||||
|
parser (3.2.2.4)
|
||||||
ast (~> 2.4.1)
|
ast (~> 2.4.1)
|
||||||
rack (3.0.4.2)
|
racc
|
||||||
|
racc (1.7.3)
|
||||||
|
rack (3.0.8)
|
||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
rake (13.0.6)
|
rake (13.1.0)
|
||||||
rbtree (0.4.6)
|
rbtree (0.4.6)
|
||||||
regexp_parser (2.6.1)
|
regexp_parser (2.8.3)
|
||||||
rexml (3.2.5)
|
rexml (3.2.6)
|
||||||
rspec (3.12.0)
|
rspec (3.12.0)
|
||||||
rspec-core (~> 3.12.0)
|
rspec-core (~> 3.12.0)
|
||||||
rspec-expectations (~> 3.12.0)
|
rspec-expectations (~> 3.12.0)
|
||||||
@@ -47,40 +60,43 @@ GEM
|
|||||||
diff-lcs (>= 1.2.0, < 2.0)
|
diff-lcs (>= 1.2.0, < 2.0)
|
||||||
rspec-support (~> 3.12.0)
|
rspec-support (~> 3.12.0)
|
||||||
rspec-support (3.12.0)
|
rspec-support (3.12.0)
|
||||||
rubocop (1.39.0)
|
rubocop (1.58.0)
|
||||||
json (~> 2.3)
|
json (~> 2.3)
|
||||||
|
language_server-protocol (>= 3.17.0)
|
||||||
parallel (~> 1.10)
|
parallel (~> 1.10)
|
||||||
parser (>= 3.1.2.1)
|
parser (>= 3.2.2.4)
|
||||||
rainbow (>= 2.2.2, < 4.0)
|
rainbow (>= 2.2.2, < 4.0)
|
||||||
regexp_parser (>= 1.8, < 3.0)
|
regexp_parser (>= 1.8, < 3.0)
|
||||||
rexml (>= 3.2.5, < 4.0)
|
rexml (>= 3.2.5, < 4.0)
|
||||||
rubocop-ast (>= 1.23.0, < 2.0)
|
rubocop-ast (>= 1.30.0, < 2.0)
|
||||||
ruby-progressbar (~> 1.7)
|
ruby-progressbar (~> 1.7)
|
||||||
unicode-display_width (>= 1.4.0, < 3.0)
|
unicode-display_width (>= 2.4.0, < 3.0)
|
||||||
rubocop-ast (1.23.0)
|
rubocop-ast (1.30.0)
|
||||||
parser (>= 3.1.1.0)
|
parser (>= 3.2.1.0)
|
||||||
rubocop-rails (2.17.3)
|
rubocop-rails (2.22.2)
|
||||||
activesupport (>= 4.2.0)
|
activesupport (>= 4.2.0)
|
||||||
rack (>= 1.1)
|
rack (>= 1.1)
|
||||||
rubocop (>= 1.33.0, < 2.0)
|
rubocop (>= 1.33.0, < 2.0)
|
||||||
ruby-progressbar (1.11.0)
|
rubocop-ast (>= 1.30.0, < 2.0)
|
||||||
set (1.0.3)
|
ruby-progressbar (1.13.0)
|
||||||
|
ruby2_keywords (0.0.5)
|
||||||
|
set (1.1.0)
|
||||||
sorted_set (1.0.3)
|
sorted_set (1.0.3)
|
||||||
rbtree
|
rbtree
|
||||||
set (~> 1.0)
|
set (~> 1.0)
|
||||||
tzinfo (2.0.5)
|
tzinfo (2.0.6)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
unicode-display_width (2.3.0)
|
unicode-display_width (2.5.0)
|
||||||
|
|
||||||
PLATFORMS
|
PLATFORMS
|
||||||
x86_64-linux
|
x86_64-linux
|
||||||
|
|
||||||
DEPENDENCIES
|
DEPENDENCIES
|
||||||
rabbit_carrots!
|
rabbit_carrots!
|
||||||
rake (~> 13.0)
|
rake (~> 13.1)
|
||||||
rspec (~> 3.0)
|
rspec (~> 3.12)
|
||||||
rubocop (~> 1.21)
|
rubocop (~> 1.58)
|
||||||
rubocop-rails (~> 2.17)
|
rubocop-rails (~> 2.22)
|
||||||
|
|
||||||
BUNDLED WITH
|
BUNDLED WITH
|
||||||
2.3.26
|
2.3.26
|
||||||
|
|||||||
22
README.md
22
README.md
@@ -1,6 +1,6 @@
|
|||||||
# RabbitCarrots
|
# RabbitCarrots
|
||||||
|
|
||||||
RabbitCarrots is a simple background task based on rake to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions:
|
RabbitCarrots is a simple background worker to handle the consumption of RabbitMQ message in Rails applications. It is an opinionated library that solves the consumption of messages among microservices, given the following conditions:
|
||||||
|
|
||||||
1. RabbitMQ is used as an event bus for communication.
|
1. RabbitMQ is used as an event bus for communication.
|
||||||
2. Messages are routed using a single exchange, multiple routing keys.
|
2. Messages are routed using a single exchange, multiple routing keys.
|
||||||
@@ -32,7 +32,11 @@ RabbitCarrots.configure do |c|
|
|||||||
c.rabbitmq_user = ENV.fetch('RABBITMQ__USER', nil)
|
c.rabbitmq_user = ENV.fetch('RABBITMQ__USER', nil)
|
||||||
c.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD', nil)
|
c.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD', nil)
|
||||||
c.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST', nil)
|
c.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST', nil)
|
||||||
c.event_bus_exchange_name = ENV.fetch('EVENTBUS__EXCHANGE_NAME', nil)
|
c.rabbitmq_exchange_name = ENV.fetch('RABBITMQ__EXCHANGE_NAME', nil)
|
||||||
|
c.automatically_recover = true
|
||||||
|
c.network_recovery_interval = 5
|
||||||
|
c.recovery_attempts = 5
|
||||||
|
c.orm = :activerecord || :mongoid
|
||||||
c.routing_key_mappings = [
|
c.routing_key_mappings = [
|
||||||
{ routing_keys: ['RK1', 'RK2'], queue: 'QUEUE_NAME', handler: 'CLASS HANDLER IN STRING' },
|
{ routing_keys: ['RK1', 'RK2'], queue: 'QUEUE_NAME', handler: 'CLASS HANDLER IN STRING' },
|
||||||
{ routing_keys: ['RK1', 'RK2'], queue: 'QUEUE_NAME', handler: 'CLASS HANDLER IN STRING' }
|
{ routing_keys: ['RK1', 'RK2'], queue: 'QUEUE_NAME', handler: 'CLASS HANDLER IN STRING' }
|
||||||
@@ -41,8 +45,6 @@ end
|
|||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Note that handler is a class that must implement a method named ```handle!``` that takes 4 parameters as follow:
|
Note that handler is a class that must implement a method named ```handle!``` that takes 4 parameters as follow:
|
||||||
|
|
||||||
```ruby
|
```ruby
|
||||||
@@ -53,8 +55,6 @@ class DummyEventHandler
|
|||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Inside the handle message, you can NACK the message without re-queuing by raising ```RabbitCarrots::EventHandlers::Errors::NackMessage``` exception.
|
Inside the handle message, you can NACK the message without re-queuing by raising ```RabbitCarrots::EventHandlers::Errors::NackMessage``` exception.
|
||||||
|
|
||||||
To NACK and re-queue, raise ```RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage``` exception.
|
To NACK and re-queue, raise ```RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage``` exception.
|
||||||
@@ -65,8 +65,16 @@ Note: Any other unrescued exception raised inside ```handle!``` the that is a su
|
|||||||
|
|
||||||
### Running
|
### Running
|
||||||
|
|
||||||
Then run ```bundle exec rake rabbit_carrots:eat```.
|
For better scalability and improved performance, you can run rabbit_carrots in standalone mode by invoking the following command:
|
||||||
|
```bundle exec rake rabbit_carrots:eat```.
|
||||||
|
|
||||||
|
#### Puma
|
||||||
|
|
||||||
|
For small and medium sized projects, you can delegate the management of the rabbit_carrots to the Puma web server. To achieve that, add the following line to your puma.rb
|
||||||
|
|
||||||
|
```plugin :rabbit_carrots```
|
||||||
|
|
||||||
|
This will make sure that Puma will manage rabbit carrots as a background service and will gracefully terminate if rabbit_carrots eventually loses connection after multiple automatic recovery.
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
|
After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
|
||||||
|
|||||||
85
lib/puma/plugin/rabbit_carrots.rb
Normal file
85
lib/puma/plugin/rabbit_carrots.rb
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
# rabbit_carrots.rb
|
||||||
|
|
||||||
|
require 'puma/plugin'
|
||||||
|
require 'rabbit_carrots'
|
||||||
|
|
||||||
|
Puma::Plugin.create do
|
||||||
|
attr_reader :puma_pid, :rabbit_carrots_pid, :log_writer, :core_service
|
||||||
|
|
||||||
|
def start(launcher)
|
||||||
|
@log_writer = launcher.log_writer
|
||||||
|
@puma_pid = $$
|
||||||
|
|
||||||
|
@core_service = RabbitCarrots::Core.new(logger: log_writer)
|
||||||
|
|
||||||
|
in_background do
|
||||||
|
monitor_rabbit_carrots
|
||||||
|
end
|
||||||
|
|
||||||
|
launcher.events.on_booted do
|
||||||
|
@rabbit_carrots_pid = fork do
|
||||||
|
Thread.new { monitor_puma }
|
||||||
|
start_rabbit_carrots_consumer
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
launcher.events.on_stopped { stop_rabbit_carrots }
|
||||||
|
launcher.events.on_restart { stop_rabbit_carrots }
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def start_rabbit_carrots_consumer
|
||||||
|
core_service.start(kill_to_restart_on_standard_error: true)
|
||||||
|
rescue StandardError => e
|
||||||
|
Rails.logger.error "Error starting Rabbit Carrots: #{e.message}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop_rabbit_carrots
|
||||||
|
return unless rabbit_carrots_pid
|
||||||
|
|
||||||
|
log 'Stopping Rabbit Carrots...'
|
||||||
|
core_service.request_shutdown
|
||||||
|
Process.kill('TERM', rabbit_carrots_pid)
|
||||||
|
Process.wait(rabbit_carrots_pid)
|
||||||
|
rescue Errno::ECHILD, Errno::ESRCH
|
||||||
|
end
|
||||||
|
|
||||||
|
def monitor_puma
|
||||||
|
monitor(:puma_dead?, 'Detected Puma has gone away, stopping Rabbit Carrots...')
|
||||||
|
end
|
||||||
|
|
||||||
|
def monitor_rabbit_carrots
|
||||||
|
monitor(:rabbit_carrots_dead?, 'Rabbits Carrot is dead, stopping Puma...')
|
||||||
|
end
|
||||||
|
|
||||||
|
def monitor(process_dead, message)
|
||||||
|
loop do
|
||||||
|
if send(process_dead)
|
||||||
|
log message
|
||||||
|
Process.kill('TERM', $$)
|
||||||
|
break
|
||||||
|
end
|
||||||
|
sleep 2
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def rabbit_carrots_dead?
|
||||||
|
Process.waitpid(rabbit_carrots_pid, Process::WNOHANG) if rabbit_carrots_started?
|
||||||
|
false
|
||||||
|
rescue Errno::ECHILD, Errno::ESRCH
|
||||||
|
true
|
||||||
|
end
|
||||||
|
|
||||||
|
def rabbit_carrots_started?
|
||||||
|
rabbit_carrots_pid.present?
|
||||||
|
end
|
||||||
|
|
||||||
|
def puma_dead?
|
||||||
|
Process.ppid != puma_pid
|
||||||
|
end
|
||||||
|
|
||||||
|
def log(...)
|
||||||
|
log_writer.log(...)
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -1,23 +1,11 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
require_relative 'rabbit_carrots/version'
|
require_relative 'rabbit_carrots/version'
|
||||||
|
require 'rabbit_carrots/errors'
|
||||||
require 'rabbit_carrots/connection'
|
require 'rabbit_carrots/connection'
|
||||||
require 'rabbit_carrots/configuration'
|
require 'rabbit_carrots/configuration'
|
||||||
require 'rabbit_carrots/railtie' if defined?(Rails)
|
require 'rabbit_carrots/railtie' if defined?(Rails)
|
||||||
|
require 'rabbit_carrots/core'
|
||||||
|
|
||||||
module RabbitCarrots
|
module RabbitCarrots
|
||||||
class Error < StandardError; end
|
|
||||||
|
|
||||||
module EventHandlers
|
|
||||||
module Errors
|
|
||||||
class IrrelevantMessage < StandardError
|
|
||||||
end
|
|
||||||
|
|
||||||
class NackMessage < StandardError
|
|
||||||
end
|
|
||||||
|
|
||||||
class NackAndRequeueMessage < StandardError
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -9,6 +9,19 @@ module RabbitCarrots
|
|||||||
end
|
end
|
||||||
|
|
||||||
class Configuration
|
class Configuration
|
||||||
attr_accessor :rabbitmq_host, :rabbitmq_port, :rabbitmq_user, :rabbitmq_password, :rabbitmq_vhost, :routing_key_mappings, :event_bus_exchange_name
|
attr_accessor :rabbitmq_host,
|
||||||
|
:rabbitmq_port,
|
||||||
|
:rabbitmq_user,
|
||||||
|
:rabbitmq_password,
|
||||||
|
:rabbitmq_vhost,
|
||||||
|
:routing_key_mappings,
|
||||||
|
:rabbitmq_exchange_name,
|
||||||
|
:automatically_recover,
|
||||||
|
:network_recovery_interval,
|
||||||
|
:recovery_attempts,
|
||||||
|
:orm
|
||||||
|
def orm
|
||||||
|
@orm ||= :activerecord
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -11,7 +11,11 @@ module RabbitCarrots
|
|||||||
port: RabbitCarrots.configuration.rabbitmq_port,
|
port: RabbitCarrots.configuration.rabbitmq_port,
|
||||||
user: RabbitCarrots.configuration.rabbitmq_user,
|
user: RabbitCarrots.configuration.rabbitmq_user,
|
||||||
password: RabbitCarrots.configuration.rabbitmq_password,
|
password: RabbitCarrots.configuration.rabbitmq_password,
|
||||||
vhost: RabbitCarrots.configuration.rabbitmq_vhost
|
vhost: RabbitCarrots.configuration.rabbitmq_vhost,
|
||||||
|
automatically_recover: RabbitCarrots.configuration.automatically_recover || true,
|
||||||
|
network_recovery_interval: RabbitCarrots.configuration.network_recovery_interval || 5,
|
||||||
|
recovery_attempts: RabbitCarrots.configuration.recovery_attempts || 5,
|
||||||
|
recovery_attempts_exhausted: -> { Process.kill('TERM', Process.pid) }
|
||||||
)
|
)
|
||||||
|
|
||||||
@connection.start
|
@connection.start
|
||||||
|
|||||||
119
lib/rabbit_carrots/core.rb
Normal file
119
lib/rabbit_carrots/core.rb
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
module RabbitCarrots
|
||||||
|
class Core
|
||||||
|
attr_reader :logger
|
||||||
|
|
||||||
|
@database_agnostic_not_null_violation = nil
|
||||||
|
@database_agnostic_connection_not_established = nil
|
||||||
|
@database_agnostic_record_invalid = nil
|
||||||
|
|
||||||
|
class << self
|
||||||
|
attr_accessor :database_agnostic_not_null_violation, :database_agnostic_connection_not_established, :database_agnostic_record_invalid
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize(logger: nil)
|
||||||
|
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
||||||
|
@threads = []
|
||||||
|
@running = true
|
||||||
|
@shutdown_requested = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def start(kill_to_restart_on_standard_error: false)
|
||||||
|
self.class.database_agnostic_not_null_violation = RabbitCarrots.configuration.orm == :activerecord ? ActiveRecord::NotNullViolation : RabbitCarrots::EventHandlers::Errors::PlaceholderError
|
||||||
|
self.class.database_agnostic_connection_not_established = RabbitCarrots.configuration.orm == :activerecord ? ActiveRecord::ConnectionNotEstablished : ::Mongo::Error::SocketError
|
||||||
|
self.class.database_agnostic_record_invalid = RabbitCarrots.configuration.orm == :activerecord ? ActiveRecord::RecordInvalid : ::Mongoid::Errors::Validations
|
||||||
|
|
||||||
|
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
|
||||||
|
{ **mapping, handler: mapping[:handler].constantize }
|
||||||
|
end
|
||||||
|
|
||||||
|
channels.each do |channel|
|
||||||
|
handler_class = channel[:handler]
|
||||||
|
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
|
||||||
|
|
||||||
|
@threads << Thread.new do
|
||||||
|
run_task(
|
||||||
|
queue_name: channel[:queue],
|
||||||
|
handler_class:,
|
||||||
|
routing_keys: channel[:routing_keys],
|
||||||
|
queue_arguments: channel[:arguments],
|
||||||
|
kill_to_restart_on_standard_error:
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Signal.trap('INT') { request_shutdown }
|
||||||
|
Signal.trap('TERM') { request_shutdown }
|
||||||
|
|
||||||
|
while @running
|
||||||
|
if @shutdown_requested
|
||||||
|
request_shutdown
|
||||||
|
sleep 1
|
||||||
|
break
|
||||||
|
end
|
||||||
|
sleep 1
|
||||||
|
end
|
||||||
|
|
||||||
|
@threads.each(&:join)
|
||||||
|
rescue StandardError => e
|
||||||
|
logger.error "Error starting Rabbit Carrots: #{e.message}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def request_shutdown
|
||||||
|
# Workaround to a known issue with Signal Traps and logs
|
||||||
|
Thread.start do
|
||||||
|
logger.log 'Shutting down Rabbit Carrots service...'
|
||||||
|
end
|
||||||
|
@shutdown_requested = true
|
||||||
|
@threads.each(&:kill)
|
||||||
|
stop
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop
|
||||||
|
# Workaround to a known issue with Signal Traps and logs
|
||||||
|
Thread.start do
|
||||||
|
logger.log 'Stoppig the Rabbit Carrots service...'
|
||||||
|
end
|
||||||
|
@running = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, kill_to_restart_on_standard_error: false)
|
||||||
|
RabbitCarrots::Connection.instance.channel.with do |channel|
|
||||||
|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)
|
||||||
|
|
||||||
|
logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
|
||||||
|
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)
|
||||||
|
|
||||||
|
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
|
||||||
|
|
||||||
|
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
|
||||||
|
break if @shutdown_requested
|
||||||
|
|
||||||
|
logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
|
||||||
|
handler_class.handle!(channel, delivery_info, properties, payload)
|
||||||
|
channel.ack(delivery_info.delivery_tag, false)
|
||||||
|
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
|
||||||
|
logger.log "Nacked message: #{payload}"
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, false)
|
||||||
|
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
|
||||||
|
logger.log "Nacked and Requeued message: #{payload}"
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, true)
|
||||||
|
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
|
||||||
|
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
|
||||||
|
channel.ack(delivery_info.delivery_tag, false)
|
||||||
|
rescue self.class.database_agnostic_connection_not_established => e
|
||||||
|
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}"
|
||||||
|
sleep 3
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, true)
|
||||||
|
rescue StandardError => e
|
||||||
|
logger.log "Error handling message: #{payload}. Error: #{e.message}"
|
||||||
|
sleep 3
|
||||||
|
channel.nack(delivery_info.delivery_tag, false, true)
|
||||||
|
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
rescue StandardError => e
|
||||||
|
logger.error "Bunny session error: #{e.message}"
|
||||||
|
request_shutdown
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
18
lib/rabbit_carrots/errors.rb
Normal file
18
lib/rabbit_carrots/errors.rb
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
module RabbitCarrots
|
||||||
|
class Error < StandardError; end
|
||||||
|
|
||||||
|
module EventHandlers
|
||||||
|
module Errors
|
||||||
|
class IrrelevantMessage < StandardError
|
||||||
|
end
|
||||||
|
|
||||||
|
class NackMessage < StandardError
|
||||||
|
end
|
||||||
|
|
||||||
|
class NackAndRequeueMessage < StandardError
|
||||||
|
end
|
||||||
|
|
||||||
|
class PlaceholderError < Error; end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -1,71 +1,13 @@
|
|||||||
require 'bunny'
|
|
||||||
|
|
||||||
namespace :rabbit_carrots do
|
namespace :rabbit_carrots do
|
||||||
desc 'Listener for Queue'
|
desc 'Rake task for standalone RabbitCarrots mode'
|
||||||
task eat: :environment do
|
task eat: :environment do
|
||||||
Rails.application.eager_load!
|
Rails.application.eager_load!
|
||||||
|
|
||||||
channels = RabbitCarrots.configuration.routing_key_mappings.map do |mapping|
|
logger = Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
||||||
# This will be supplied in initializer. At that time, the Handler will not be available to be loaded and will throw Uninitialized Constant
|
logger.level = Logger::INFO
|
||||||
{ **mapping, handler: mapping[:handler].constantize }
|
|
||||||
end
|
|
||||||
|
|
||||||
Rails.logger = Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
|
core_service = RabbitCarrots::Core.new(logger:)
|
||||||
|
|
||||||
# Run RMQ Subscriber for each channel
|
core_service.start(kill_to_restart_on_standard_error: true)
|
||||||
channels.each do |channel|
|
|
||||||
handler_class = channel[:handler]
|
|
||||||
|
|
||||||
raise "#{handler_class.name} must respond to `handle!`" unless handler_class.respond_to?(:handle!)
|
|
||||||
|
|
||||||
run_task(queue_name: channel[:queue], handler_class:, routing_keys: channel[:routing_keys])
|
|
||||||
end
|
|
||||||
|
|
||||||
# Infinite loop to keep the process running
|
|
||||||
loop do
|
|
||||||
sleep 1
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def run_task(queue_name:, handler_class:, routing_keys:)
|
|
||||||
RabbitCarrots::Connection.instance.channel.with do |channel|
|
|
||||||
exchange = channel.topic(RabbitCarrots.configuration.event_bus_exchange_name, durable: true)
|
|
||||||
|
|
||||||
Rails.logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
|
|
||||||
queue = channel.queue(queue_name, durable: true)
|
|
||||||
|
|
||||||
routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }
|
|
||||||
|
|
||||||
queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
|
|
||||||
Rails.logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
|
|
||||||
handler_class.handle!(channel, delivery_info, properties, payload)
|
|
||||||
channel.ack(delivery_info.delivery_tag, false)
|
|
||||||
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
|
|
||||||
Rails.logger.info "Nacked message: #{payload}"
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, false)
|
|
||||||
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
|
|
||||||
Rails.logger.info "Nacked and Requeued message: #{payload}"
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, true)
|
|
||||||
rescue ActiveRecord::NotNullViolation, ActiveRecord::RecordInvalid => e
|
|
||||||
# on null constraint violation, we want to ack the message
|
|
||||||
Rails.logger.error "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
|
|
||||||
channel.ack(delivery_info.delivery_tag, false)
|
|
||||||
rescue ActiveRecord::ConnectionNotEstablished => e
|
|
||||||
# on connection not established, we want to requeue the message and sleep for 3 seconds
|
|
||||||
Rails.logger.error "Error connection not established to the database: #{payload}. Error: #{e.message}"
|
|
||||||
# delay for 3 seconds before requeuing
|
|
||||||
sleep 3
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, true)
|
|
||||||
rescue StandardError => e
|
|
||||||
Rails.logger.error "Error handling message: #{payload}. Error: #{e.message}"
|
|
||||||
# requeue the message then kill the container
|
|
||||||
sleep 3
|
|
||||||
channel.nack(delivery_info.delivery_tag, false, true)
|
|
||||||
# kill the container with sigterm
|
|
||||||
Process.kill('SIGTERM', Process.pid)
|
|
||||||
end
|
|
||||||
|
|
||||||
Rails.logger.info 'RUN TASK ENDED'
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
module RabbitCarrots
|
module RabbitCarrots
|
||||||
VERSION = '0.1.17'
|
VERSION = '1.0.2'
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -32,8 +32,8 @@ Gem::Specification.new do |spec|
|
|||||||
spec.require_paths = ['lib']
|
spec.require_paths = ['lib']
|
||||||
|
|
||||||
# Uncomment to register a new dependency of your gem
|
# Uncomment to register a new dependency of your gem
|
||||||
spec.add_dependency 'bunny', '>= 2.19.0'
|
spec.add_dependency 'bunny', '>= 2.22'
|
||||||
spec.add_dependency 'connection_pool', '~> 2.3.0'
|
spec.add_dependency 'connection_pool', '~> 2.4'
|
||||||
|
|
||||||
# For more information and examples about making a new gem, check out our
|
# For more information and examples about making a new gem, check out our
|
||||||
# guide at: https://bundler.io/guides/creating_gem.html
|
# guide at: https://bundler.io/guides/creating_gem.html
|
||||||
|
|||||||
Reference in New Issue
Block a user