Skip to content

Instantly share code, notes, and snippets.

@brugeman
Created May 26, 2015 05:30
Show Gist options
  • Save brugeman/ab3efd4c094ee42badc1 to your computer and use it in GitHub Desktop.
Save brugeman/ab3efd4c094ee42badc1 to your computer and use it in GitHub Desktop.
Pointer storage with safe memory reclamation
// based on http://www.research.ibm.com/people/m/michael/podc-2002.pdf
// also resembles RCU.
#ifndef SMRP_HPP
#define SMRP_HPP
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
namespace smrp
{
// Single Writer Multiple Readers pointer storage with safe memory
// reclamation (a simplified form of hazard pointers).
//
// One writer thread publishes a pointer with 'set'. Up to
// READER_THREAD_NUM reader threads pin the current pointer with
// 'rlock' and release it with 'runlock'; each reader thread must use
// its own, distinct index in [0, READER_THREAD_NUM).
//
// Readers never wait for the writer (they only retry when a 'set'
// races with their 'rlock'). The writer, however, spins inside 'set'
// until no reader pins the value being replaced, so write latency is
// limited by the slowest reader and writes are meant to be rare.
//
// The store never deletes anything itself: 'set' hands the previous
// pointer back to the caller once no reader pins it.
template<typename T, size_t READER_THREAD_NUM>
class swmr_pointer_store_t
{
    static const size_t CACHE_LINE_SIZE = 64;
    typedef std::atomic<T *> aptr_t;

    static_assert (sizeof (aptr_t) < CACHE_LINE_SIZE,
                   "atomic pointer must be smaller than a cache line");

    // no-false-sharing-pointer: an atomic pointer padded so that
    // every slot occupies CACHE_LINE_SIZE bytes.
    class nfs_ptr_t
    {
        aptr_t aptr_;
        char pad_[CACHE_LINE_SIZE - sizeof (aptr_t)];

    public:
        nfs_ptr_t () : aptr_ (nullptr) {}
        nfs_ptr_t (const nfs_ptr_t & other) = delete;
        nfs_ptr_t & operator= (const nfs_ptr_t & other) = delete;

        const aptr_t * operator-> () const { return &aptr_; }
        aptr_t * operator-> () { return &aptr_; }
    };

    // value owned by writer
    nfs_ptr_t main_ptr_;
    // values owned by readers: slot i holds the pointer currently
    // pinned by reader i, or null when that reader pins nothing.
    // Mutable because 'rlock'/'runlock' are const for readers.
    mutable nfs_ptr_t hazard_ptrs_[READER_THREAD_NUM];

    // Implements wait strategy for writers in case
    // the old value is used by readers.
    void wait ()
    {
        // DEBUG: don't wait while testing, just spin.
    }

    // Returns true if any reader slot currently publishes 'ptr'.
    //
    // The loads are seq_cst so that they take part in the same total
    // order as the exchange in 'set' and the store/load pair in
    // 'rlock' (see 'rlock' for why that is required). A seq_cst load
    // is also an acquire load, so observing the value written by a
    // reader's 'runlock' makes that reader's earlier accesses to the
    // object happen-before whatever the writer does after 'set'.
    bool pinned_by_reader (const T * ptr) const
    {
        for (size_t i = 0; i < READER_THREAD_NUM; ++i)
        {
            if (hazard_ptrs_[i]->load (std::memory_order_seq_cst) == ptr)
                return true;
        }
        return false;
    }

public:
    swmr_pointer_store_t ()
    {}

    // no copies (the atomics inside cannot be copied or moved)
    swmr_pointer_store_t (const swmr_pointer_store_t & other) = delete;
    swmr_pointer_store_t & operator= (const swmr_pointer_store_t & other) = delete;

    // Must only be called by writer thread,
    // wait-free, 1 atomic load.
    // Returns the pointer most recently installed by 'set'
    // (null if none).
    T * get () const
    {
        // relaxed is enough: only the writer calls this, and the
        // writer is the only thread that ever modifies main_ptr_.
        return main_ptr_->load (std::memory_order_relaxed);
    }

    // Writer thread calls this to replace previous value,
    // the previous value is returned as soon as
    // no readers are using it. That is, this will work as slow
    // as the slowest reader works (and not faster than 'wait' above).
    //
    // Returns the previously installed pointer (possibly null); the
    // caller becomes responsible for it.
    // NOTE: if 'ptr' is the pointer that is already installed, it is
    // returned immediately even though it is still the current value
    // and readers may be using it - the caller must not free it then.
    T * set (T * ptr)
    {
        // Publish the new value. seq_cst (not merely release) so the
        // exchange is ordered before the hazard scan below in the
        // single total order shared with the readers.
        T * const old = main_ptr_->exchange (ptr, std::memory_order_seq_cst);

        // Nothing to reclaim if there was no previous value, or if
        // the "new" value is the one that was already installed.
        if (old != nullptr && old != ptr)
        {
            // some reader is using the old value?
            // ok, let's wait to let it unlock.
            while (pinned_by_reader (old))
                wait ();
        }
        // ok, no readers use old value now
        return old;
    }

    // Readers use 'rlock' to acquire current value of pointer,
    // and notify writer that the value is in use until 'runlock'.
    // This call might loop while in contention with writer,
    // but since writes are to happen rarely, looping is
    // very unlikely.
    //
    // 'thread_index' is the calling reader's slot and must be less
    // than READER_THREAD_NUM. Returns the pinned pointer, which may
    // be null.
    T * rlock (const size_t thread_index) const
    {
        assert (thread_index < READER_THREAD_NUM);

        T * candidate = main_ptr_->load (std::memory_order_acquire);
        for (;;)
        {
            // Announce the value we intend to use, then re-read the
            // main pointer to validate the announcement.
            //
            // This is a store followed by a load of a *different*
            // atomic, and the writer does the mirror image (exchange
            // main pointer, then load hazard slots). Release/acquire
            // does not order a store before a later load, so with
            // weaker orderings both threads may read stale values:
            // the reader validates a pointer the writer has already
            // replaced while the writer misses the reader's
            // announcement, and the object is freed while in use.
            // With all four operations seq_cst, at least one side is
            // guaranteed to observe the other's write.
            hazard_ptrs_[thread_index]->store (candidate,
                                               std::memory_order_seq_cst);
            T * const current = main_ptr_->load (std::memory_order_seq_cst);

            // if writer didn't change anything,
            // return our 'locked' value
            if (current == candidate)
                return candidate;

            // we don't wait on retry as writer
            // has already completed (or we wouldn't need to retry);
            // the value just read is the next candidate.
            candidate = current;
        }
    }

    // Readers use 'runlock' to notify writers that the last
    // read value is no longer in use. It is wait-free -
    // 1 atomic store.
    void runlock (const size_t thread_index) const
    {
        assert (thread_index < READER_THREAD_NUM);
        // notify writer that we're done; release makes our reads of
        // the object visible-before the writer's acquire of this slot
        hazard_ptrs_[thread_index]->store (nullptr, std::memory_order_release);
    }
};
// Stress test for swmr_pointer_store_t.
//
// One writer thread publishes a sequence of strictly increasing ints
// and deletes each previous value as soon as 'set' returns it; the
// reader threads repeatedly pin the current value and check that the
// values they observe never decrease. The invariants are checked
// with 'assert', so the test is only meaningful when NDEBUG is not
// defined.
//
// ac, av: argument count and vector; av[2] must hold the number of
//         reader threads, an integer in [0, MAX_READER_THREADS].
// Returns 0 after all threads have finished, 1 if the arguments are
// missing or invalid (a message is printed to stderr).
inline int test (int ac, char ** av)
{
    static const size_t MAX_READER_THREADS = 8;

    // Validate the command line explicitly instead of with assert:
    // with NDEBUG a missing argument would dereference av[2], and an
    // oversized count would index past the store's reader slots.
    if (ac <= 2 || av == nullptr || av[2] == nullptr)
    {
        fprintf (stderr,
                 "smrp test: expected the reader thread count as argument 2\n");
        return 1;
    }
    char * parse_end = nullptr;
    const long requested = std::strtol (av[2], &parse_end, 10);
    if (parse_end == av[2] || *parse_end != '\0'
        || requested < 0
        || static_cast<unsigned long> (requested) > MAX_READER_THREADS)
    {
        fprintf (stderr,
                 "smrp test: reader thread count must be an integer in [0, %zu], got '%s'\n",
                 MAX_READER_THREADS, av[2]);
        return 1;
    }
    const size_t READERS = static_cast<size_t> (requested);

    swmr_pointer_store_t<int, MAX_READER_THREADS> pointer;

    // Reader body: pin, check monotonicity, unpin.
    auto reader = [&] (const size_t reader_index) {
        int last_value = 0;
        const size_t N = 150000000; // takes ~1 sec on my laptop
        for (size_t i = 0; i < N; ++i)
        {
            int * v = pointer.rlock (reader_index);
            if (v)
            {
                // the writer only ever publishes increasing values
                assert (*v >= last_value);
                last_value = *v;
            }
            pointer.runlock (reader_index);
        }
    };

    // Writer body: publish 0, 1, 2, ... and free each replaced value.
    auto writer = [&] () {
        const size_t N = 10000000; // ~1 sec on my laptop
        int last = -1;
        for (size_t i = 0; i < N; ++i)
        {
            // N fits in int, so the conversion is lossless
            int * v = new int (static_cast<int> (i));
            assert (*v > last);
            int * old = pointer.set (v);
            // the value handed back must be the one published last
            if (old)
                assert (last == *old);
            delete old;
            last = *v;
        }
        // retire the final value as well
        delete pointer.set (nullptr);
    };

    std::thread w (writer);
    const auto start = std::chrono::high_resolution_clock::now ();

    std::vector<std::thread> readers;
    readers.reserve (READERS);
    for (size_t i = 0; i < READERS; ++i)
        readers.emplace_back (reader, i);
    for (size_t i = 0; i < READERS; ++i)
        readers[i].join ();

    const auto end = std::chrono::high_resolution_clock::now ();
    const auto passed =
        std::chrono::duration_cast<std::chrono::milliseconds> (end - start);
    printf ("readers done in %lld ms\n", (long long)passed.count ());

    w.join ();
    return 0;
}
}; /* namespace smrp */
#endif /* SMRP_HPP */
@brugeman
Copy link
Author

To compile the test, do:
g++-4.8 -std=c++11 -O3 -g -Wall -DSMRP_TEST -o smrp_test -x c++ smrp.hpp -lpthread

Now, to run the test:
READERS=1; ./smrp_test $READERS

Fails here: https://gist.github.com/brugeman/ab3efd4c094ee42badc1#file-smrp-hpp-L174

Why does it fail? Can anyone explain, please?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment