Created
July 24, 2010 07:06
-
-
Save technoweenie/488488 to your computer and use it in GitHub Desktop.
very quick riak protobuf example in node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sys: require 'sys' | |
net: require 'net' | |
fs: require 'fs' | |
Buffer: require('buffer').Buffer | |
Schema: require('protobuf_for_node').Schema | |
schema: new Schema(fs.readFileSync('riak.desc')) | |
# Riak protocol buffer message names. load_schema uses the position of a name
# in this list as its riak message code, so the order matters.
types: [
  'RpbErrorResp',
  'RpbPingReq', 'RpbPingResp',
  'RpbGetClientIdReq', 'RpbGetClientIdResp',
  'RpbSetClientIdReq', 'RpbSetClientIdResp',
  'RpbGetServerInfoReq', 'RpbGetServerInfoResp',
  'RpbGetReq', 'RpbGetResp'
]

# Schema types that have been loaded, keyed by riak message code.
messages: {}
# Loads the "riak.<name>" message type from the schema, assigns it a riak code
# based on its position in the types array, and registers it in messages
# under that code. Returns the schema type.
#
# Throws an Error when name is not listed in types (indexOf would yield -1 and
# register a bogus code) or when the schema has no such message.
load_schema: (name) ->
  code: types.indexOf name
  if code is -1
    throw new Error("Unknown riak message type: " + name)
  sch: schema["riak.$name"]
  if not sch
    throw new Error("Message riak." + name + " is not defined in the schema")
  sch.riak_code: code
  messages[code]: sch
  sch
# Namespace for the loaded message types.
#
# RpbErrorResp (riak code 0) is loaded as well: without it an error response
# from the server has no entry in messages and cannot be decoded.
Riak: {
  ErrorResponse: load_schema 'RpbErrorResp'
  GetRequest: load_schema 'RpbGetReq'
  GetResponse: load_schema 'RpbGetResp'
  messages: messages
}
# Sends protobuf bytes to riak over a TCP connection and hands each decoded
# response to the callback of the oldest request still waiting for one.
class Riak.Connection
  # Opens the connection to port/host and starts decoding incoming data.
  constructor: (port, host) ->
    @callbacks: []
    @conn: net.createConnection port, host
    riak: this
    @conn.addListener 'data', (chunk) ->
      # receive returns the parsed message once all of its bytes have arrived
      if msg: riak.message.receive chunk
        if cb: riak.callbacks.shift()
          cb msg
        riak.conn.emit 'message', msg
        riak.reset()
    @reset()

  # Starts a fresh Riak.Message for the next response.
  reset: ->
    @message: new Riak.Message(@conn)

  # Serializes data with the given schema type, queues callback for the
  # response and writes the request to the socket.
  #
  # The request is built before the callback is queued: if serialization
  # throws, no stale callback is left behind to be paired with a later
  # response.
  send: (type, data, callback) ->
    bytes: @prepare type, data
    @callbacks.push callback
    @conn.write bytes

  # Registers a listener on the underlying connection.
  on: (name, cb) ->
    @conn.addListener name, cb

  # Closes the underlying connection.
  end: ->
    @conn.end()

  # Builds the bytes for one request: a 4-byte big-endian length (message
  # code byte + serialized data), the 1-byte riak message code, then the
  # serialized data.
  prepare: (type, data) ->
    buf: type.serialize data
    len: buf.length + 1
    msg: new Buffer(len + 4)
    msg[0]: len >>> 24
    msg[1]: len >>> 16
    msg[2]: len >>> 8
    msg[3]: len & 255
    msg[4]: type.riak_code
    buf.copy msg, 5, 0
    msg
# Reads buffers from riak and parses them. One instance collects the bytes of
# one response.
class Riak.Message
  constructor: (conn) ->
    @conn: conn
    @type: null      # the protobuf response type
    @response: null  # placeholder for the response buffer
    @length: null    # length of the response data
    @read: 0         # how many bytes have been read into @response

  # Parses a received chunk. Neither the 5-byte header nor the data is assumed
  # to arrive in a single chunk. starting is the offset in chunk to read from
  # (defaults to 0).
  #
  # Returns the parsed message once all of its bytes have been read, null
  # otherwise. Bytes in chunk past the end of this message are ignored.
  # Throws an Error on a zero length prefix or an unregistered message code.
  receive: (chunk, starting) ->
    offset: starting or 0
    if not @response
      # collect the header (4-byte length + 1-byte message code) byte by byte
      @header: @header or []
      while @header.length < 5 and offset < chunk.length
        @header.push chunk[offset]
        offset += 1
      if @header.length < 5
        return null
      # multiply the top byte instead of shifting it: << 24 turns negative
      # once bit 31 is set
      @length: (@header[0] * 16777216) +
        (@header[1] << 16) +
        (@header[2] << 8) +
        @header[3] - 1
      if @length < 0
        throw new Error("Invalid riak message length")
      @type: Riak.messages[@header[4]]
      if not @type
        throw new Error("Unsupported riak message code: " + @header[4])
      @response: new Buffer(@length)
    # never copy past the end of this message
    count: Math.min(@length - @read, chunk.length - offset)
    if count > 0
      chunk.copy @response, @read, offset, offset + count
      @read += count
    # are we there yet?
    if @read == @length
      @type.parse @response
    else
      null
# Demo: connect to port 8087, fetch two keys from the 'timeline' bucket one
# after the other, then close the connection.
conn: new Riak.Connection(8087)

# watches for any received messages
conn.on 'message', (msg) ->
  sys.puts "REC: ${sys.inspect msg}"

conn.on 'connect', ->
  # runs after the second response
  finish: (msg) ->
    sys.puts 'ok done'
    conn.end()
  # runs after the first response
  fetch_second: (msg) ->
    sys.puts "CALLBACK!"
    conn.send Riak.GetRequest, {bucket: 'timeline', key: '2010723212'}, finish
  conn.send Riak.GetRequest, {bucket: 'timeline', key: '2010723213'}, fetch_second
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Buffer, Riak, Schema, conn, fs, load_schema, messages, net, schema, sys, types; | |
sys = require('sys'); | |
net = require('net'); | |
fs = require('fs'); | |
Buffer = require('buffer').Buffer; | |
Schema = require('protobuf_for_node').Schema; | |
// Schema built from the contents of riak.desc, read synchronously at startup.
schema = new Schema(fs.readFileSync('riak.desc'));

// Riak protocol buffer message names. load_schema uses the position of a name
// in this list as its riak message code, so the order matters.
types = [
  'RpbErrorResp',
  'RpbPingReq',
  'RpbPingResp',
  'RpbGetClientIdReq',
  'RpbGetClientIdResp',
  'RpbSetClientIdReq',
  'RpbSetClientIdResp',
  'RpbGetServerInfoReq',
  'RpbGetServerInfoResp',
  'RpbGetReq',
  'RpbGetResp'
];

// Schema types that have been loaded, keyed by riak message code.
messages = {};
/**
 * Looks up the message type `riak.<name>` in the schema, tags it with its
 * riak message code and registers it in `messages` under that code.
 *
 * The riak message code is the position of `name` in the `types` array.
 *
 * @param {string} name - unqualified message name, e.g. 'RpbGetReq'.
 * @returns {Object} the schema type, with `riak_code` set.
 * @throws {Error} if `name` is not listed in `types`, or the schema has no
 *   message of that name.
 */
load_schema = function(name) {
  var code, sch;
  code = types.indexOf(name);
  // indexOf yields -1 for an unknown name; refuse it rather than registering
  // a type under a bogus code.
  if (code === -1) {
    throw new Error("Unknown riak message type: " + name);
  }
  sch = schema["riak." + name];
  if (!sch) {
    throw new Error("Message riak." + name + " is not defined in the schema");
  }
  sch.riak_code = code;
  messages[code] = sch;
  return sch;
};
// Namespace for the loaded message types.
//
// RpbErrorResp (riak code 0) is loaded as well: without it an error response
// from the server has no entry in `messages` and cannot be decoded.
Riak = {
  ErrorResponse: load_schema('RpbErrorResp'),
  GetRequest: load_schema('RpbGetReq'),
  GetResponse: load_schema('RpbGetResp'),
  messages: messages
};
/**
 * Connection to riak: opens a socket and decodes incoming data into messages.
 *
 * Each decoded message is passed to the oldest queued callback (if any) and
 * then emitted on the socket as a 'message' event, after which a fresh
 * Riak.Message is started for the next response.
 *
 * @param {number} port - port passed to net.createConnection.
 * @param {string} [host] - host passed to net.createConnection.
 */
Riak.Connection = function(port, host) {
  var self = this;
  this.callbacks = [];
  this.conn = net.createConnection(port, host);
  this.conn.addListener('data', function(chunk) {
    var decoded, handler;
    decoded = self.message.receive(chunk);
    // receive yields nothing until the whole message has arrived
    if (!decoded) {
      return;
    }
    handler = self.callbacks.shift();
    if (handler) {
      handler(decoded);
    }
    self.conn.emit('message', decoded);
    return self.reset();
  });
  this.reset();
  return this;
};
/**
 * Replaces the current message with a fresh Riak.Message for the next
 * response.
 *
 * @returns {Riak.Message} the new message.
 */
Riak.Connection.prototype.reset = function() {
  var fresh = new Riak.Message(this.conn);
  this.message = fresh;
  return fresh;
};
/**
 * Serializes `data` with the given schema type, queues `callback` for the
 * response and writes the request to the socket.
 *
 * The request is built before the callback is queued: if serialization
 * throws, no stale callback is left in the queue to be paired with a later
 * response.
 *
 * @param {Object} type - schema type providing serialize() and riak_code.
 * @param {Object} data - message fields to serialize.
 * @param {Function} [callback] - called with the decoded response.
 * @returns {*} whatever the socket's write() returns.
 */
Riak.Connection.prototype.send = function(type, data, callback) {
  var bytes = this.prepare(type, data);
  this.callbacks.push(callback);
  return this.conn.write(bytes);
};
/**
 * Registers a listener on the underlying socket.
 *
 * @param {string} name - event name.
 * @param {Function} cb - listener.
 * @returns {*} whatever the socket's addListener() returns.
 */
Riak.Connection.prototype.on = function(name, cb) {
  var socket = this.conn;
  return socket.addListener(name, cb);
};
/**
 * Closes the underlying socket.
 *
 * @returns {*} whatever the socket's end() returns.
 */
Riak.Connection.prototype.end = function() {
  var socket = this.conn;
  return socket.end();
};
/**
 * Builds the bytes for one request: a 4-byte big-endian length (message code
 * byte + serialized data), the 1-byte riak message code, then the serialized
 * data.
 *
 * @param {Object} type - schema type providing serialize() and riak_code.
 * @param {Object} data - message fields to serialize.
 * @returns {Buffer} the framed request.
 */
Riak.Connection.prototype.prepare = function(type, data) {
  var payload, size, frame, i;
  payload = type.serialize(data);
  size = payload.length + 1; // +1 for the message code byte
  frame = new Buffer(size + 4);
  // length prefix, most significant byte first
  for (i = 0; i < 4; i++) {
    frame[i] = (size >>> (8 * (3 - i))) & 255;
  }
  frame[4] = type.riak_code;
  payload.copy(frame, 5, 0);
  return frame;
};
/**
 * Collects the bytes of one riak response as they arrive.
 *
 * @param {Object} conn - the socket the response is read from.
 */
Riak.Message = function(conn) {
  var self = this;
  self.conn = conn;
  self.type = null;      // the protobuf response type
  self.response = null;  // placeholder for the response buffer
  self.length = null;    // length of the response data
  self.read = 0;         // how many bytes have been read into response
  return self;
};
/**
 * Parses a received chunk. Neither the 5-byte header (4-byte big-endian
 * length + 1-byte message code) nor the data is assumed to arrive in a single
 * chunk.
 *
 * Bytes in `chunk` past the end of this message are ignored.
 *
 * @param {Buffer} chunk - bytes received from the socket.
 * @param {number} [starting=0] - offset in `chunk` to start reading from.
 * @returns {Object|null} the parsed message once all of its bytes have been
 *   read, otherwise null.
 * @throws {Error} if the length prefix is zero or the message code has no
 *   registered type in Riak.messages.
 */
Riak.Message.prototype.receive = function(chunk, starting) {
  var offset, count;
  offset = starting || 0;
  if (!this.response) {
    // Collect the header byte by byte; it may be split across chunks.
    this.header = this.header || [];
    while (this.header.length < 5 && offset < chunk.length) {
      this.header.push(chunk[offset]);
      offset += 1;
    }
    if (this.header.length < 5) {
      return null;
    }
    // Multiply the top byte instead of shifting it: << 24 turns negative once
    // bit 31 is set. The prefix counts the message code byte, hence the -1.
    this.length = (this.header[0] * 16777216) + (this.header[1] << 16) + (this.header[2] << 8) + this.header[3] - 1;
    if (this.length < 0) {
      throw new Error("Invalid riak message length");
    }
    this.type = Riak.messages[this.header[4]];
    if (!this.type) {
      throw new Error("Unsupported riak message code: " + this.header[4]);
    }
    this.response = new Buffer(this.length);
  }
  // Never copy past the end of this message; overshooting would push
  // this.read beyond this.length and the message would never complete.
  count = Math.min(this.length - this.read, chunk.length - offset);
  if (count > 0) {
    chunk.copy(this.response, this.read, offset, offset + count);
    this.read += count;
  }
  if (this.read === this.length) {
    return this.type.parse(this.response);
  }
  return null;
};
// Demo: connect to port 8087, fetch two keys from the 'timeline' bucket one
// after the other, then close the connection.
conn = new Riak.Connection(8087);

// Log every decoded message.
conn.on('message', function(msg) {
  return sys.puts("REC: " + sys.inspect(msg));
});

conn.on('connect', function() {
  // Runs after the second response.
  var finish = function(msg) {
    sys.puts('ok done');
    return conn.end();
  };
  // Runs after the first response.
  var fetch_second = function(msg) {
    sys.puts("CALLBACK!");
    return conn.send(Riak.GetRequest, { bucket: 'timeline', key: '2010723212' }, finish);
  };
  return conn.send(Riak.GetRequest, { bucket: 'timeline', key: '2010723213' }, fetch_second);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package riak; | |
// Error response: a message and a numeric code, both required.
message RpbErrorResp {
    required bytes  errmsg  = 1;
    required uint32 errcode = 2;
}
// Get request: names an object by bucket and key.
message RpbGetReq {
    required bytes  bucket = 1;
    required bytes  key    = 2;
    optional uint32 r      = 3; // NOTE(review): presumably the read quorum - confirm against the Riak PBC API
}
// One stored value together with its metadata. Only the value is required.
message RpbContent {
    required bytes   value            = 1;
    optional bytes   content_type     = 2; // the media type/format
    optional bytes   charset          = 3;
    optional bytes   content_encoding = 4;
    optional bytes   vtag             = 5;
    repeated RpbLink links            = 6; // links to other resources
    optional uint32  last_mod         = 7;
    optional uint32  last_mod_usecs   = 8;
    repeated RpbPair usermeta         = 9; // user metadata stored with the object
}
// Key with an optional value; used by RpbContent.usermeta.
message RpbPair {
    required bytes key   = 1;
    optional bytes value = 2;
}
// Link entry used by RpbContent.links; every field is optional.
message RpbLink {
    optional bytes bucket = 1;
    optional bytes key    = 2;
    optional bytes tag    = 3;
}
// Get response: zero or more contents plus an optional vclock.
message RpbGetResp {
    repeated RpbContent content = 1;
    optional bytes      vclock  = 2;
}
// List-buckets response: zero or more bucket names.
message RpbListBucketsResp {
    repeated bytes buckets = 1;
}
Author
technoweenie
commented
Jul 24, 2010
- http://code.google.com/p/protobuf-for-node/
- http://wiki.basho.com/display/RIAK/PBC+API
neat!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment