Bug fixes and improvements

This commit is contained in:
Brusk Awat 2023-04-14 02:38:32 +03:00
parent ee8dae21d5
commit bab6309502
Signed by: broosk1993
GPG Key ID: 5D20F7E02649F74E
9 changed files with 31 additions and 26 deletions

View File

@ -4,7 +4,6 @@ PATH
outboxable (1.0.1)
bunny (>= 2.19.0)
connection_pool (~> 2.3.0)
simple_enum (>= 2.3)
GEM
remote: https://rubygems.org/
@ -85,8 +84,6 @@ GEM
fugit (~> 1.8)
globalid (>= 1.0.1)
sidekiq (>= 6)
simple_enum (2.3.2)
activesupport (>= 4.0.0)
sorted_set (1.0.3)
rbtree
set (~> 1.0)

View File

@ -5,8 +5,8 @@ module Outboxable
source_root File.expand_path('../../templates', __dir__)
class_option :orm, type: :string, default: 'activerecord'
def initialize(**options)
super(options)
def initialize(*)
super(*)
@orm = options[:orm] || 'activerecord'
%w[activerecord mongoid].include?(@orm) || raise(ArgumentError, 'Invalid ORM. Only ActiveRecord and Mongoid are supported.')

View File

@ -6,6 +6,12 @@ module Outboxable
def self.configure
self.configuration ||= Configuration.new
yield(configuration)
# In accordance to sidekiq-cron README: https://github.com/sidekiq-cron/sidekiq-cron#under-the-hood
Sidekiq::Options[:cron_poll_interval] = 5
# Create the cron job for the polling publisher
Sidekiq::Cron::Job.create(name: 'OutboxablePollingPublisher', cron: '*/5 * * * * *', class: 'Outboxable::PollingPublisherWorker', args: [{ orm: configuration.orm }])
end
class Configuration
@ -26,12 +32,6 @@ module Outboxable
raise Error, 'Outboxable Gem only supports Rails but you application does not seem to be a Rails app' unless Object.const_defined?('Rails')
raise Error, 'Outboxable Gem only support Rails version 7 and newer' if Rails::VERSION::MAJOR < 7
raise Error, 'Outboxable Gem uses the sidekiq-cron Gem. Make sure you add it to your project' unless Object.const_defined?('Sidekiq::Cron')
# In accordance to sidekiq-cron README: https://github.com/sidekiq-cron/sidekiq-cron#under-the-hood
Sidekiq::Options[:cron_poll_interval] = 5
# Create the cron job for the polling publisher
Sidekiq::Cron::Job.create(name: 'OutboxablePollingPublisher', cron: '*/5 * * * * *', class: 'Outboxable::PollingPublisherWorker')
end
def message_broker=(message_broker)

View File

@ -3,24 +3,25 @@ module Outboxable
include Sidekiq::Job
sidekiq_options queue: 'critical'
def perform
Outboxable.configuration.orm == :mongoid ? perform_mongoid : perform_activerecord
def perform(args)
orm = args['orm']
orm == 'mongoid' ? perform_mongoid(orm) : perform_activerecord(orm)
end
def perform_activerecord
def perform_activerecord(orm)
Outbox.pending.where(last_attempted_at: [..Time.zone.now, nil]).find_in_batches(batch_size: 100).each do |batch|
batch.each do |outbox|
# This is to prevent a job from being retried too many times. Worst-case scenario is 1 minute delay in jobs.
Outboxable::Worker.perform_async(outbox.id)
Outboxable::Worker.perform_async(outbox.id, orm)
outbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false)
end
end
end
def perform_mongoid
def perform_mongoid(orm)
Outbox.pending.where(last_attempted_at: [..Time.zone.now, nil]).each do |outbox|
# This is to prevent a job from being retried too many times. Worst-case scenario is 1 minute delay in jobs.
Outboxable::Worker.perform_async(outbox.id)
Outboxable::Worker.perform_async(outbox.idempotency_key, orm)
outbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false)
end
end

View File

@ -4,8 +4,9 @@ module Outboxable
class Worker
include ::Sidekiq::Job
def perform(outbox_id)
Outboxable::PublishingManager.publish(resource: Outbox.find(outbox_id))
def perform(outbox_id, orm)
Outboxable::PublishingManager.publish(resource: Outbox.find(outbox_id)) if orm == 'activerecord'
Outboxable::PublishingManager.publish(resource: Outbox.find_by!(idempotency_key: outbox_id)) if orm == 'mongoid'
end
end
end

View File

@ -4,7 +4,7 @@ class Outbox < ApplicationRecord
before_save :check_publishing
# Callbacks
before_create :set_last_attempted_at
after_commit :publish, if: :allow_publish?
after_save :publish, if: :allow_publish
# Enums
enum status: { pending: 0, processing: 1, published: 2, failed: 3 }
enum size: { single: 0, batch: 1 }

View File

@ -1,5 +1,3 @@
require 'simple_enum/mongoid'
# This monkey patch allows you to customize the message format that you publish to your broker.
# By default, Outboxable publishes a CloudEvent message to your broker.
module Outboxable

View File

@ -3,7 +3,7 @@ class Outbox
include Mongoid::Timestamps
include SimpleEnum::Mongoid
attr_accessor :allow_publish
attr_writer :allow_publish
# Fields
field :status, type: String, default: 'pending'
@ -18,14 +18,19 @@ class Outbox
field :retry_at, type: DateTime, default: nil
field :idempotency_key, type: String
field :payload, type: Hash, default: {}
field :headers, type: Hash, default: {}
index({ idempotency_key: 1 }, { unique: true, name: 'idempotency_key_unique_index' })
before_save :check_publishing
before_create :set_idempotency_key
# Callbacks
before_create :set_last_attempted_at
after_commit :publish, if: :allow_publish?
after_save :publish, if: :allow_publish
# Enums
as_enum :status, { pending: 0, processing: 1, published: 2, failed: 3 }, pluralize_scopes: false, map: :string
@ -42,10 +47,14 @@ class Outbox
end
def publish
Outboxable::Worker.perform_async(id)
Outboxable::Worker.perform_async(idempotency_key)
update(status: :processing, last_attempted_at: 1.minute.from_now, allow_publish: false)
end
def set_idempotency_key
self.idempotency_key = SecureRandom.uuid if idempotency_key.blank?
end
def check_publishing
self.allow_publish = false if published?
end

View File

@ -33,7 +33,6 @@ Gem::Specification.new do |spec|
spec.add_dependency 'bunny', '>= 2.19.0'
spec.add_dependency 'connection_pool', '~> 2.3.0'
spec.add_dependency 'simple_enum', '>= 2.3'
spec.metadata['rubygems_mfa_required'] = 'true'
end