Skip to content

Instantly share code, notes, and snippets.

@jstorimer
Last active June 4, 2022 00:45

This is a proof-of-concept of a couple of concurrent data structures written in Ruby.

The implementations are heavily commented for those interested. There are benchmarks (with results) included below. The results are interesting, but, as always, take with a grain of salt.

Data structures

AtomicLinkedQueue is a lock-free queue, built on atomic CAS operations. It doesn't use any mutexes.

TwoLockLinkedQueue is a lock-based queue, but with separate locks for the head + tail. This means there can be lots of contention when pushing to the queue, but little when it comes to popping off the queue.

Both of these implementations are unbounded queues, with no blocking operations.

See the individual files below for more about their implementation.

Background

For those unfamiliar with the atomic compare-and-swap operation, here's a brief outline of how it operates.

Create an Atomic instance that holds a value. (This comes from the atomic rubygem).

item = Atomic.new('value')

The workhorse method is #compare_and_set(old, new). You pass it the value that you think it currently holds, and the value you want to set it to.

If it does hold the expected value, it's set to your new value. Otherwise, it returns false. In this implementation, when that happens, the operation is re-started, over and over, until it succeeds.

This compare-and-set operation is hardware-supported and works across Ruby implementations thanks to the 'atomic' gem. The hardware support provides the atomic guarantee. Without this guarantee, it would be possible to read a value, then have another thread update the value before you can, then your thread would overwrite that value. This stuff is complicated.

Due credit

I can't take credit for this implementation. This is an implementation of the pseudo-code from "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"[1], with some hints from Java's implementation of java.util.concurrent.ConcurrentLinkedQueue[2].

  1. http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  2. http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/ConcurrentLinkedQueue.java

Benchmark results

These are the results of running the included benchmarks against Rubinius 2.0.0-rc1 and JRuby 1.7.3. Any results from MRI will be superficial due to the global lock, so I omitted it completely.

All of the benchmarks were run against Queue (a fully synchronized queue from Ruby's std lib), TwoLockLinkedQueue (implementation below using two different locks), and AtomicLinkedQueue (the lock-free implementation).

The benchmark_concurrent_reads+writes.rb benchmark allocates some threads to write to the queue, and others to read. So there's always contention for both pushing and popping.

The benchmark_separate_reads+writes.rb benchmark first focuses on filling up the queue to capacity, then emptying it completely. This focuses all of the contention on writing, then all of it on reading.

Rubinius results

$ rbx benchmark_concurrent_reads+writes.rb
                                                        user     system      total        real
Queue - more writers than readers                   4.178510   4.892144   9.070654 (  6.677201)
Queue - more readers than writers                   5.427958   4.914869  10.342827 (  7.545760)
Queue - equal writers and readers                   5.313148   6.802285  12.115433 (  8.720809)
                                                        user     system      total        real
TwoLockLinkedQueue - more writers than readers      5.151256   7.610410  12.761666 (  6.458769)
TwoLockLinkedQueue - more readers than writers      5.395152   8.326897  13.722049 (  6.568123)
TwoLockLinkedQueue - equal writers and readers      6.641767  10.623600  17.265367 (  7.297145)
                                                        user     system      total        real
AtomicLinkedQueue - more writers than readers       2.897964   0.061956   2.959920 (  0.717638)
AtomicLinkedQueue - more readers than writers       2.814892   0.050590   2.865482 (  0.596547)
AtomicLinkedQueue - equal writers and readers       4.175097   0.086688   4.261785 (  0.891113)
                                                        user     system      total        real

$ rbx benchmark_separate_reads+writes.rb
                                                   user     system      total        real
Queue - fill, then empty - 10k                 0.113160   0.120949   0.234109 (  0.159911)
Queue - fill, then empty - 100k                1.035808   1.174422   2.210230 (  1.596514)
Queue - fill, then empty - 1mm                11.258097  12.224407  23.482504 ( 17.325185)
                                                   user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k    0.139143   0.172324   0.311467 (  0.214725)
TwoLockLinkedQueue - fill, then empty - 100k   1.312984   1.671349   2.984333 (  2.233421)
TwoLockLinkedQueue - fill, then empty - 1mm   12.175179  16.069279  28.244458 ( 22.541654)
                                                   user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k     0.071836   0.003300   0.075136 (  0.009811)
AtomicLinkedQueue - fill, then empty - 100k    0.645546   0.011743   0.657289 (  0.147805)
AtomicLinkedQueue - fill, then empty - 1mm     7.075495   0.108397   7.183892 (  1.663006)

JRuby results

$ jruby benchmark_concurrent_reads+writes.rb
                                                         user     system      total        real
Queue - more writers than readers                    0.224000   0.000000   0.224000 (  0.224000)
Queue - more readers than writers                    8.529000   0.000000   8.529000 (  8.529000)
Queue - equal writers and readers                    0.263000   0.000000   0.263000 (  0.262000)
                                                         user     system      total        real
TwoLockLinkedQueue - more writers than readers       1.492000   0.000000   1.492000 (  1.492000)
TwoLockLinkedQueue - more readers than writers       1.788000   0.000000   1.788000 (  1.788000)
TwoLockLinkedQueue - equal writers and readers       2.205000   0.000000   2.205000 (  2.205000)
                                                         user     system      total        real
AtomicLinkedQueue - more writers than readers        1.086000   0.000000   1.086000 (  1.086000)
AtomicLinkedQueue - more readers than writers        0.571000   0.000000   0.571000 (  0.572000)
AtomicLinkedQueue - equal writers and readers        1.049000   0.000000   1.049000 (  1.049000)

$ jruby benchmark_separate_reads+writes.rb
                                                    user     system      total        real
Queue - fill, then empty - 10k                  0.014000   0.000000   0.014000 (  0.014000)
Queue - fill, then empty - 100k                 0.065000   0.000000   0.065000 (  0.065000)
Queue - fill, then empty - 1mm                  0.744000   0.000000   0.744000 (  0.744000)
                                                    user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k     0.032000   0.000000   0.032000 (  0.032000)
TwoLockLinkedQueue - fill, then empty - 100k    0.337000   0.000000   0.337000 (  0.337000)
TwoLockLinkedQueue - fill, then empty - 1mm     4.640000   0.000000   4.640000 (  4.640000)
                                                    user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k      0.016000   0.000000   0.016000 (  0.016000)
AtomicLinkedQueue - fill, then empty - 100k     0.162000   0.000000   0.162000 (  0.162000)
AtomicLinkedQueue - fill, then empty - 1mm      2.706000   0.000000   2.706000 (  2.706000)
# This is a proof-of-concept of a concurrent, lock-free FIFO data
# structure written in Ruby. It leverages atomic updates, rather than
# lock-based synchronization.
require 'atomic'
class AtomicLinkedQueue
class Node
attr_accessor :item
def initialize(item, successor)
@item = item
@successor = Atomic.new(successor)
end
def successor
@successor.value
end
def update_successor(old, new)
@successor.compare_and_set(old, new)
end
end
def initialize
dummy_node = Node.new(:dummy, nil)
@head = Atomic.new(dummy_node)
@tail = Atomic.new(dummy_node)
end
def push(item)
# allocate a new node with the item embedded
new_node = Node.new(item, nil)
# keep trying until the operation succeeds
loop do
current_tail_node = @tail.value
current_tail_successor = current_tail_node.successor
# if our stored tail is still the current tail
if current_tail_node == @tail.value
# if that tail was really the last node
if current_tail_successor.nil?
# if we can update the previous successor of tail to point to this new node
if current_tail_node.update_successor(nil, new_node)
# then update tail to point to this node as well
@tail.compare_and_set(current_tail_node, new_node)
# and return
return true
# else, start the loop over
end
else
# in this case, the tail ref we had wasn't the real tail
# so we try to set its successor as the real tail, then start the loop again
@tail.compare_and_set(current_tail_node, current_tail_successor)
end
end
end
end
def pop
# retry until some value can be returned
loop do
# the value in @head is just a dummy node that always sits in that position,
# the real 'head' is in its successor
current_dummy_node = @head.value
current_tail_node = @tail.value
current_head_node = current_dummy_node.successor
# if our local head is still consistent with the head node, continue
# otherwise, start over
if current_dummy_node == @head.value
# if either the queue is empty, or falling behind
if current_dummy_node == current_tail_node
# if there's nothing after the 'dummy' head node
if current_head_node.nil?
# just return nil
return nil
else
# here the head element succeeding head is not nil, but the head and tail are equal
# so tail is falling behind, update it, then start over
@tail.compare_and_set(current_tail_node, current_head_node)
end
# the queue isn't empty
# if we can set the dummy head to the 'real' head, we're free to return the value in that real head, success
elsif @head.compare_and_set(current_dummy_node, current_head_node)
# grab the item from the popped node
item = current_head_node.item
if item != nil
current_head_node.item = nil
end
# return it, success!
return item
# else
# try again
end
end
end
end
def size
successor = @head.value.successor
count = 0
loop do
break if successor.nil?
current_node = successor
successor = current_node.successor
count += 1
end
count
end
end
require 'benchmark'
require 'thread'
require_relative 'atomic_linked_queue'
require_relative 'two_lock_linked_queue'
thread_count = 50
iterations = 10_000
queue_klasses = [Queue, TwoLockLinkedQueue, AtomicLinkedQueue]
Thread.abort_on_exception = true
# this one tells all the threads when to start
$go = false
def setup(queue, writer_thread_count, reader_thread_count, iterations)
tg = ThreadGroup.new
# spawn writer threads
writer_thread_count.times do
t = Thread.new do
# wait until the bm starts to do the work. This should
# minimize variance.
nil until $go
iterations.times do
queue.push('item')
end
end
tg.add(t)
end
# spawn reader threads
if queue.class == Queue
# the Queue class gets special behaviour because its #pop
# method is blocking by default.
reader_thread_count.times do
t = Thread.new do
nil until $go
iterations.times do
begin
queue.pop(:nonblocking)
rescue
end
end
end
tg.add(t)
end
else
reader_thread_count.times do
t = Thread.new do
nil until $go
iterations.times do
queue.pop
end
end
tg.add(t)
end
end
tg
end
def exercise(tg)
$go = true
tg.list.each(&:join)
$go = false
end
3.times do
queue_klasses.each do |klass|
Benchmark.bm(50) do |bm|
queue = klass.new
tg = setup(queue, thread_count, (thread_count * 0.6).to_i, iterations)
bm.report("#{klass} - more writers than readers") { exercise(tg) }
queue = klass.new
tg = setup(queue, (thread_count * 0.6).to_i, thread_count, iterations)
bm.report("#{klass} - more readers than writers") { exercise(tg) }
queue = klass.new
tg = setup(queue, thread_count, thread_count, iterations)
bm.report("#{klass} - equal writers and readers") { exercise(tg) }
end
end
end
require 'benchmark'
require 'thread'
require_relative 'atomic_linked_queue'
require_relative 'two_lock_linked_queue'
thread_count = 50
queue_klasses = [Queue, TwoLockLinkedQueue, AtomicLinkedQueue]
Thread.abort_on_exception = true
# this one tells all the threads when to start
$go = false
def setup(queue, thread_count, queue_length)
tg = ThreadGroup.new
if queue.class == Queue
thread_count.times do
t = Thread.new do
# wait until the bm starts to do the work. This should
# minimize variance.
nil until $go
(queue_length / thread_count).to_i.times do
queue.push('item')
end
loop do
begin
result = queue.pop(:nonblock)
rescue => e
break
end
end
end
tg.add(t)
end
else
thread_count.times do
t = Thread.new do
nil until $go
(queue_length / thread_count).to_i.times do
queue.push('item')
end
result = true
until result.nil?
result = queue.pop
end
end
tg.add(t)
end
end
GC.start
tg
end
def exercise(tg)
$go = true
tg.list.each(&:join)
$go = false
end
3.times do
queue_klasses.each do |klass|
Benchmark.bm(45) do |bm|
queue = klass.new
tg = setup(queue, thread_count, 10_000)
bm.report("#{klass} - fill, then empty - 10k") { exercise(tg) }
raise 'hell' unless queue.size.zero?
queue = klass.new
tg = setup(queue, thread_count, 100_000)
bm.report("#{klass} - fill, then empty - 100k") { exercise(tg) }
raise 'hell' unless queue.size.zero?
queue = klass.new
tg = setup(queue, thread_count, 1_000_000)
bm.report("#{klass} - fill, then empty - 1mm") { exercise(tg) }
raise 'hell' unless queue.size.zero?
end
end
end
require 'thread'
class TwoLockLinkedQueue
# This Node doesn't need atomic updates, it assumes
# that you're modifying it while holding a lock
Node = Struct.new(:item, :successor)
def initialize
dummy_node = Node.new(:dummy, nil)
@head_node = dummy_node
@tail_node = dummy_node
@head_lock = Mutex.new
@tail_lock = Mutex.new
end
def push(item)
# allocate a new node with the item embedded
new_node = Node.new(item, nil)
@tail_lock.synchronize do
# update the successor of the current tail to point to the new node
@tail_node.successor = new_node
@tail_node = new_node
end
end
def pop
@head_lock.synchronize do
dummy_node = @head_node
head = @head_node.successor
if head.nil? # then queue was empty
return nil
else
# the current head becomes the new 'dummy' head
@head_node = head
# return its value
return head.item
end
end
end
def size
successor = @head_node.successor
count = 0
loop do
break if successor.nil?
current_node = successor
successor = current_node.successor
count += 1
end
count
end
end
@JakubOboza
Copy link

Yeah i think there is an error there. Also i don't think it is a safe queue anyway or lock free its more like try_lock untill you will succeed

@funny-falcon
Copy link

It seems, that I was wrong: algorihm is correct. Please change current_tail_successor to nil to improve readability at line 53

@funny-falcon
Copy link

And there is no need for using atomic for node.item - whole algorithm ensures that only one reader will use/change it.

@jstorimer
Copy link
Author

@JakubOboza what do you mean not safe? The AtomicLinkedQueue doesn't use any locks. If you're referring to the fact that it can take multiple attempts before an operation completes, you might be getting it confused with a wait-free queue.

@funny-falcon thanks for taking a look! Good points, updated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment