Last active
January 6, 2019 04:20
-
-
Save njligames/90178d977dfd1cbdd9fc2bfe58d07110 to your computer and use it in GitHub Desktop.
Producer - Consumer Threading example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// main.cpp | |
// Threading_Consumer_Producer | |
// | |
// Created by James Folk on 1/5/19. | |
// Copyright © 2019 James Folk. All rights reserved. | |
// | |
// Thanks to: https://gist.github.com/iikuy/8115191 | |
#include <thread> | |
#include <iostream> | |
#include <queue> | |
template <class T> | |
class ThreadedQueue | |
{ | |
public: | |
ThreadedQueue() | |
: mx(new std::mutex) | |
, cv(new std::condition_variable){} | |
virtual ~ThreadedQueue(){delete cv;delete mx;} | |
std::mutex *mx; | |
std::condition_variable *cv; | |
std::queue<T> q; | |
}; | |
// ############################################################################## | |
template <class T> | |
class Producer | |
{ | |
public: | |
Producer(ThreadedQueue<T> &q):threadedQueue(q),mFinished(false){} | |
virtual bool produceFunction(T &) = 0; | |
bool isFinished()const; | |
void finish(); | |
public: | |
void produce(); | |
protected: | |
ThreadedQueue<T> &threadedQueue; | |
bool mFinished; | |
private: | |
Producer(); | |
Producer(const Producer&); | |
const Producer &operator=(const Producer&); | |
}; | |
template <class T> | |
void Producer<T>::produce() | |
{ | |
while(true) | |
{ | |
std::lock_guard<std::mutex> lk(*threadedQueue.mx); | |
T val; | |
if(this->produceFunction(val)) | |
{ | |
// threadedQueue.q.push(); | |
} | |
threadedQueue.cv->notify_all(); | |
if (isFinished()) | |
{ | |
break; | |
} | |
} | |
} | |
template <class T> | |
bool Producer<T>::isFinished()const | |
{ | |
return mFinished; | |
} | |
template <class T> | |
void Producer<T>::finish() | |
{ | |
std::lock_guard<std::mutex> lk(*threadedQueue.mx); | |
mFinished=true; | |
threadedQueue.cv->notify_all(); | |
} | |
// ############################################################################## | |
class OneProducer : public Producer<int> | |
{ | |
public: | |
OneProducer(ThreadedQueue<int> &q):Producer<int>(q) | |
{ | |
} | |
virtual bool produceFunction(int &v)override; | |
}; | |
bool OneProducer::produceFunction(int &v) | |
{ | |
v = 1; | |
return true; | |
} | |
// ############################################################################## | |
class TwoProducer : public Producer<int> | |
{ | |
public: | |
TwoProducer(ThreadedQueue<int> &q):Producer<int>(q) | |
{ | |
} | |
virtual bool produceFunction(int &)override; | |
}; | |
bool TwoProducer::produceFunction(int &v) | |
{ | |
v = 2; | |
return true; | |
} | |
// ############################################################################## | |
template <class T> | |
class Consumer | |
{ | |
public: | |
Consumer(ThreadedQueue<T> &q, Producer<T> &p):threadedQueue(q),producer(p) | |
{ | |
} | |
void consume(); | |
T nextAvailable(); | |
size_t availableSize()const; | |
private: | |
std::queue<T> available; | |
Consumer(); | |
Consumer(const Consumer&); | |
const Consumer &operator=(const Consumer&); | |
ThreadedQueue<T> &threadedQueue; | |
const Producer<T> &producer; | |
// const std::vector<Producer<T>> &producerVector; | |
}; | |
template <class T> | |
void Consumer<T>::consume() | |
{ | |
while(true) | |
{ | |
std::unique_lock<std::mutex> lk(*threadedQueue.mx); | |
const auto& _threadedQueue = threadedQueue; | |
const auto& _producer = producer; | |
threadedQueue.cv->wait(lk, [&_threadedQueue,&_producer]{ | |
return _producer.isFinished() || !_threadedQueue.q.empty(); | |
}); | |
available.push(std::move(threadedQueue.q.front())); | |
threadedQueue.q.pop(); | |
if(producer.isFinished()) | |
{ | |
break; | |
} | |
} | |
} | |
template <class T> | |
T Consumer<T>::nextAvailable() | |
{ | |
T ret(std::move(available.front())); | |
available.pop(); | |
return std::move(ret); | |
} | |
template <class T> | |
size_t Consumer<T>::availableSize()const | |
{ | |
return available.size(); | |
} | |
// ############################################################################## | |
class IntConsumer : public Consumer<int> | |
{ | |
public: | |
IntConsumer(ThreadedQueue<int> &q, OneProducer &p):Consumer<int>(q,p) | |
{ | |
} | |
}; | |
// ############################################################################## | |
std::mutex *mx = new std::mutex; | |
std::condition_variable *cv = new std::condition_variable; | |
std::queue<int> q; | |
bool finished = false; | |
void producer(int n) { | |
std::this_thread::sleep_for (std::chrono::seconds(10)); | |
// for(int i=0; i<n; ++i) | |
for(int i=0; !finished; ++i) | |
{ | |
{ | |
std::lock_guard<std::mutex> lk(*mx); | |
q.push(i); | |
std::cout << "pushing " << i << std::endl; | |
} | |
cv->notify_all(); | |
} | |
{ | |
std::lock_guard<std::mutex> lk(*mx); | |
finished = true; | |
} | |
cv->notify_all(); | |
} | |
void consumer() { | |
while (true) { | |
std::unique_lock<std::mutex> lk(*mx); | |
cv->wait(lk, []{ return finished || !q.empty(); }); | |
while (!q.empty()) { | |
std::cout << "consuming " << q.front() << std::endl; | |
q.pop(); | |
} | |
if (finished) break; | |
} | |
} | |
int main(int argc, const char * argv[]) { | |
ThreadedQueue<int> *threadeQueue = new ThreadedQueue<int>; | |
// TwoProducer *kp = new TwoProducer(*threadeQueue); | |
OneProducer *ip = new OneProducer(*threadeQueue); | |
IntConsumer *ic = new IntConsumer(*threadeQueue, *ip); | |
// std::thread t0(&TwoProducer::produce, kp); | |
std::thread t1(&OneProducer::produce, ip); | |
std::thread t2(&IntConsumer::consume, ic); | |
// t0.join(); | |
t1.join(); | |
t2.join(); | |
delete ic; | |
delete ip; | |
// delete kp; | |
delete threadeQueue; | |
std::cout << "finished!" << std::endl; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment