Skip to content

Instantly share code, notes, and snippets.

@mniehe
Last active April 24, 2017 18:41
Show Gist options
  • Save mniehe/06fac46f87c5fc3ce5c2854a42888931 to your computer and use it in GitHub Desktop.
Save mniehe/06fac46f87c5fc3ce5c2854a42888931 to your computer and use it in GitHub Desktop.
Consumer class for RabbitMQ
/**
 * Batching consumer for a RabbitMQ queue.
 *
 * Deliveries are buffered in memory as they arrive and handed to a
 * user-supplied handler in batches of at most `maxQueue` messages every
 * `interval` milliseconds. The handler resolves with the list of messages
 * that should be acknowledged.
 *
 * NOTE(review): `channel` is assumed to be an amqplib-style channel exposing
 * `consume`, `cancel` and `ack`, and `uuidV4` must be in scope wherever this
 * class is defined (it is not declared in this snippet) - confirm both.
 */
class Consumer {
  /**
   * @param {object} channel - Channel used to consume, cancel and ack.
   * @param {string} queueName - Name of the queue to consume from.
   * @param {string} name - Prefix of the generated consumer tag.
   * @param {number} [interval=1000] - Milliseconds between batch runs.
   * @param {number} [maxQueue=5] - Maximum number of messages per batch.
   */
  constructor(channel, queueName, name, interval = 1000, maxQueue = 5) {
    this.queueName = queueName;
    this.queue = [];
    this.channel = channel;
    this.maxQueue = maxQueue;
    this._interval = interval;
    this._consumerTag = `${name}:${uuidV4()}`;
    this._processInterval = null;
    this._processFunc = null;
  }

  /**
   * Subscribes to the queue and starts the periodic batch processing.
   *
   * @param {function(Array<object>): Promise<Array<object>>} func - Batch
   *   handler; receives the buffered messages and must return a promise (or
   *   thenable) resolving to the array of messages to acknowledge.
   * @throws {TypeError} If `func` is not a function.
   */
  start(func) {
    if (typeof func !== 'function') {
      throw new TypeError('Consumer handler must be a function.');
    }

    const options = {
      consumerTag: this._consumerTag,
      noAck: false,
    };

    console.log('Starting consumer...');
    this.channel.consume(
      this.queueName,
      (message) => this._receiveMessage(message),
      options,
    );
    this._processFunc = func;

    // A repeated start() must not orphan the previous timer, otherwise stop()
    // could never clear it.
    if (this._processInterval !== null) {
      clearInterval(this._processInterval);
    }

    // The promise returned by _consume() never rejects (failures are logged
    // there), so it is safe to leave it un-awaited in the timer callback.
    this._processInterval = setInterval(() => {
      this._consume(func);
    }, this._interval);
  }

  /**
   * Cancels the subscription, stops the timer and flushes every message that
   * is still buffered (in batches of at most `maxQueue`).
   *
   * @returns {Promise<Array<undefined>>} Settles once all flushed batches have
   *   been handled. Callers that ignore the return value behave as before.
   */
  stop() {
    this.channel.cancel(this._consumerTag);

    // clearInterval() is a no-op for null or already-cleared timers, so no
    // "is it running?" check is required.
    clearInterval(this._processInterval);
    this._processInterval = null;

    const pending = [];
    while (this.queue.length > 0) {
      const remainingBefore = this.queue.length;
      pending.push(this._consume(this._processFunc));
      // A non-positive maxQueue removes nothing; bail out instead of spinning.
      if (this.queue.length === remainingBefore) break;
    }

    console.log('Consumer stopped!');
    return Promise.all(pending);
  }

  /**
   * Delivery callback: buffers the message for the next batch run.
   *
   * @param {object|null} message - Delivered message. amqplib invokes the
   *   callback with `null` when the broker cancels the consumer; such
   *   notifications carry no payload and are ignored.
   */
  _receiveMessage(message) {
    if (message === null || message === undefined) return;
    this.queue.push(message);
  }

  /**
   * Removes up to `maxQueue` messages from the buffer, passes them to `func`
   * and acks every message the handler resolves with.
   *
   * @param {function(Array<object>): Promise<Array<object>>} func - Batch handler.
   * @returns {Promise<void>} Resolves when the batch is handled. Asynchronous
   *   failures (handler rejection, non-array result, ack error) are logged and
   *   never rejected, so the affected messages simply stay unacknowledged.
   * @throws {SyntaxError} Synchronously, if `func` does not return a thenable.
   */
  _consume(func) {
    const batch = this.queue.splice(0, this.maxQueue);

    // Exit if nothing to process
    if (batch.length === 0) return Promise.resolve();

    const batchTags = batch.map((message) => message.fields.deliveryTag);
    console.log(`Processing [${batchTags.join(',')}]`);

    const result = func(batch);

    // Duck-type the result so promises from other libraries or realms work.
    if (!result || typeof result.then !== 'function') {
      throw new SyntaxError('Non promise function passed to consumer.');
    }

    return Promise.resolve(result)
      .then((ackList) => {
        if (!Array.isArray(ackList)) {
          throw new SyntaxError('Return value from consume function is not an array.');
        }

        const ackTags = ackList.map((message) => message.fields.deliveryTag);
        console.log(`Acking [${ackTags.join(',')}]`);

        for (const message of ackList) {
          this.channel.ack(message);
        }
      })
      .catch((err) => {
        // Without this handler a failing batch becomes an unhandled rejection,
        // which terminates the process on current Node versions.
        console.error(`Failed to process [${batchTags.join(',')}]`, err);
      });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment