mirror of
https://github.com/ditkrg/idempotent-request.git
synced 2026-01-22 22:06:44 +00:00
This commit is intended to fix an issue, when concurrent requests sent to an endpoint won't be protected by idempotency until the 1st request is finished.
This works adding a "lock" key in redis and returning 429 http error when concurrent requests are sent.
This commit is contained in:
parent
e43ee51503
commit
17a2fed1f6
@ -14,12 +14,30 @@ module IdempotentRequest
|
|||||||
def process(env)
|
def process(env)
|
||||||
set_request(env)
|
set_request(env)
|
||||||
return app.call(request.env) unless process?
|
return app.call(request.env) unless process?
|
||||||
storage = RequestManager.new(request, config)
|
read_idempotent_request ||
|
||||||
storage.read || storage.write(*app.call(request.env))
|
write_idempotent_request ||
|
||||||
|
concurrent_request_response
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
def storage
|
||||||
|
@storage ||= RequestManager.new(request, config)
|
||||||
|
end
|
||||||
|
|
||||||
|
def read_idempotent_request
|
||||||
|
storage.read
|
||||||
|
end
|
||||||
|
|
||||||
|
def write_idempotent_request
|
||||||
|
return unless storage.lock
|
||||||
|
storage.write(*app.call(request.env))
|
||||||
|
end
|
||||||
|
|
||||||
|
def concurrent_request_response
|
||||||
|
[429, {}, []]
|
||||||
|
end
|
||||||
|
|
||||||
attr_reader :app, :env, :config, :request, :policy
|
attr_reader :app, :env, :config, :request, :policy
|
||||||
|
|
||||||
def process?
|
def process?
|
||||||
|
|||||||
@ -8,14 +8,24 @@ module IdempotentRequest
|
|||||||
@expire_time = config[:expire_time]
|
@expire_time = config[:expire_time]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def lock(key)
|
||||||
|
setnx_with_expiration(lock_key(key), true)
|
||||||
|
end
|
||||||
|
|
||||||
def read(key)
|
def read(key)
|
||||||
redis.get(namespaced_key(key))
|
redis.get(namespaced_key(key))
|
||||||
end
|
end
|
||||||
|
|
||||||
def write(key, payload)
|
def write(key, payload)
|
||||||
|
setnx_with_expiration(namespaced_key(key), payload)
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def setnx_with_expiration(key, data)
|
||||||
redis.set(
|
redis.set(
|
||||||
namespaced_key(key),
|
key,
|
||||||
payload,
|
data,
|
||||||
{}.tap do |options|
|
{}.tap do |options|
|
||||||
options[:nx] = true
|
options[:nx] = true
|
||||||
options[:ex] = expire_time.to_i if expire_time.to_i > 0
|
options[:ex] = expire_time.to_i if expire_time.to_i > 0
|
||||||
@ -23,10 +33,12 @@ module IdempotentRequest
|
|||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
def lock_key(key)
|
||||||
|
namespaced_key("lock:#{key}")
|
||||||
|
end
|
||||||
|
|
||||||
def namespaced_key(idempotency_key)
|
def namespaced_key(key)
|
||||||
[namespace, idempotency_key.strip]
|
[namespace, key.strip]
|
||||||
.compact
|
.compact
|
||||||
.join(':')
|
.join(':')
|
||||||
.downcase
|
.downcase
|
||||||
|
|||||||
@ -8,6 +8,10 @@ module IdempotentRequest
|
|||||||
@callback = config[:callback]
|
@callback = config[:callback]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def lock
|
||||||
|
storage.lock(key)
|
||||||
|
end
|
||||||
|
|
||||||
def read
|
def read
|
||||||
status, headers, response = parse_data(storage.read(key)).values
|
status, headers, response = parse_data(storage.read(key)).values
|
||||||
|
|
||||||
|
|||||||
@ -42,6 +42,30 @@ RSpec.describe IdempotentRequest::Middleware do
|
|||||||
middleware.call(env)
|
middleware.call(env)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'when concurrent requests' do
|
||||||
|
before do
|
||||||
|
allow_any_instance_of(IdempotentRequest::RequestManager).to receive(:lock).and_return(false)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'should not return data from storage' do
|
||||||
|
expect_any_instance_of(IdempotentRequest::RequestManager).to receive(:read).and_return(nil)
|
||||||
|
|
||||||
|
middleware.call(env)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'should not obtain lock' do
|
||||||
|
expect_any_instance_of(IdempotentRequest::RequestManager).not_to receive(:write)
|
||||||
|
|
||||||
|
middleware.call(env)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'returns 429' do
|
||||||
|
expect_any_instance_of(described_class).to receive(:concurrent_request_response)
|
||||||
|
|
||||||
|
middleware.call(env)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'when should not be idempotent' do
|
context 'when should not be idempotent' do
|
||||||
|
|||||||
@ -13,6 +13,16 @@ RSpec.describe IdempotentRequest::RedisStorage do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe '#lock' do
|
||||||
|
let(:key) { 'key' }
|
||||||
|
let(:lock_key) { "#{namespace}:lock:#{key}" }
|
||||||
|
|
||||||
|
it 'should add lock' do
|
||||||
|
expect(redis).to receive(:set).with(lock_key, true, nx: true, ex: expire_time)
|
||||||
|
redis_storage.lock(key)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe '#write' do
|
describe '#write' do
|
||||||
let(:key) { 'key' }
|
let(:key) { 'key' }
|
||||||
let(:payload) { {} }
|
let(:payload) { {} }
|
||||||
|
|||||||
@ -13,6 +13,29 @@ RSpec.describe IdempotentRequest::RequestManager do
|
|||||||
memory_storage.clear
|
memory_storage.clear
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe '#lock' do
|
||||||
|
it 'delegates to storage service' do
|
||||||
|
expect(memory_storage).to receive(:lock).with(request.key)
|
||||||
|
request_storage.lock
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'for the first lock' do
|
||||||
|
it 'returns true' do
|
||||||
|
expect(request_storage.lock).to be_truthy
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'for the second lock' do
|
||||||
|
before do
|
||||||
|
request_storage.lock
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'returns false' do
|
||||||
|
expect(request_storage.lock).to be_falsey
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe '#read' do
|
describe '#read' do
|
||||||
context 'when there is no data' do
|
context 'when there is no data' do
|
||||||
it 'should return nil' do
|
it 'should return nil' do
|
||||||
|
|||||||
@ -4,6 +4,12 @@ module IdempotentRequest
|
|||||||
@memory = {}
|
@memory = {}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def lock(key)
|
||||||
|
namespaced_key = lock_key(key)
|
||||||
|
return false if @memory.key?(namespaced_key)
|
||||||
|
@memory[namespaced_key] = true
|
||||||
|
end
|
||||||
|
|
||||||
def read(key)
|
def read(key)
|
||||||
@memory[key]
|
@memory[key]
|
||||||
end
|
end
|
||||||
@ -15,5 +21,11 @@ module IdempotentRequest
|
|||||||
def clear
|
def clear
|
||||||
@memory = {}
|
@memory = {}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def lock_key(key)
|
||||||
|
"lock:#{key}"
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user