Created
May 26, 2015 05:30
-
-
Save brugeman/ab3efd4c094ee42badc1 to your computer and use it in GitHub Desktop.
Pointer storage with safe memory reclamation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// based on http://www.research.ibm.com/people/m/michael/podc-2002.pdf | |
// also resembles RCU. | |
#ifndef SMRP_HPP | |
#define SMRP_HPP | |
#include <cassert> | |
#include <cstdio> | |
#include <atomic> | |
#include <vector> | |
#include <chrono> | |
#include <thread> | |
namespace smrp | |
{ | |
// Single Writer Multiple Readers pointer storage, | |
// both writers and readers are lock-free, | |
// writers wait for all readers to finish | |
// with previous value so it can be deleted. | |
// Thus, write throughput is limited by slowest | |
// reader, and writes are meant to happen very rarely. | |
template<typename T, size_t READER_THREAD_NUM> | |
class swmr_pointer_store_t | |
{ | |
static const size_t CACHE_LINE_SIZE = 64; | |
typedef std::atomic<T *> aptr_t; | |
// no-false-sharing-pointer | |
class nfs_ptr_t | |
{ | |
aptr_t aptr_; | |
char pad_[CACHE_LINE_SIZE - sizeof (aptr_t)]; | |
nfs_ptr_t (const nfs_ptr_t & other); | |
nfs_ptr_t & operator= (const nfs_ptr_t & other); | |
public: | |
nfs_ptr_t () : aptr_ (0) {} | |
const aptr_t * operator-> () const { return &aptr_; } | |
aptr_t * operator-> () { return &aptr_; } | |
}; | |
// value owned by writer | |
nfs_ptr_t main_ptr_; | |
// values owned by readers | |
mutable nfs_ptr_t hazard_ptrs_[READER_THREAD_NUM]; | |
// Implements wait strategy for writers in case | |
// the old value is used by readers. | |
void wait () | |
{ | |
// DEBUG: don't wait while testing, just spin. | |
} | |
// no copies! moving is ok | |
swmr_pointer_store_t (const swmr_pointer_store_t & other); | |
swmr_pointer_store_t & operator= (const swmr_pointer_store_t && other); | |
public: | |
swmr_pointer_store_t () | |
{} | |
// Must only be called by writer thread, | |
// wait-free, 1 atomic load. | |
T * get () const | |
{ | |
return main_ptr_->load (std::memory_order_relaxed); | |
} | |
// Writer thread calls this to replace previous value, | |
// the previous value is returned as soon as | |
// no readers are using it. That is, this will work as slow | |
// as the slowest reader works (and not faster than 'wait' above). | |
T * set (T * ptr) | |
{ | |
// replace old value with new value, | |
T * const old = main_ptr_->exchange (ptr, std::memory_order_release); | |
// do we actually need to check readers? | |
if (old != 0 && ptr != old) | |
{ | |
// now make sure old value is not in use | |
bool locked = false; | |
do | |
{ | |
T * v = 0; | |
for (size_t i = 0; v != old && i < READER_THREAD_NUM; ++i) | |
v = hazard_ptrs_[i]->load (std::memory_order_acquire); | |
locked = v == old; | |
// some reader is using our value? | |
// ok, lets wait to let him unlock it. | |
if (locked) | |
wait (); | |
} while (locked); | |
} | |
// ok, no readers use old value now | |
return old; | |
} | |
// Readers use 'rlock' to acquire current value of pointer, | |
// and notify writer that the value is in use until 'runlock'. | |
// This call might loop while in contention with writer, | |
// but since writes are to happen rarely, looping is | |
// very unlikely. | |
T * rlock (const size_t thread_index) const | |
{ | |
assert (thread_index < READER_THREAD_NUM); | |
T * v = 0; | |
T * v1 = 0; | |
do | |
{ | |
// load the pointer | |
v = main_ptr_->load (std::memory_order_acquire); | |
// save it to our thread's hazard pointer slot | |
// FIXME SC is required, but makes it 10x times slower, | |
// w/o SC test constantly fails. Why? | |
hazard_ptrs_[thread_index]->store (v, std::memory_order_release); | |
// now - check main pointer again, to make sure | |
// writer hasn't managed to perform 'set' btw previous 2 operations | |
v1 = main_ptr_->load (std::memory_order_acquire); | |
// if writer didn't change anything, | |
// return our 'locked' value | |
} | |
while (v1 != v); | |
// we don't wait on retry as writer | |
// has already completed (or we wouldn't need to retry) | |
return v; | |
} | |
// Readers use 'runlock' to notify writers that the last | |
// read value is no longer in use. It is wait-free - | |
// 1 atomic store. | |
void runlock (const size_t thread_index) const | |
{ | |
assert (thread_index < READER_THREAD_NUM); | |
// notify writer that we're done | |
hazard_ptrs_[thread_index]->store (0, std::memory_order_release); | |
} | |
}; | |
inline int test (int ac, char ** av) | |
{ | |
assert (ac > 2); | |
static const size_t MAX_READER_THREADS = 8; | |
const size_t READERS = atoi (av[2]); | |
assert (READERS <= MAX_READER_THREADS); | |
swmr_pointer_store_t<int, MAX_READER_THREADS> pointer; | |
auto reader = [&] (const size_t reader_index) { | |
int last_value = 0; | |
const size_t N = 150000000; // takes ~1 sec on my taptop | |
for (size_t i = 0; i < N; ++i) | |
{ | |
int * v = pointer.rlock (reader_index); | |
if (v) | |
{ | |
assert (*v >= last_value); | |
last_value = *v; | |
} | |
pointer.runlock (reader_index); | |
} | |
}; | |
auto writer = [&] () { | |
const size_t N = 10000000; // ~1 sec on my laptop | |
int last = -1; | |
for (size_t i = 0; i < N; ++i) | |
{ | |
int * v = new int (i); | |
assert (*v > last); | |
int * old = pointer.set (v); | |
if (old) | |
assert (last == *old); | |
delete old; | |
last = *v; | |
} | |
delete pointer.set (0); | |
}; | |
std::thread w (writer); | |
const auto start = std::chrono::high_resolution_clock::now (); | |
std::vector<std::thread> readers; | |
for (size_t i = 0; i < READERS; ++i) | |
readers.push_back (std::thread (reader, i)); | |
for (size_t i = 0; i < READERS; ++i) | |
readers[i].join (); | |
const auto end = std::chrono::high_resolution_clock::now (); | |
const auto passed = | |
std::chrono::duration_cast<std::chrono::milliseconds> (end - start); | |
printf ("readers done in %lld ms\n", (long long)passed.count ()); | |
w.join (); | |
return 0; | |
} | |
}; /* namespace smrp */ | |
#endif /* SMRP_HPP */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To compile the test, do:
g++-4.8 -std=c++11 -O3 -g -Wall -DSMRP_TEST -o smrp_test -x c++ smrp.hpp -lpthread
Now, to run the test:
READERS=1; ./smrp_test $READERS
Fails here: https://gist.github.com/brugeman/ab3efd4c094ee42badc1#file-smrp-hpp-L174
Why, anyone, please?