Last active
March 6, 2022 06:09
-
-
Save sclinede/69afbd78fde9cc7d2dc3afd39ce3002d to your computer and use it in GitHub Desktop.
Example of a full Saga implementation in Ruby (with a usage example)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'sidekiq'

# Saga runs a chain of compensable steps. Each step is a "subject": a class
# responding to `.call(*args)` and `.undo(*args)`. While a step runs, a marker
# is kept in a Redis set keyed by the step's transaction id (txid), and a
# delayed Sidekiq job (Saga::CleanWorker) is scheduled as a safety net that
# undoes the step if the marker is still present when the job fires.
#
# Usage:
#   Saga.chain(StepA, args: 1).chain(StepB, args: 2).call
class Saga
  class << self
    # Interval handed to Sidekiq's `perform_in` for the safety-net cleanup job.
    # Falls back to 3600 when unset (see Saga#cleanup_delay).
    attr_accessor :cleanup_delay

    # Must be overridden to yield a Redis connection to the given block.
    def with_redis
      raise NotImplementedError
    end

    # Per-thread list of pending `[saga, args]` pairs built by `chain`.
    def queue
      Thread.current[:saga_queue] ||= []
    end

    # Replaces the per-thread queue. Kept for interface compatibility; it now
    # writes the same thread-local slot that `queue` reads.
    def queue=(value)
      Thread.current[:saga_queue] = value
    end

    # Txid of the most recently chained step on this thread (the next step's parent).
    def last_txid
      Thread.current[:last_txid]
    end

    def last_txid=(value)
      Thread.current[:last_txid] = value
    end

    # Appends a step to the current thread's chain.
    #
    # @param subject_klass [Class] class responding to `.call` and `.undo`
    # @param args [Object, Array] argument(s) later splatted into call/undo
    # @return [Class] Saga itself, so calls can be chained
    def chain(subject_klass, args:)
      # Build a fresh array: Array(args) returns the caller's own object when
      # args is already an Array, and mutating it would leak subject_klass
      # into the arguments later passed to call/undo.
      txid = [subject_klass, *Array(args)].join(":")
      queue << [new(txid, subject_klass, parent: last_txid), args]
      self.last_txid = txid
      self
    end

    # Runs `clean` on the queued steps, last chained first; each step triggers
    # the next one through the block it is given.
    def clean
      self.last_txid = nil
      saga, args = queue.pop
      return self unless saga

      saga.clean(*args, &method(:clean))
    ensure
      # Never leave steps of an aborted chain behind in thread-local state.
      queue.clear
    end

    # Runs the queued steps in chaining order. Each step invokes the next one
    # from inside its own lock, so a failure further down the chain propagates
    # back up and every enclosing step gets to undo itself.
    def call
      self.last_txid = nil
      saga, args = queue.shift
      return self unless saga

      saga.call(*args, &method(:call))
    ensure
      # A failed or skipped step aborts the chain; drop whatever was not run so
      # it cannot be picked up by the next chain built on this thread.
      queue.clear
    end
  end

  # Set member marking "this transaction is in progress".
  TRANSACTION_MARKER = "(reserved)".freeze

  # @param txid [String] transaction id of this step
  # @param subject_klass [Class] a class of 2 methods: .call, .undo
  # @param parent [String, nil] txid of the preceding step in the chain
  def initialize(txid, subject_klass, parent: nil)
    @txid = txid
    @parent_txid = parent
    @subject = subject_klass
  end

  # Executes the subject under the transaction lock, then yields so the next
  # step can run. On any StandardError the subject is undone, the lock is
  # released and the error is re-raised.
  #
  # @return [Boolean] false when the transaction was already in progress
  #   (nothing is executed and the block is not yielded to), true otherwise
  def call(*args)
    # Forward args so the safety-net cleanup job can call `undo` with them.
    with_lock(*args) do |redis|
      begin
        @subject.call(*args)
        yield if block_given?
      rescue
        @subject.undo(*args)
        finish_transaction(redis)
        raise
      end
    end
  end

  # Inspects the lock set of this transaction and compensates if needed, then
  # yields (regardless of what was found) so a chained cleanup can continue.
  def clean(*args)
    self.class.with_redis do |redis|
      members = redis.smembers(key(@txid))
      case members.size
      when 0
        # nothing to do
      when 1
        # NOTE(review): assumes the single remaining member is the marker.
        @subject.undo(*args)
        finish_transaction(redis)
      else
        members.each do |child_tx|
          next if child_tx.eql?(TRANSACTION_MARKER)

          # NOTE(review): begin_transaction stores children as full lock keys,
          # so this txid gets wrapped by `key` again inside the worker; the job
          # is also enqueued with this saga's subject and args rather than the
          # child's. Confirm the intended child-cleanup protocol.
          enqueue_cleanup(*args, txid: child_tx)
        end
      end
    end
    yield if block_given?
  end

  private

  # Redis key of the lock set for the given txid.
  def key(txid)
    "saga:#{txid}:lock"
  end

  # Marks the transaction as started, schedules the safety-net cleanup, yields
  # the Redis connection and finally releases the marker.
  #
  # @return [Boolean] false if the transaction is already in progress
  def with_lock(*args)
    self.class.with_redis do |redis|
      return false if transaction_in_progress?(redis)

      begin_transaction(redis)
      enqueue_cleanup(*args)
      yield(redis)
      finish_transaction(redis)
    end
    true
  end

  def transaction_in_progress?(redis)
    !redis.smembers(key(@txid)).empty?
  end

  # Registers this transaction in its parent's set (if any) and adds the marker.
  def begin_transaction(redis)
    redis.multi do |batched_redis|
      batched_redis.sadd(key(@parent_txid), key(@txid)) if @parent_txid
      batched_redis.sadd(key(@txid), TRANSACTION_MARKER)
    end
  end

  # Removes the marker and the registration in the parent's set (if any).
  def finish_transaction(redis)
    redis.multi do |batched_redis|
      batched_redis.srem(key(@txid), TRANSACTION_MARKER)
      batched_redis.srem(key(@parent_txid), key(@txid)) if @parent_txid
    end
  end

  # Schedules Saga::CleanWorker for `txid` after `cleanup_delay`.
  def enqueue_cleanup(*args, txid: @txid)
    Saga::CleanWorker.perform_in(
      cleanup_delay,
      txid,
      @subject.to_s,
      *args
    )
  end

  def cleanup_delay
    self.class.cleanup_delay || 3600
  end

  # Sidekiq job that re-creates a saga from its serialized form and cleans it.
  class CleanWorker
    include Sidekiq::Worker

    # @param txid [String]
    # @param subject_klass [String] class name as produced by `Class#to_s`
    def perform(txid, subject_klass, *args)
      # Object.const_get resolves (namespaced) constant names using only core
      # Ruby, so ActiveSupport's String#constantize is not required.
      Saga.new(txid, Object.const_get(subject_klass)).clean(*args)
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative "saga.rb" | |
# Value types describing the records of the demo. The DB below stores plain
# hashes with the same field names.
User = Struct.new(*%i[id name email external_id])
Parcel = Struct.new(*%i[id address user_id external_id])

# In-memory stand-in for a database: tables keyed by symbol, rows keyed by
# string id. Left unfrozen because the demo steps write external ids into it.
DB = {
  users: {
    "1" => {
      id: "1",
      name: "John Doe",
      email: "[email protected]",
      external_id: nil
    },
    "2" => {
      id: "2",
      name: "Jane Doe",
      email: "[email protected]",
      external_id: nil
    }
  },
  parcels: {
    "4" => {
      id: "4",
      address: "Lenina, 2",
      user_id: "1",
      external_id: nil
    },
    "5" => {
      id: "5",
      address: "Kuybisheva, 4",
      user_id: "2",
      external_id: nil
    }
  }
}
# Fake remote API used by the demo: every endpoint returns a canned response.
class APIClient
  # @return [Hash] canned response carrying the remote user id
  def create_user(name, email)
    user_response
  end

  # @return [Hash] canned response carrying the remote user id
  def delete_user(email)
    user_response
  end

  # Simulates a connection failure whenever the CONN_ERR environment variable
  # is set (to any value).
  #
  # @return [Hash] canned response carrying the remote order id
  # @raise [RuntimeError] when CONN_ERR is set
  def create_order(user_id, address, reference_id)
    raise "Sorry connection bad" if ENV.key?("CONN_ERR")

    order_response
  end

  # @return [Hash] canned response carrying the remote order id
  def delete_order(reference_id)
    order_response
  end

  private

  # A new hash per call, so callers never share a response object.
  def user_response
    {user_id: 123}
  end

  def order_response
    {order_id: 345}
  end
end
# Saga step that registers a local user with the remote API and can undo it.
class RegisterUser
  attr_reader :client, :user

  class << self
    # Performs the step for the user with the given id.
    def call(user_id)
      new(user_id).create_user!
    end

    # Compensates the step for the user with the given id.
    def undo(user_id)
      new(user_id).delete_user!
    end
  end

  # @param user_id [#to_s] key of the user row in DB[:users]; an unknown id
  #   yields an empty hash because of `nil.to_h`
  def initialize(user_id)
    @client = APIClient.new
    @user = DB[:users][user_id.to_s].to_h
  end

  # Creates the remote user and stores the returned id on the local row.
  #
  # @return [Hash] the user row
  def create_user!
    name, email = user.values_at(:name, :email)
    remote = client.create_user(name, email)
    user[:external_id] = remote.fetch(:user_id)
    DB[:users][user[:id]] = user
  end

  # Deletes the remote user and drops the stored external id.
  #
  # @return [Object, nil] the removed external id, if there was one
  # @raise [KeyError] when the row has no :email
  def delete_user!
    email = user.fetch(:email)
    client.delete_user(email)
    user.delete(:external_id)
  end
end
# Saga step that creates a remote order for a local parcel and can undo it.
class CreateOrder
  attr_reader :client, :parcel

  # @param parcel_id [#to_s] key of the parcel row in DB[:parcels]
  def initialize(parcel_id)
    # Normalise the id the same way RegisterUser does, so integer ids work too.
    @parcel = DB[:parcels][parcel_id.to_s]
    @client = APIClient.new
  end

  # Performs the step for the parcel with the given id.
  def self.call(parcel_id)
    new(parcel_id).create_order!
  end

  # Compensates the step for the parcel with the given id.
  def self.undo(parcel_id)
    new(parcel_id).delete_order!
  end

  # Reference under which the remote API knows this parcel.
  def reference_id
    "internal_parcel_id_#{parcel.fetch(:id)}"
  end

  # Creates the remote order on behalf of the parcel's owner and stores the
  # returned order id on the local row.
  #
  # @return [Hash] the parcel row
  def create_order!
    response = client.create_order(
      user_external_id, parcel[:address], reference_id
    )
    parcel[:external_id] = response.fetch(:order_id)
    DB[:parcels][parcel[:id]] = parcel
  end

  # Deletes the remote order and drops the stored external id.
  #
  # @return [Object, nil] the removed external id, if there was one
  def delete_order!
    client.delete_order(reference_id)
    parcel.delete(:external_id)
  end

  private

  # External id of the parcel's owner, as stored on the user's row in
  # DB[:users]; nil when the user is unknown or has no external id yet.
  # Previously an instance variable that was never assigned was passed
  # instead, so the remote API always received nil.
  def user_external_id
    DB[:users].dig(parcel[:user_id], :external_id)
  end
end
require 'redis'

# Redis wiring for the demo: supplies the connection hook that Saga leaves
# unimplemented.
class Saga
  # One Redis client per thread, created on first use and cached in a
  # thread-local slot.
  def self.redis_connection
    # redis-rb takes the connection string via `url:`; `redis_url:` is not a
    # recognised option, so database 12 was never actually selected.
    Thread.current[:redis] ||= Redis.new(url: "redis://localhost/12")
  end

  # Yields the current thread's Redis client to the given block.
  def self.with_redis
    yield(redis_connection)
  end
end
# Demo: run the two-step saga for John and his parcel.
john_id = "1"
john_parcel_id = "4"

# First pass: queue both steps and run the cleanup path. Saga.chain returns
# Saga itself, so the steps can be queued one statement at a time.
Saga.chain(RegisterUser, args: john_id)
Saga.chain(CreateOrder, args: john_parcel_id)
Saga.clean

# Second pass: queue the same steps again and execute them.
Saga.chain(RegisterUser, args: john_id)
Saga.chain(CreateOrder, args: john_parcel_id)
Saga.call
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment