Skip to content

Instantly share code, notes, and snippets.

@njligames
Last active January 6, 2019 04:20
Show Gist options
  • Save njligames/90178d977dfd1cbdd9fc2bfe58d07110 to your computer and use it in GitHub Desktop.
Save njligames/90178d977dfd1cbdd9fc2bfe58d07110 to your computer and use it in GitHub Desktop.
Producer - Consumer Threading example
//
// main.cpp
// Threading_Consumer_Producer
//
// Created by James Folk on 1/5/19.
// Copyright © 2019 James Folk. All rights reserved.
//
// Thanks to: https://gist.github.com/iikuy/8115191
#include <thread>
#include <iostream>
#include <queue>
template <class T>
class ThreadedQueue
{
public:
ThreadedQueue()
: mx(new std::mutex)
, cv(new std::condition_variable){}
virtual ~ThreadedQueue(){delete cv;delete mx;}
std::mutex *mx;
std::condition_variable *cv;
std::queue<T> q;
};
// ##############################################################################
template <class T>
class Producer
{
public:
Producer(ThreadedQueue<T> &q):threadedQueue(q),mFinished(false){}
virtual bool produceFunction(T &) = 0;
bool isFinished()const;
void finish();
public:
void produce();
protected:
ThreadedQueue<T> &threadedQueue;
bool mFinished;
private:
Producer();
Producer(const Producer&);
const Producer &operator=(const Producer&);
};
template <class T>
void Producer<T>::produce()
{
while(true)
{
std::lock_guard<std::mutex> lk(*threadedQueue.mx);
T val;
if(this->produceFunction(val))
{
// threadedQueue.q.push();
}
threadedQueue.cv->notify_all();
if (isFinished())
{
break;
}
}
}
template <class T>
bool Producer<T>::isFinished()const
{
return mFinished;
}
template <class T>
void Producer<T>::finish()
{
std::lock_guard<std::mutex> lk(*threadedQueue.mx);
mFinished=true;
threadedQueue.cv->notify_all();
}
// ##############################################################################
class OneProducer : public Producer<int>
{
public:
OneProducer(ThreadedQueue<int> &q):Producer<int>(q)
{
}
virtual bool produceFunction(int &v)override;
};
bool OneProducer::produceFunction(int &v)
{
v = 1;
return true;
}
// ##############################################################################
class TwoProducer : public Producer<int>
{
public:
TwoProducer(ThreadedQueue<int> &q):Producer<int>(q)
{
}
virtual bool produceFunction(int &)override;
};
bool TwoProducer::produceFunction(int &v)
{
v = 2;
return true;
}
// ##############################################################################
template <class T>
class Consumer
{
public:
Consumer(ThreadedQueue<T> &q, Producer<T> &p):threadedQueue(q),producer(p)
{
}
void consume();
T nextAvailable();
size_t availableSize()const;
private:
std::queue<T> available;
Consumer();
Consumer(const Consumer&);
const Consumer &operator=(const Consumer&);
ThreadedQueue<T> &threadedQueue;
const Producer<T> &producer;
// const std::vector<Producer<T>> &producerVector;
};
template <class T>
void Consumer<T>::consume()
{
while(true)
{
std::unique_lock<std::mutex> lk(*threadedQueue.mx);
const auto& _threadedQueue = threadedQueue;
const auto& _producer = producer;
threadedQueue.cv->wait(lk, [&_threadedQueue,&_producer]{
return _producer.isFinished() || !_threadedQueue.q.empty();
});
available.push(std::move(threadedQueue.q.front()));
threadedQueue.q.pop();
if(producer.isFinished())
{
break;
}
}
}
template <class T>
T Consumer<T>::nextAvailable()
{
T ret(std::move(available.front()));
available.pop();
return std::move(ret);
}
template <class T>
size_t Consumer<T>::availableSize()const
{
return available.size();
}
// ##############################################################################
class IntConsumer : public Consumer<int>
{
public:
IntConsumer(ThreadedQueue<int> &q, OneProducer &p):Consumer<int>(q,p)
{
}
};
// ##############################################################################
std::mutex *mx = new std::mutex;
std::condition_variable *cv = new std::condition_variable;
std::queue<int> q;
bool finished = false;
void producer(int n) {
std::this_thread::sleep_for (std::chrono::seconds(10));
// for(int i=0; i<n; ++i)
for(int i=0; !finished; ++i)
{
{
std::lock_guard<std::mutex> lk(*mx);
q.push(i);
std::cout << "pushing " << i << std::endl;
}
cv->notify_all();
}
{
std::lock_guard<std::mutex> lk(*mx);
finished = true;
}
cv->notify_all();
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(*mx);
cv->wait(lk, []{ return finished || !q.empty(); });
while (!q.empty()) {
std::cout << "consuming " << q.front() << std::endl;
q.pop();
}
if (finished) break;
}
}
int main(int argc, const char * argv[]) {
ThreadedQueue<int> *threadeQueue = new ThreadedQueue<int>;
// TwoProducer *kp = new TwoProducer(*threadeQueue);
OneProducer *ip = new OneProducer(*threadeQueue);
IntConsumer *ic = new IntConsumer(*threadeQueue, *ip);
// std::thread t0(&TwoProducer::produce, kp);
std::thread t1(&OneProducer::produce, ip);
std::thread t2(&IntConsumer::consume, ic);
// t0.join();
t1.join();
t2.join();
delete ic;
delete ip;
// delete kp;
delete threadeQueue;
std::cout << "finished!" << std::endl;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment