Skip to content

Instantly share code, notes, and snippets.

@mjackson
Created September 7, 2011 17:31
Show Gist options
  • Save mjackson/1201196 to your computer and use it in GitHub Desktop.
Save mjackson/1201196 to your computer and use it in GitHub Desktop.
A mock stream wrapper for node.js
var util = require("util"),
Stream = require("stream").Stream;
module.exports = MockStream;
/**
 * A mock stream that inherits from Stream and emits data taken from the given
 * `source`.
 *
 * If `source` is a Stream it is piped into this stream. Otherwise `source` is
 * passed to `end()`, so a string or Buffer is buffered and then emitted as a
 * single "data" event, followed by "drain" and "end".
 *
 * Buffered chunks are flushed by a polling loop: the first pass runs on the
 * next tick, later passes are scheduled with `setImmediate` so that timers and
 * I/O still get a turn while the stream is paused.
 *
 * @param {Stream|string|Buffer} [source] Data (or a stream of data) to emit.
 */
function MockStream(source) {
  Stream.call(this);

  this._chunks = [];
  this._wait = false;
  this.encoding = null;
  this.readable = true;
  this.writable = true;

  var self = this;

  // A named function is used (rather than `arguments.callee`, which throws in
  // strict mode) so that the loop can reschedule itself.
  function flush() {
    // Once the stream is destroyed and nothing is buffered, stop polling.
    if (!self.readable && self._chunks.length === 0) {
      return;
    }

    var hasData = self._chunks.length !== 0;
    var chunk;

    while (self.readable && self._chunks.length && !self._wait) {
      chunk = self._chunks.shift();

      if (self.encoding) {
        self.emit("data", chunk.toString(self.encoding));
      } else {
        self.emit("data", chunk);
      }
    }

    // Only report "drain" when this pass actually emptied the buffer.
    if (hasData && self._chunks.length === 0) {
      self.emit("drain");
    }

    // Re-arming with process.nextTick would starve timers and I/O because the
    // nextTick queue is fully drained before the event loop moves on.
    setImmediate(flush);
  }

  process.nextTick(flush);

  if (source instanceof Stream) {
    source.pipe(this);
  } else {
    this.end(source);
  }
}

util.inherits(MockStream, Stream);

/**
 * Sets the encoding used to convert chunks to strings before they are emitted.
 * While no encoding is set, "data" events carry the buffered chunk as-is.
 *
 * @param {string} encoding An encoding name accepted by `Buffer#toString`.
 */
MockStream.prototype.setEncoding = function setEncoding(encoding) {
  this.encoding = encoding;
};

/**
 * Stops "data" events from being emitted until `resume()` is called and emits
 * "pause". Chunks written in the meantime stay buffered.
 */
MockStream.prototype.pause = function pause() {
  this._wait = true;
  this.emit("pause");
};

/**
 * Allows buffered chunks to be emitted again and emits "resume".
 */
MockStream.prototype.resume = function resume() {
  this._wait = false;
  this.emit("resume");
};

/**
 * Buffers a chunk for emission on a later pass of the flush loop. Strings are
 * converted to Buffers first.
 *
 * @param {string|Buffer} chunk The data to buffer.
 * @returns {boolean} `true` if the chunk was buffered, `false` if the stream
 *   is no longer writable (the chunk is dropped in that case, since a
 *   destroyed stream would never emit it).
 */
MockStream.prototype.write = function write(chunk) {
  if (!this.writable) {
    return false;
  }

  if (typeof chunk === "string") {
    // `new Buffer(string)` is deprecated; Buffer.from is the replacement.
    chunk = Buffer.from(chunk);
  }

  this._chunks.push(chunk);

  return true;
};

/**
 * Optionally writes a final chunk, then destroys the stream once everything
 * buffered has been emitted and emits "end".
 *
 * @param {string|Buffer} [chunk] A last chunk to write before ending.
 */
MockStream.prototype.end = function end(chunk) {
  if (chunk) {
    this.write(chunk);
  }

  var self = this;

  this.destroySoon(function () {
    self.emit("end");
  });
};

/**
 * Immediately discards any buffered chunks and marks the stream as neither
 * readable nor writable.
 */
MockStream.prototype.destroy = function destroy() {
  this._chunks = [];
  this.readable = false;
  this.writable = false;
};

/**
 * Destroys the stream as soon as its buffer is empty, then invokes `callback`.
 * The buffer is checked on the next tick and, while chunks remain, re-checked
 * via `setImmediate` so the wait does not block timers or I/O.
 *
 * @param {Function} [callback] Called with no arguments after `destroy()`.
 */
MockStream.prototype.destroySoon = function destroySoon(callback) {
  var self = this;

  function check() {
    if (self._chunks.length === 0) {
      self.destroy();

      if (typeof callback === "function") {
        callback();
      }
    } else {
      setImmediate(check);
    }
  }

  process.nextTick(check);
};
// Usage example: wrap a string in a MockStream, hold it paused, and attach
// the listeners from a timer before the data is allowed through.
var MockStream = require("./mockstream");

var stream = new MockStream("hello world");

// Prints each emitted chunk as text.
function printChunk(buffer) {
  console.log(buffer.toString());
}

// Reports that the stream has finished.
function reportDone() {
  console.log("all done!");
}

// Pause right away so the chunk stays buffered until the timer below fires.
stream.pause();

setTimeout(function resumeAndListen() {
  stream.resume();
  stream.on("data", printChunk);
  stream.on("end", reportDone);
}, 100);
@stephenmathieson
Copy link

Could you add a license please? :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment