Compare commits

..

No commits in common. "main" and "0.1.3" have entirely different histories.
main ... 0.1.3

20 changed files with 112 additions and 317 deletions

View File

@ -14,14 +14,14 @@ jobs:
strategy: strategy:
matrix: matrix:
ruby: ruby:
- "4.0.1" - '3.1.2'
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: Set up Ruby - name: Set up Ruby
uses: ruby/setup-ruby@v1 uses: ruby/setup-ruby@v1
with: with:
ruby-version: ${{ matrix.ruby }} ruby-version: ${{ matrix.ruby }}
bundler-cache: true bundler-cache: true
- name: Run the default task - name: Run the default task
run: bundle exec rake run: bundle exec rake

3
.gitignore vendored
View File

@ -10,5 +10,4 @@
# rspec failure tracking # rspec failure tracking
.rspec_status .rspec_status
*.gem *.gem
.idea

View File

@ -1,5 +1,4 @@
plugins: require: rubocop-rails
- rubocop-rails
AllCops: AllCops:
NewCops: enable NewCops: enable

12
Gemfile
View File

@ -5,12 +5,12 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in outboxable.gemspec # Specify your gem's dependencies in outboxable.gemspec
gemspec gemspec
gem 'rake', '~> 13.3.1' gem 'rake', '~> 13.0'
gem 'rspec', '~> 3.13.2' gem 'rspec', '~> 3.0'
gem 'rubocop-rails', '~> 2.34.3' gem 'rubocop-rails', '~> 2.18'
group :development, :test do group :development, :test do
gem 'activesupport', '~> 8.1.2' gem 'activesupport', '~> 7.0'
gem 'sidekiq', '~> 8.1.0' gem 'sidekiq', '~> 7.0'
gem 'sidekiq-cron', '~> 2.3.1' gem 'sidekiq-cron', '~> 1.10'
end end

View File

