Created July 24, 2010 07:06
A very quick Riak Protocol Buffers example in Node.js.
var Buffer, Riak, Schema, conn, fs, load_schema, messages, net, schema, sys, types;
sys = require('sys');
net = require('net');
fs = require('fs');
Buffer = require('buffer').Buffer;
Schema = require('protobuf_for_node').Schema;
// Registry of loaded message types, keyed by numeric code; load_schema()
// fills it in.
messages = {};

// Message names in code order: load_schema() uses a name's index in this
// array as the type's `riak_code`, so the order must not change.
types = [
  'RpbErrorResp',
  'RpbPingReq',
  'RpbPingResp',
  'RpbGetClientIdReq',
  'RpbGetClientIdResp',
  'RpbSetClientIdReq',
  'RpbSetClientIdResp',
  'RpbGetServerInfoReq',
  'RpbGetServerInfoResp',
  'RpbGetReq',
  'RpbGetResp'
];

// Descriptor file is read synchronously from the current working directory.
schema = new Schema(fs.readFileSync('riak.desc'));
/**
 * Look up a protobuf message type by name, tag it with its numeric code and
 * register it in `messages`.
 *
 * The code is the position of `name` in `types`; the type itself is read
 * from `schema` under the key "riak.<name>".
 *
 * @param {string} name  Unqualified message name, e.g. 'RpbGetReq'.
 * @returns {Object} The schema type, with `riak_code` set.
 * @throws {Error} If `name` is not listed in `types` or not present in
 *   `schema`; nothing is registered in that case.
 */
function load_schema(name) {
  var code = types.indexOf(name);
  var sch;
  if (code === -1) {
    // Without this guard the type would be registered under code -1.
    throw new Error('Unknown Riak message type: ' + name);
  }
  sch = schema['riak.' + name];
  if (!sch) {
    throw new Error('Schema has no definition for riak.' + name);
  }
  sch.riak_code = code;
  messages[code] = sch;
  return sch;
}
/**
 * Namespace for the client: the message types it can send or decode, plus
 * the code -> type registry used when decoding incoming frames.
 */
Riak = {
  // Registered so that a reply carrying code 0 resolves to a real type in
  // `messages` rather than undefined when the frame is decoded.
  ErrorResponse: load_schema('RpbErrorResp'),
  GetRequest: load_schema('RpbGetReq'),
  GetResponse: load_schema('RpbGetResp'),
  messages: messages
};
/**
 * A connection to a Riak protocol-buffers endpoint.
 *
 * Opens a socket, feeds incoming chunks into a Riak.Message accumulator and,
 * once a full message has been decoded, hands it to the oldest pending
 * send() callback and emits it as a 'message' event on the socket.
 *
 * @param {number} port  Port to connect to.
 * @param {string} [host]  Host to connect to; passed straight to
 *   net.createConnection.
 */
Riak.Connection = function(port, host) {
  var riak = this;
  // One entry per send(), in order: the callback, or null when none was given.
  this.callbacks = [];
  this.conn = net.createConnection(port, host);
  this.conn.addListener('data', function(chunk) {
    var msg = riak.message.receive(chunk);
    var cb;
    if (!msg) {
      return; // message not complete yet; wait for more data
    }
    cb = riak.callbacks.shift();
    if (cb) {
      cb(msg);
    }
    riak.conn.emit('message', msg);
    riak.reset();
  });
  // The data handler reads this.message, so it must exist before any data
  // arrives.
  this.reset();
  return this;
};

/**
 * Discard the current accumulator and start a fresh one for the next message.
 *
 * @returns {Riak.Message} The new, empty message accumulator.
 */
Riak.Connection.prototype.reset = function() {
  this.message = new Riak.Message(this.conn);
  return this.message;
};

/**
 * Serialize and send one request.
 *
 * @param {Object} type  Message type (as returned by load_schema).
 * @param {Object} data  Field values for the message.
 * @param {Function} [callback]  Called with the decoded reply. Replies are
 *   matched to callbacks strictly in send order.
 * @returns {*} Whatever the socket's write() returns.
 */
Riak.Connection.prototype.send = function(type, data, callback) {
  // Build the frame first: if serialization throws, the callback queue must
  // not gain an entry for a request that was never written.
  var frame = this.prepare(type, data);
  // Always queue a slot (null when there is no callback) so later replies
  // stay aligned with their own callbacks.
  this.callbacks.push(typeof callback === 'function' ? callback : null);
  return this.conn.write(frame);
};

/**
 * Register a listener on the underlying socket.
 *
 * @param {string} name  Event name ('connect', 'message', ...).
 * @param {Function} cb  Listener.
 * @returns {*} The result of the socket's addListener().
 */
Riak.Connection.prototype.on = function(name, cb) {
  return this.conn.addListener(name, cb);
};

/**
 * Close the underlying socket.
 *
 * @returns {*} The result of the socket's end().
 */
Riak.Connection.prototype.end = function() {
  return this.conn.end();
};

/**
 * Build the wire frame for a request:
 *   bytes 0-3  big-endian length of (code byte + payload)
 *   byte  4    message code (type.riak_code)
 *   bytes 5..  serialized payload
 *
 * @param {Object} type  Message type with serialize() and riak_code.
 * @param {Object} data  Field values for the message.
 * @returns {Buffer} The complete frame.
 */
Riak.Connection.prototype.prepare = function(type, data) {
  // serialize() is assumed to return a Buffer (it must have length and copy).
  var payload = type.serialize(data);
  var len = payload.length + 1; // +1 for the message-code byte
  var frame = new Buffer(len + 4);
  frame[0] = (len >>> 24) & 255;
  frame[1] = (len >>> 16) & 255;
  frame[2] = (len >>> 8) & 255;
  frame[3] = len & 255;
  frame[4] = type.riak_code;
  payload.copy(frame, 5, 0);
  return frame;
};
/**
 * Accumulator for one incoming message.
 *
 * @param {Object} conn  The socket the message arrives on (stored only).
 */
Riak.Message = function(conn) {
  this.conn = conn;
  this.type = null;     // message type, looked up from the code byte
  this.response = null; // payload buffer, allocated once the header is read
  this.length = null;   // payload length in bytes (excludes the code byte)
  this.pos = 0;         // number of payload bytes copied so far
  return this;
};

/**
 * Feed one chunk of socket data into the message.
 *
 * The first chunk must start with the full 5-byte header (4-byte big-endian
 * length, 1-byte message code); later chunks are treated as payload only.
 * Bytes beyond the end of the current message are ignored.
 *
 * @param {Buffer} chunk  Data received from the socket.
 * @param {number} [starting]  Offset in `chunk` where payload bytes begin
 *   (defaults to 0; used internally to skip the header).
 * @returns {?Object} The parsed message once the payload is complete,
 *   otherwise null.
 * @throws {Error} If the first chunk is shorter than the header or carries a
 *   message code that is not registered in Riak.messages.
 */
Riak.Message.prototype.receive = function(chunk, starting) {
  var code, count;
  if (!this.response) {
    if (chunk.length < 5) {
      throw new Error('Riak frame header is incomplete (' + chunk.length + ' of 5 bytes)');
    }
    // `>>> 0` keeps the length unsigned when the top byte is >= 0x80.
    // The wire length counts the code byte, hence the -1.
    this.length = (((chunk[0] << 24) | (chunk[1] << 16) | (chunk[2] << 8) | chunk[3]) >>> 0) - 1;
    code = chunk[4];
    this.type = Riak.messages[code];
    if (!this.type) {
      throw new Error('Unknown Riak message code: ' + code);
    }
    this.response = new Buffer(this.length);
    return this.receive(chunk, 5);
  }
  starting = starting || 0;
  // Never copy more than the message still needs, so `pos` cannot overshoot
  // `length` when the chunk carries extra bytes.
  count = Math.min(chunk.length - starting, this.length - this.pos);
  if (count > 0) {
    chunk.copy(this.response, this.pos, starting, starting + count);
    this.pos += count;
  }
  if (this.pos === this.length) {
    return this.type.parse(this.response);
  }
  return null;
};
// Demo: connect to port 8087, fetch two keys from the 'timeline' bucket one
// after the other, then close the connection.
conn = new Riak.Connection(8087);

// Log every decoded message.
conn.on('message', function(msg) {
  return sys.puts(("REC: " + (sys.inspect(msg))));
});

conn.on('connect', function() {
  return conn.send(Riak.GetRequest, {
    bucket: 'timeline',
    key: '2010723213'
  }, function(msg) {
    // First reply received; request the second key.
    return conn.send(Riak.GetRequest, {
      bucket: 'timeline',
      key: '2010723212'
    }, function(msg) {
      sys.puts('ok done');
      return conn.end();
    });
  });
});
package riak;
// Error reply: both the message bytes and the numeric code are required.
message RpbErrorResp {
    required bytes errmsg = 1;
    required uint32 errcode = 2;
}
// Get request: identifies an object by bucket and key.
message RpbGetReq {
    required bytes bucket = 1;
    required bytes key = 2;
    optional uint32 r = 3; // presumably the read quorum; confirm against the Riak docs
}
// One stored value together with its optional metadata, links and user
// metadata pairs. Only `value` is required.
message RpbContent {
    required bytes value = 1;
    optional bytes content_type = 2;     // the media type/format
    optional bytes charset = 3;
    optional bytes content_encoding = 4;
    optional bytes vtag = 5;
    repeated RpbLink links = 6;          // links to other resources
    optional uint32 last_mod = 7;
    optional uint32 last_mod_usecs = 8;
    repeated RpbPair usermeta = 9;       // user metadata stored with the object
}
// Key/value pair; the value may be omitted. Used by RpbContent.usermeta.
message RpbPair {
    required bytes key = 1;
    optional bytes value = 2;
}
// Link entry used by RpbContent.links; every field is optional.
message RpbLink {
    optional bytes bucket = 1;
    optional bytes key = 2;
    optional bytes tag = 3;
}
// Get reply: zero or more content entries plus an optional vclock.
message RpbGetResp {
    repeated RpbContent content = 1;
    optional bytes vclock = 2;
}
// List-buckets reply: the bucket names, one bytes entry each.
message RpbListBucketsResp {
    repeated bytes buckets = 1;
}
