- use ActiveSupport::Notifications to instrument events

- fix an issue when getting an exception inside application would not delete lock, so client could receive 429 after 500
This commit is contained in:
Dmytro Zakharov 2018-12-13 15:29:10 +01:00
parent b830893261
commit 4488a19f28
9 changed files with 55 additions and 27 deletions

View File

@ -27,5 +27,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rake', '~> 10.0' spec.add_development_dependency 'rake', '~> 10.0'
spec.add_development_dependency 'rspec', '~> 3.0' spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_development_dependency 'fakeredis', '~> 0.6' spec.add_development_dependency 'fakeredis', '~> 0.6'
spec.add_development_dependency 'pry', '~> 0.11' spec.add_development_dependency 'byebug', '~> 10.0'
end end

View File

@ -4,6 +4,7 @@ module IdempotentRequest
@app = app @app = app
@config = config @config = config
@policy = config.fetch(:policy) @policy = config.fetch(:policy)
@notifier = ActiveSupport::Notifications if defined?(ActiveSupport::Notifications)
end end
def call(env) def call(env)
@ -13,10 +14,12 @@ module IdempotentRequest
def process(env) def process(env)
set_request(env) set_request(env)
request.env['idempotent.request'] = {}
return app.call(request.env) unless process? return app.call(request.env) unless process?
read_idempotent_request || request.env['idempotent.request']['key'] = request.key
write_idempotent_request || response = read_idempotent_request || write_idempotent_request || concurrent_request_response
concurrent_request_response instrument(request)
response
end end
private private
@ -26,19 +29,30 @@ module IdempotentRequest
end end
def read_idempotent_request def read_idempotent_request
storage.read request.env['idempotent.request']['read'] = storage.read
end end
def write_idempotent_request def write_idempotent_request
return unless storage.lock return unless storage.lock
storage.write(*app.call(request.env)) begin
result = app.call(request.env)
request.env['idempotent.request']['write'] = result
storage.write(*result)
ensure
request.env['idempotent.request']['unlocked'] = storage.unlock
result
end
end end
def concurrent_request_response def concurrent_request_response
[429, {}, []] status = 429
headers = { 'Content-Type' => 'application/json' }
body = [ Oj.dump('error' => 'Concurrent requests detected') ]
request.env['idempotent.request']['concurrent_request_response'] = true
Rack::Response.new(body, status, headers).finish
end end
attr_reader :app, :env, :config, :request, :policy attr_reader :app, :env, :config, :request, :policy, :notifier
def process? def process?
!request.key.to_s.empty? && should_be_idempotent? !request.key.to_s.empty? && should_be_idempotent?
@ -49,6 +63,10 @@ module IdempotentRequest
policy.new(request).should? policy.new(request).should?
end end
def instrument(request)
notifier.instrument('idempotent.request', request: request) if notifier
end
def set_request(env) def set_request(env)
@env = env @env = env
@request ||= Request.new(env, config) @request ||= Request.new(env, config)

View File

@ -9,7 +9,7 @@ module IdempotentRequest
end end
def lock(key) def lock(key)
setnx_with_expiration(lock_key(key), true) setnx_with_expiration(lock_key(key), Time.now.to_f)
end end
def unlock(key) def unlock(key)

View File

@ -22,10 +22,9 @@ module IdempotentRequest
private private
def header_name def header_name
key = @header_name key = @header_name.to_s
.to_s
.upcase .upcase
.gsub('-', '_') .tr('-', '_')
key.start_with?('HTTP_') ? key : "HTTP_#{key}" key.start_with?('HTTP_') ? key : "HTTP_#{key}"
end end

View File

@ -30,8 +30,6 @@ module IdempotentRequest
if (200..226).cover?(status) if (200..226).cover?(status)
storage.write(key, payload(status, headers, response)) storage.write(key, payload(status, headers, response))
else
unlock
end end
data data
@ -46,11 +44,9 @@ module IdempotentRequest
end end
def payload(status, headers, response) def payload(status, headers, response)
Oj.dump({ Oj.dump(status: status,
status: status,
headers: headers.to_h, headers: headers.to_h,
response: Array(response) response: Array(response))
})
end end
def run_callback(action, args) def run_callback(action, args)

View File

@ -29,6 +29,26 @@ RSpec.describe IdempotentRequest::Middleware do
middleware.call(env) middleware.call(env)
end end
it 'should obtain lock and release lock' do
expect_any_instance_of(IdempotentRequest::RequestManager).to receive(:lock).and_return(true)
expect_any_instance_of(IdempotentRequest::RequestManager).to receive(:write)
expect_any_instance_of(IdempotentRequest::RequestManager).to receive(:unlock)
middleware.call(env)
end
context 'when an exception happens inside another middleware' do
let(:app) { ->(_) { raise 'fatality' } }
it 'should release lock' do
expect_any_instance_of(IdempotentRequest::RequestManager).to receive(:lock).and_return(true)
expect_any_instance_of(IdempotentRequest::RequestManager).not_to receive(:write)
expect_any_instance_of(IdempotentRequest::RequestManager).to receive(:unlock)
expect { middleware.call(env) }.to raise_error('fatality')
end
end
context 'when has data in storage' do context 'when has data in storage' do
before do before do
data = [200, {}, 'body'] data = [200, {}, 'body']

View File

@ -18,7 +18,7 @@ RSpec.describe IdempotentRequest::RedisStorage do
let(:lock_key) { "#{namespace}:lock:#{key}" } let(:lock_key) { "#{namespace}:lock:#{key}" }
it 'should add lock' do it 'should add lock' do
expect(redis).to receive(:set).with(lock_key, true, nx: true, ex: expire_time) expect(redis).to receive(:set).with(lock_key, Float, nx: true, ex: expire_time)
redis_storage.lock(key) redis_storage.lock(key)
end end
end end

View File

@ -144,11 +144,6 @@ RSpec.describe IdempotentRequest::RequestManager do
request_storage.write(*data) request_storage.write(*data)
expect(memory_storage.read(request.key)).to be_nil expect(memory_storage.read(request.key)).to be_nil
end end
it 'should unlock stored key' do
expect(memory_storage).to receive(:unlock).with(request.key)
request_storage.write(*data)
end
end end
end end

View File

@ -1,6 +1,6 @@
require "bundler/setup" require "bundler/setup"
require 'fakeredis' require 'fakeredis'
require 'pry' require 'byebug'
require "idempotent-request" require "idempotent-request"
spec = File.expand_path('../', __FILE__) spec = File.expand_path('../', __FILE__)