@ -1,139 +1,107 @@
PATH PATH
remote: . remote: .
specs: specs:
outboxable (1.0.6) outboxable (0.1.3)
bunny (>= 2.22) bunny (>= 2.19.0)
connection_pool (>= 2.4) connection_pool (~> 2.3.0)
GEM GEM
remote: https://rubygems.org/ remote: https://rubygems.org/
specs: specs:
activesupport (8.1.2) activesupport (7.0.4.3)
base64 concurrent-ruby (~> 1.0, >= 1.0.2)
bigdecimal
concurrent-ruby (~> 1.0, >= 1.3.1)
connection_pool (>= 2.2.5)
drb
i18n (>= 1.6, < 2) i18n (>= 1.6, < 2)
json
logger (>= 1.4.2)
minitest (>= 5.1) minitest (>= 5.1)
securerandom (>= 0.3) tzinfo (~> 2.0)
tzinfo (~> 2.0, >= 2.0.5)
uri (>= 0.13.1)
amq-protocol (2.3.2) amq-protocol (2.3.2)
ast (2.4.3) ast (2.4.2)
base64 (0.3.0) bunny (2.20.3)
bigdecimal (4.0.1)
bunny (2.22.0)
amq-protocol (~> 2.3, >= 2.3.1) amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2) sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.3.6) concurrent-ruby (1.2.2)
connection_pool (3.0.2) connection_pool (2.3.0)
cronex (0.15.0) diff-lcs (1.5.0)
et-orbi (1.2.7)
tzinfo tzinfo
unicode (>= 0.4.4.5) fugit (1.8.1)
diff-lcs (1.6.2) et-orbi (~> 1, >= 1.2.7)
drb (2.2.3)
et-orbi (1.4.0)
tzinfo
fugit (1.12.1)
et-orbi (~> 1.4)
raabro (~> 1.4) raabro (~> 1.4)
globalid (1.3.0) globalid (1.1.0)
activesupport (>= 6.1) activesupport (>= 5.0)
i18n (1.14.8) i18n (1.12.0)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
json (2.18.0) json (2.6.3)
language_server-protocol (3.17.0.5) minitest (5.18.0)
lint_roller (1.1.0) parallel (1.22.1)
logger (1.7.0) parser (3.2.2.0)
minitest (6.0.1)
prism (~> 1.5)
parallel (1.27.0)
parser (3.3.10.1)
ast (~> 2.4.1) ast (~> 2.4.1)
racc
prism (1.8.0)
raabro (1.4.0) raabro (1.4.0)
racc (1.8.1) rack (2.2.6.4)
rack (3.2.4)
rainbow (3.1.1) rainbow (3.1.1)
rake (13.3.1) rake (13.0.6)
rbtree (0.4.6) rbtree (0.4.6)
redis-client (0.26.3) redis-client (0.14.1)
connection_pool connection_pool
regexp_parser (2.11.3) regexp_parser (2.7.0)
rspec (3.13.2) rexml (3.2.5)
rspec-core (~> 3.13.0) rspec (3.12.0)
rspec-expectations (~> 3.13.0) rspec-core (~> 3.12.0)
rspec-mocks (~> 3.13.0) rspec-expectations (~> 3.12.0)
rspec-core (3.13.6) rspec-mocks (~> 3.12.0)
rspec-support (~> 3.13.0) rspec-core (3.12.1)
rspec-expectations (3.13.5) rspec-support (~> 3.12.0)
rspec-expectations (3.12.2)
diff-lcs (>= 1.2.0, < 2.0) diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0) rspec-support (~> 3.12.0)
rspec-mocks (3.13.7) rspec-mocks (3.12.5)
diff-lcs (>= 1.2.0, < 2.0) diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0) rspec-support (~> 3.12.0)
rspec-support (3.13.6) rspec-support (3.12.0)
rubocop (1.82.1) rubocop (1.49.0)
json (~> 2.3) json (~> 2.3)
language_server-protocol (~> 3.17.0.2)
lint_roller (~> 1.1.0)
parallel (~> 1.10) parallel (~> 1.10)
parser (>= 3.3.0.2) parser (>= 3.2.0.0)
rainbow (>= 2.2.2, < 4.0) rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 2.9.3, < 3.0) regexp_parser (>= 1.8, < 3.0)
rubocop-ast (>= 1.48.0, < 2.0) rexml (>= 3.2.5, < 4.0)
rubocop-ast (>= 1.28.0, < 2.0)
ruby-progressbar (~> 1.7) ruby-progressbar (~> 1.7)
unicode-display_width (>= 2.4.0, < 4.0) unicode-display_width (>= 2.4.0, < 3.0)
rubocop-ast (1.49.0) rubocop-ast (1.28.0)
parser (>= 3.3.7.2) parser (>= 3.2.1.0)
prism (~> 1.7) rubocop-rails (2.18.0)
rubocop-rails (2.34.3)
activesupport (>= 4.2.0) activesupport (>= 4.2.0)
lint_roller (~> 1.1)
rack (>= 1.1) rack (>= 1.1)
rubocop (>= 1.75.0, < 2.0) rubocop (>= 1.33.0, < 2.0)
rubocop-ast (>= 1.44.0, < 2.0)
ruby-progressbar (1.13.0) ruby-progressbar (1.13.0)
securerandom (0.4.1)
set (1.0.3) set (1.0.3)
sidekiq (8.1.0) sidekiq (7.0.7)
connection_pool (>= 3.0.0) concurrent-ruby (< 2)
json (>= 2.16.0) connection_pool (>= 2.3.0)
logger (>= 1.7.0) rack (>= 2.2.4)
rack (>= 3.2.0) redis-client (>= 0.11.0)
redis-client (>= 0.26.0) sidekiq-cron (1.10.0)
sidekiq-cron (2.3.1) fugit (~> 1.8)
cronex (>= 0.13.0)
fugit (~> 1.8, >= 1.11.1)
globalid (>= 1.0.1) globalid (>= 1.0.1)
sidekiq (>= 6.5.0) sidekiq (>= 6)
sorted_set (1.0.3) sorted_set (1.0.3)
rbtree rbtree
set (~> 1.0) set (~> 1.0)
tzinfo (2.0.6) tzinfo (2.0.6)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
unicode (0.4.4.5) unicode-display_width (2.4.2)
unicode-display_width (3.2.0)
unicode-emoji (~> 4.1)
unicode-emoji (4.2.0)
uri (1.1.1)
PLATFORMS PLATFORMS
arm64-darwin-25
x86_64-linux x86_64-linux
DEPENDENCIES DEPENDENCIES
activesupport (~> 8.1.2) activesupport (~> 7.0)
outboxable! outboxable!
rake (~> 13.3.1) rake (~> 13.0)
rspec (~> 3.13.2) rspec (~> 3.0)
rubocop-rails (~> 2.34.3) rubocop-rails (~> 2.18)
sidekiq (~> 8.1.0) sidekiq (~> 7.0)
sidekiq-cron (~> 2.3.1) sidekiq-cron (~> 1.10)
BUNDLED WITH BUNDLED WITH
2.4.17 2.4.7

View File

@ -1,19 +1,6 @@
# 🚨 Discontinuation Notice for ActiveRecord 🚨
**Effective Date: August 4, 2024**
Please be aware that we are no longer maintaing the part related to **ActiveRecord** in this gem. We are dropping support for ActiveRecord in favor of [Solid Queue](https://github.com/rails/solid_queue).
In the meantime, we commit to continously support the Mongoid part of the gem.
### New Recommended Gem: `Solid Queue`
For ActiveRecord users, we recommend transitioning to the `Solid Queue` gem, which provides enhanced functionality, improved performance, and better support for modern application requirements. `Solid Queue` is designed to seamlessly integrate with your existing infrastructure while offering robust features to handle your queuing needs efficiently.
# Outboxable # Outboxable
The Outboxable Gem is tailored for Rails applications to implement the transactional outbox pattern. It supports both ActiveRecord and Mongoid. The Outboxable Gem is tailored for Rails applications to implement the transactional outbox pattern. It currently only supports ActiveRecord.
Please take into consideration that this Gem is **opinionated**, meaning it expects you to follow a certain pattern and specific setting. If you don't like it, you can always fork it and change it. Please take into consideration that this Gem is **opinionated**, meaning it expects you to follow a certain pattern and specific setting. If you don't like it, you can always fork it and change it.
@ -38,19 +25,13 @@ If bundler is not being used to manage dependencies, install the gem by executin
$ gem install outboxable $ gem install outboxable
``` ```
For use with ActiveRecord, run: Then run:
```shell ```shell
$ rails g outboxable:install --orm activerecord $ rails g outboxable:install
``` ```
For use with Mongoid, run: The command above will add a migration file and the Outbox model. You will need then to run the migrations:
```shell
$ rails g outboxable:install --orm mongoid
```
The command above will add a migration file and the Outbox model. You will need then to run the migrations (ActiveRecord only):
```shell ```shell
$ rails db:migrate $ rails db:migrate
@ -83,7 +64,7 @@ module Outboxable
end end
Outboxable.configure do |config| Outboxable.configure do |config|
# Specify the ORM you are using. Supported values are :activerecord and :mongoid # Specify the ORM you are using. For now, only ActiveRecord is supported.
config.orm = :activerecord config.orm = :activerecord
# Specify the message broker you are using. For now, only RabbitMQ is supported. # Specify the message broker you are using. For now, only RabbitMQ is supported.
@ -95,7 +76,7 @@ Outboxable.configure do |config|
config.rabbitmq_user = ENV.fetch('RABBITMQ__USERNAME') config.rabbitmq_user = ENV.fetch('RABBITMQ__USERNAME')
config.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD') config.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD')
config.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST') config.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST')
config.rabbitmq_exchange_name = ENV.fetch('RABBITMQ__EXCHANGE_NAME') config.rabbitmq_event_bus_exchange = ENV.fetch('EVENTBUS__EXCHANGE_NAME')
end end
``` ```
@ -227,12 +208,6 @@ Last but not least, run sidekiq so that the Outboxable Gem can publish the event
$ bundle exec sidekiq $ bundle exec sidekiq
``` ```
### Mongoid
The Outboxable gem works smoothly with Mongoid. It is to be noted that when used with Mongoid, Outboxable does not use the `_id` as the idempotency key. It creates a field called ``idempotency_key`` which is a UUID generated at the time of the insertion of the document.
## Development ## Development
After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment. After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.

View File

@ -3,37 +3,24 @@ module Outboxable
include Rails::Generators::Migration include Rails::Generators::Migration
source_root File.expand_path('../../templates', __dir__) source_root File.expand_path('../../templates', __dir__)
class_option :orm, type: :string, default: 'activerecord'
def initialize(*args)
super(*args) # rubocop:disable Style/SuperArguments
@orm = options[:orm] || 'activerecord'
%w[activerecord mongoid].include?(@orm) || raise(ArgumentError, 'Invalid ORM. Only ActiveRecord and Mongoid are supported.')
end
# Copy initializer into user app # Copy initializer into user app
def copy_initializer def copy_initializer
copy_file('activerecord_initializer.rb', 'config/initializers/z_outboxable.rb') if @orm == 'activerecord' copy_file('initializer.rb', 'config/initializers/z_outboxable.rb')
copy_file('mongoid_initializer.rb', 'config/initializers/z_outboxable.rb') if @orm == 'mongoid'
end end
# Copy user information (model & Migrations) into user app # Copy user information (model & Migrations) into user app
def create_user_model def create_user_model
target_path = 'app/models/outbox.rb' target_path = 'app/models/outbox.rb'
if Rails.root.join(target_path).exist? if Rails.root.join(target_path).exist?
say_status('skipped', 'Model outbox already exists') say_status('skipped', 'Model outbox already exists')
else else
template('activerecrod_outbox.rb', target_path) if @orm == 'activerecord' template('outbox.rb', target_path)
template('mongoid_outbox.rb', target_path) if @orm == 'mongoid'
end end
end end
# Copy migrations # Copy migrations
def copy_migrations def copy_migrations
return if @orm == 'mongoid'
if self.class.migration_exists?('db/migrate', 'create_outboxable_outboxes') if self.class.migration_exists?('db/migrate', 'create_outboxable_outboxes')
say_status('skipped', 'Migration create_outboxable_outboxes already exists') say_status('skipped', 'Migration create_outboxable_outboxes already exists')
else else

View File

@ -3,15 +3,14 @@
require_relative 'outboxable/version' require_relative 'outboxable/version'
require 'outboxable/worker' require 'outboxable/worker'
require 'outboxable/publishing_manager'
require 'outboxable/polling_publisher_worker'
require 'outboxable/connection' require 'outboxable/connection'
require 'outboxable/configuration' require 'outboxable/configuration'
require 'outboxable/rabbitmq/publisher' require 'outboxable/rabbitmq/publisher'
require 'active_support/concern' require 'active_support/concern'
require 'outboxable/publishing_manager'
require 'outboxable/polling_publisher_worker'
module Outboxable module Outboxable
class Error < StandardError; end class Error < StandardError; end
@ -21,12 +20,12 @@ module Outboxable
after_create :instantiate_outbox_for_create, if: proc { |object| object.check_outbox_condition(object:, operation: :create) } after_create :instantiate_outbox_for_create, if: proc { |object| object.check_outbox_condition(object:, operation: :create) }
after_update :instantiate_outbox_for_update, if: proc { |object| object.check_outbox_condition(object:, operation: :update) } after_update :instantiate_outbox_for_update, if: proc { |object| object.check_outbox_condition(object:, operation: :update) }
has_many :outboxes, as: :outboxable, dependent: :destroy has_many :outboxes, as: :outboxable, autosave: false
def instantiate_outbox(routing_key:) def instantiate_outbox(routing_key:)
outboxes.new( outboxes.new(
routing_key:, routing_key:,
exchange: Outboxable.configuration.rabbitmq_exchange_name, exchange: Outboxable.configuration.rabbitmq_event_bus_exchange,
payload: as_json payload: as_json
) )
end end

View File

@ -10,14 +10,14 @@ module Outboxable
class Configuration class Configuration
ALLOWED_MESSAGE_BROKERS = %i[rabbitmq].freeze ALLOWED_MESSAGE_BROKERS = %i[rabbitmq].freeze
ALLOWED_ORMS = %i[activerecord mongoid].freeze ALLOWED_ORMS = %i[activerecord].freeze
attr_accessor :rabbitmq_host, attr_accessor :rabbitmq_host,
:rabbitmq_port, :rabbitmq_port,
:rabbitmq_user, :rabbitmq_user,
:rabbitmq_password, :rabbitmq_password,
:rabbitmq_vhost, :rabbitmq_vhost,
:rabbitmq_exchange_name, :rabbitmq_event_bus_exchange,
:message_broker, :message_broker,
:orm :orm

View File

@ -3,7 +3,6 @@ require 'singleton'
module Outboxable module Outboxable
class Connection class Connection
include ::Singleton include ::Singleton
attr_reader :connection attr_reader :connection
def initialize def initialize

View File

@ -1,27 +1,16 @@
module Outboxable module Outboxable
class PollingPublisherWorker class PollingPublisherWorker
include Sidekiq::Job include Sidekiq::Job
sidekiq_options queue: 'critical'
def perform def perform
Outboxable.configuration.orm == :mongoid ? perform_mongoid : perform_activerecord
end
def perform_activerecord
Outbox.pending.where(last_attempted_at: [..Time.zone.now, nil]).find_in_batches(batch_size: 100).each do |batch| Outbox.pending.where(last_attempted_at: [..Time.zone.now, nil]).find_in_batches(batch_size: 100).each do |batch|
batch.each do |outbox| batch.each do |outbox|
# This is to prevent a job from being retried too many times. Worst-case scenario is 1 minute delay in jobs. # This is to prevent a job from being retried too many times. Worst-case scenario is 1 minute delay in jobs.
::Outboxable::Worker.perform_async(outbox.id) outbox.update(last_attempted_at: 1.minute.from_now)
outbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false) Outboxable::Worker.perform_async(outbox.id)
end end
end end
end end
def perform_mongoid
Outbox.pending.any_of({ last_attempted_at: ..Time.zone.now }, { last_attempted_at: nil }).each do |outbox|
# This is to prevent a job from being retried too many times. Worst-case scenario is 1 minute delay in jobs.
::Outboxable::Worker.perform_async(outbox.idempotency_key)
outbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false)
end
end
end end
end end

View File

@ -20,12 +20,7 @@ module Outboxable
exchange = channel.topic(@resource.exchange, durable: true) exchange = channel.topic(@resource.exchange, durable: true)
# Publish the CloudEvent resource to the exchange # Publish the CloudEvent resource to the exchange
exchange.publish( exchange.publish(to_envelope(resource: @resource), routing_key: @resource.routing_key, headers: @resource.try(:headers) || {})
to_envelope(resource: @resource),
routing_key: @resource.routing_key,
headers: @resource.try(:headers) || {},
content_type: @resource.try(:content_type) || 'application/json'
)
# Wait for confirmation # Wait for confirmation
confirmed = channel.wait_for_confirms confirmed = channel.wait_for_confirms
@ -34,6 +29,7 @@ module Outboxable
return unless confirmed return unless confirmed
@resource.reload @resource.reload
@resource.increment_attempt
@resource.update(status: :published, retry_at: nil) @resource.update(status: :published, retry_at: nil)
end end
end end

View File

@ -1,5 +1,5 @@
# frozen_string_literal: true # frozen_string_literal: true
module Outboxable module Outboxable
VERSION = '1.0.6' VERSION = '0.1.3'
end end

View File

@ -5,8 +5,7 @@ module Outboxable
include ::Sidekiq::Job include ::Sidekiq::Job
def perform(outbox_id) def perform(outbox_id)
Outboxable::PublishingManager.publish(resource: Outbox.find(outbox_id)) if Outboxable.configuration.orm == :activerecord Outboxable::PublishingManager.publish(resource: Outbox.find(outbox_id))
Outboxable::PublishingManager.publish(resource: Outbox.find_by!(idempotency_key: outbox_id)) if Outboxable.configuration.orm == :mongoid
end end
end end
end end

View File

@ -7,7 +7,6 @@ class CreateOutboxableOutboxes < ActiveRecord::Migration[7.0]
t.string :exchange, null: false, default: '' t.string :exchange, null: false, default: ''
t.string :routing_key, null: false, default: '' t.string :routing_key, null: false, default: ''
t.string :content_type, null: false, default: 'application/json'
t.integer :attempts, null: false, default: 0 t.integer :attempts, null: false, default: 0
t.datetime :last_attempted_at, null: true t.datetime :last_attempted_at, null: true
@ -22,7 +21,5 @@ class CreateOutboxableOutboxes < ActiveRecord::Migration[7.0]
t.timestamps t.timestamps
end end
add_index :outboxes, %i[status last_attempted_at], name: 'index_outboxes_on_outboxable_status_and_last_attempted_at'
end end
end end

View File

@ -23,7 +23,7 @@ Outboxable.configure do |config|
# Specify the ORM you are using. For now, only ActiveRecord is supported. # Specify the ORM you are using. For now, only ActiveRecord is supported.
config.orm = :activerecord config.orm = :activerecord
# Specify the ORM you are using. Supported values are :activerecord and :mongoid # Specify the message broker you are using. For now, only RabbitMQ is supported.
config.message_broker = :rabbitmq config.message_broker = :rabbitmq
# RabbitMQ configurations # RabbitMQ configurations
@ -32,5 +32,5 @@ Outboxable.configure do |config|
config.rabbitmq_user = ENV.fetch('RABBITMQ__USERNAME') config.rabbitmq_user = ENV.fetch('RABBITMQ__USERNAME')
config.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD') config.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD')
config.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST') config.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST')
config.rabbitmq_exchange_name = ENV.fetch('RABBITMQ__EXCHANGE_NAME') config.rabbitmq_event_bus_exchange = ENV.fetch('EVENTBUS__EXCHANGE_NAME')
end end

View File

@ -1,36 +0,0 @@
# This monkey patch allows you to customize the message format that you publish to your broker.
# By default, Outboxable publishes a CloudEvent message to your broker.
module Outboxable
module RabbitMq
class Publisher
# Override this method to customize the message format that you publish to your broker
# DO NOT CHANGE THE METHOD SIGNATURE
def to_envelope(resource:)
{
id: resource.id,
source: 'http://localhost:3000',
specversion: '1.0',
type: resource.routing_key,
datacontenttype: 'application/json',
data: resource.payload
}.to_json
end
end
end
end
Outboxable.configure do |config|
# Specify the ORM you are using. For now, only ActiveRecord is supported.
config.orm = :mongoid
# Specify the ORM you are using. Supported values are :activerecord and :mongoid
config.message_broker = :rabbitmq
# RabbitMQ configurations
config.rabbitmq_host = ENV.fetch('RABBITMQ__HOST')
config.rabbitmq_port = ENV.fetch('RABBITMQ__PORT', 5672)
config.rabbitmq_user = ENV.fetch('RABBITMQ__USERNAME')
config.rabbitmq_password = ENV.fetch('RABBITMQ__PASSWORD')
config.rabbitmq_vhost = ENV.fetch('RABBITMQ__VHOST')
config.rabbitmq_exchange_name = ENV.fetch('RABBITMQ__EXCHANGE_NAME')
end

View File

@ -1,72 +0,0 @@
class Outbox
include Mongoid::Document
include Mongoid::Timestamps
attr_writer :allow_publish
# Fields
field :status, type: String, default: 'pending'
field :size, type: String, default: 'single'
field :exchange, type: String, default: ''
field :routing_key, type: String, default: ''
field :content_type, type: String, default: 'application/json'
field :attempts, type: Integer, default: 0
field :last_attempted_at, type: DateTime, default: nil
field :retry_at, type: DateTime, default: nil
field :idempotency_key, type: String
field :payload, type: Hash, default: {}
field :headers, type: Hash, default: {}
index({ idempotency_key: 1 }, { unique: true, name: 'idempotency_key_unique_index' })
before_save :check_publishing
before_create :set_idempotency_key
# Callbacks
before_create :set_last_attempted_at
after_save :publish, if: :allow_publish
# Validations
validates :payload, :exchange, :routing_key, presence: true
# Associations
belongs_to :outboxable, polymorphic: true, optional: true
def set_last_attempted_at
self.last_attempted_at = 10.seconds.from_now
end
def publish
Outboxable::Worker.perform_async(idempotency_key)
update(status: :processing, last_attempted_at: 1.minute.from_now, allow_publish: false)
end
def set_idempotency_key
self.idempotency_key = SecureRandom.uuid if idempotency_key.blank?
end
def check_publishing
self.allow_publish = false if published?
end
def allow_publish
return true if @allow_publish.nil?
@allow_publish
end
%w[pending processing published failed].each do |status|
define_method "#{status}?" do
self.status == status
end
# define scope
scope status, -> { where(status:) }
end
end

View File

@ -1,31 +1,28 @@
class Outbox < ApplicationRecord class Outbox < ApplicationRecord
attribute :allow_publish, :boolean, default: true attribute :allow_publish, :boolean, default: true
before_save :check_publishing
# Callbacks # Callbacks
before_create :set_last_attempted_at before_create :set_last_attempted_at
before_save :check_publishing
after_commit :publish, if: :allow_publish? after_commit :publish, if: :allow_publish?
# Enums # Enums
enum status: { pending: 0, processing: 1, published: 2, failed: 3 } enum status: { pending: 0, published: 1, failed: 2 }
enum size: { single: 0, batch: 1 } enum size: { single: 0, batch: 1 }
# Validations # Validations
validates :payload, :exchange, :routing_key, presence: true validates :payload, presence: true
validates :exchange, presence: true
validates :routing_key, presence: true
# Associations # Associations
belongs_to :outboxable, polymorphic: true, optional: true belongs_to :outboxable, polymorphic: true, optional: true
def set_last_attempted_at def set_last_attempted_at
self.last_attempted_at = 10.seconds.from_now self.last_attempted_at = Time.zone.now
end end
def publish def publish
# Run this in own thread Outboxable::Worker.perform_async(id)
threaded = Thread.new do
Outboxable::Worker.perform_inline(id)
update(status: :processing, last_attempted_at: 1.minute.from_now, allow_publish: false)
end
threaded.join
end end
def check_publishing def check_publishing

View File

@ -31,8 +31,7 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) } spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
spec.require_paths = ['lib'] spec.require_paths = ['lib']
spec.add_dependency 'bunny', '>= 2.22' spec.add_dependency 'bunny', '>= 2.19.0'
spec.add_dependency 'connection_pool', '>= 2.4' spec.add_dependency 'connection_pool', '~> 2.3.0'
spec.metadata['rubygems_mfa_required'] = 'true' spec.metadata['rubygems_mfa_required'] = 'true'
end end