Skip to content

Instantly share code, notes, and snippets.

@iansmith
Created November 13, 2011 14:17
Show Gist options
  • Save iansmith/1362154 to your computer and use it in GitHub Desktop.
Save iansmith/1362154 to your computer and use it in GitHub Desktop.
This patch fixes the gozmq package to understand the new error type in go, plus it exports the ZmqErrno type so that one can handle errors more conveniently. It adds the ZMQ_LINGER constant that is available only in ZMQ versions after 2.1, but this appea
From 6d33af21c66901171e84e56c23e3c4d6297f8cb8 Mon Sep 17 00:00:00 2001
From: Ian Smith <[email protected]>
Date: Fri, 11 Nov 2011 17:57:57 +0100
Subject: [PATCH] appear to have updated this for the current version of go,
weekly on nov 9
---
zmq.go | 97 ++++++++++++++++++++++++++++++++--------------------------
zmq_test.go | 30 ++++++++----------
2 files changed, 67 insertions(+), 60 deletions(-)
diff --git a/zmq.go b/zmq.go
index 05025a9..f5196b3 100644
--- a/zmq.go
+++ b/zmq.go
@@ -14,7 +14,6 @@
limitations under the License.
*/
-
//
// This package implements Go bindings for the 0mq C API.
//
@@ -33,31 +32,32 @@ package gozmq
import "C"
import (
- "container/vector"
+// "container/vector"
+ "errors"
"os"
"unsafe"
)
type Context interface {
- NewSocket(t SocketType) (Socket, os.Error)
+ NewSocket(t SocketType) (Socket, error)
Close()
}
type Socket interface {
- Bind(address string) os.Error
- Connect(address string) os.Error
- Send(data []byte, flags SendRecvOption) os.Error
- Recv(flags SendRecvOption) (data []byte, err os.Error)
- RecvMultipart(flags SendRecvOption) (parts [][]byte, err os.Error)
- SendMultipart(parts [][]byte, flags SendRecvOption) (err os.Error)
- Close() os.Error
-
- SetSockOptInt64(option Int64SocketOption, value int64) os.Error
- SetSockOptUInt64(option UInt64SocketOption, value uint64) os.Error
- SetSockOptString(option StringSocketOption, value string) os.Error
- GetSockOptInt64(option Int64SocketOption) (value int64, err os.Error)
- GetSockOptUInt64(option UInt64SocketOption) (value uint64, err os.Error)
- GetSockOptString(option StringSocketOption) (value string, err os.Error)
+ Bind(address string) error
+ Connect(address string) error
+ Send(data []byte, flags SendRecvOption) error
+ Recv(flags SendRecvOption) (data []byte, err error)
+ RecvMultipart(flags SendRecvOption) (parts [][]byte, err error)
+ SendMultipart(parts [][]byte, flags SendRecvOption) (err error)
+ Close() error
+
+ SetSockOptInt64(option Int64SocketOption, value int64) error
+ SetSockOptUInt64(option UInt64SocketOption, value uint64) error
+ SetSockOptString(option StringSocketOption, value string) error
+ GetSockOptInt64(option Int64SocketOption) (value int64, err error)
+ GetSockOptUInt64(option UInt64SocketOption) (value uint64, err error)
+ GetSockOptString(option StringSocketOption) (value string, err error)
// Package local function makes this interface unimplementable outside
// of this package which removes some of the point of using an interface
@@ -71,7 +71,6 @@ type StringSocketOption int
type SendRecvOption int
type zmqErrno os.Errno
-
const (
// NewSocket types
PAIR = SocketType(C.ZMQ_PAIR)
@@ -133,13 +132,12 @@ func Version() (int, int, int) {
return int(major), int(minor), int(patch)
}
-
func (e zmqErrno) String() string {
return C.GoString(C.zmq_strerror(C.int(e)))
}
// int zmq_errno ();
-func errno() os.Error {
+func errno() error {
return zmqErrno(C.zmq_errno())
}
@@ -153,7 +151,7 @@ type zmqContext struct {
// Create a new context.
// void *zmq_init (int io_threads);
-func NewContext() (Context, os.Error) {
+func NewContext() (Context, error) {
// TODO Pass something useful here. Number of cores?
// C.NULL is correct but causes a runtime failure on darwin at present
if c := C.zmq_init(1); c != nil /*C.NULL*/ {
@@ -174,7 +172,7 @@ func (c *zmqContext) Close() {
// Create a new socket.
// void *zmq_socket (void *context, int type);
-func (c *zmqContext) NewSocket(t SocketType) (Socket, os.Error) {
+func (c *zmqContext) NewSocket(t SocketType) (Socket, error) {
// C.NULL is correct but causes a runtime failure on darwin at present
if s := C.zmq_socket(c.c, C.int(t)); s != nil /*C.NULL*/ {
return &zmqSocket{c: c, s: s}, nil
@@ -190,7 +188,7 @@ type zmqSocket struct {
// Shutdown the socket.
// int zmq_close (void *s);
-func (s *zmqSocket) Close() os.Error {
+func (s *zmqSocket) Close() error {
if C.zmq_close(s.s) != 0 {
return errno()
}
@@ -201,13 +199,13 @@ func (s *zmqSocket) Close() os.Error {
func (s *zmqSocket) destroy() {
// Will this get called without being added by runtime.SetFinalizer()?
if err := s.Close(); err != nil {
- panic("Error while destroying zmqSocket: " + err.String() + "\n")
+ panic("Error while destroying zmqSocket: " + err.Error() + "\n")
}
}
// Set an int64 option on the socket.
// int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
-func (s *zmqSocket) SetSockOptInt64(option Int64SocketOption, value int64) os.Error {
+func (s *zmqSocket) SetSockOptInt64(option Int64SocketOption, value int64) error {
if C.zmq_setsockopt(s.s, C.int(option), unsafe.Pointer(&value), C.size_t(unsafe.Sizeof(&value))) != 0 {
return errno()
}
@@ -216,7 +214,7 @@ func (s *zmqSocket) SetSockOptInt64(option Int64SocketOption, value int64) os.Er
// Set a uint64 option on the socket.
// int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
-func (s *zmqSocket) SetSockOptUInt64(option UInt64SocketOption, value uint64) os.Error {
+func (s *zmqSocket) SetSockOptUInt64(option UInt64SocketOption, value uint64) error {
if C.zmq_setsockopt(s.s, C.int(option), unsafe.Pointer(&value), C.size_t(unsafe.Sizeof(&value))) != 0 {
return errno()
}
@@ -225,7 +223,7 @@ func (s *zmqSocket) SetSockOptUInt64(option UInt64SocketOption, value uint64) os
// Set a string option on the socket.
// int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
-func (s *zmqSocket) SetSockOptString(option StringSocketOption, value string) os.Error {
+func (s *zmqSocket) SetSockOptString(option StringSocketOption, value string) error {
v := C.CString(value)
defer C.free(unsafe.Pointer(v))
if C.zmq_setsockopt(s.s, C.int(option), unsafe.Pointer(v), C.size_t(len(value))) != 0 {
@@ -236,7 +234,7 @@ func (s *zmqSocket) SetSockOptString(option StringSocketOption, value string) os
// Get an int64 option from the socket.
// int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
-func (s *zmqSocket) GetSockOptInt64(option Int64SocketOption) (value int64, err os.Error) {
+func (s *zmqSocket) GetSockOptInt64(option Int64SocketOption) (value int64, err error) {
size := C.size_t(unsafe.Sizeof(value))
if C.zmq_getsockopt(s.s, C.int(option), unsafe.Pointer(&value), &size) != 0 {
err = errno()
@@ -247,7 +245,7 @@ func (s *zmqSocket) GetSockOptInt64(option Int64SocketOption) (value int64, err
// Get a uint64 option from the socket.
// int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
-func (s *zmqSocket) GetSockOptUInt64(option UInt64SocketOption) (value uint64, err os.Error) {
+func (s *zmqSocket) GetSockOptUInt64(option UInt64SocketOption) (value uint64, err error) {
size := C.size_t(unsafe.Sizeof(value))
if C.zmq_getsockopt(s.s, C.int(option), unsafe.Pointer(&value), &size) != 0 {
err = errno()
@@ -258,7 +256,7 @@ func (s *zmqSocket) GetSockOptUInt64(option UInt64SocketOption) (value uint64, e
// Get a string option from the socket.
// int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
-func (s *zmqSocket) GetSockOptString(option StringSocketOption) (value string, err os.Error) {
+func (s *zmqSocket) GetSockOptString(option StringSocketOption) (value string, err error) {
var buffer [1024]byte
var size C.size_t = 1024
if C.zmq_getsockopt(s.s, C.int(option), unsafe.Pointer(&buffer), &size) != 0 {
@@ -271,7 +269,7 @@ func (s *zmqSocket) GetSockOptString(option StringSocketOption) (value string, e
// Bind the socket to a listening address.
// int zmq_bind (void *s, const char *addr);
-func (s *zmqSocket) Bind(address string) os.Error {
+func (s *zmqSocket) Bind(address string) error {
a := C.CString(address)
defer C.free(unsafe.Pointer(a))
if C.zmq_bind(s.s, a) != 0 {
@@ -282,7 +280,7 @@ func (s *zmqSocket) Bind(address string) os.Error {
// Connect the socket to an address.
// int zmq_connect (void *s, const char *addr);
-func (s *zmqSocket) Connect(address string) os.Error {
+func (s *zmqSocket) Connect(address string) error {
a := C.CString(address)
defer C.free(unsafe.Pointer(a))
if C.zmq_connect(s.s, a) != 0 {
@@ -293,7 +291,7 @@ func (s *zmqSocket) Connect(address string) os.Error {
// Send a message to the socket.
// int zmq_send (void *s, zmq_msg_t *msg, int flags);
-func (s *zmqSocket) Send(data []byte, flags SendRecvOption) os.Error {
+func (s *zmqSocket) Send(data []byte, flags SendRecvOption) error {
var m C.zmq_msg_t
// Copy data array into C-allocated buffer.
size := C.size_t(len(data))
@@ -317,7 +315,7 @@ func (s *zmqSocket) Send(data []byte, flags SendRecvOption) os.Error {
// Receive a message from the socket.
// int zmq_recv (void *s, zmq_msg_t *msg, int flags);
-func (s *zmqSocket) Recv(flags SendRecvOption) (data []byte, err os.Error) {
+func (s *zmqSocket) Recv(flags SendRecvOption) (data []byte, err error) {
// Allocate and initialise a new zmq_msg_t
var m C.zmq_msg_t
if C.zmq_msg_init(&m) != 0 {
@@ -343,7 +341,7 @@ func (s *zmqSocket) Recv(flags SendRecvOption) (data []byte, err os.Error) {
}
// Send a multipart message.
-func (s *zmqSocket) SendMultipart(parts [][]byte, flags SendRecvOption) (err os.Error) {
+func (s *zmqSocket) SendMultipart(parts [][]byte, flags SendRecvOption) (err error) {
for i := 0; i < len(parts)-1; i++ {
if err = s.Send(parts[i], SNDMORE|flags); err != nil {
return
@@ -354,8 +352,10 @@ func (s *zmqSocket) SendMultipart(parts [][]byte, flags SendRecvOption) (err os.
}
// Receive a multipart message.
-func (s *zmqSocket) RecvMultipart(flags SendRecvOption) (parts [][]byte, err os.Error) {
- buffer := new(vector.Vector)
+func (s *zmqSocket) RecvMultipart(flags SendRecvOption) (parts [][]byte, err error) {
+ buffer := make([][]byte,0)
+
+ //buffer := new(vector.Vector)
for {
var data []byte
var more uint64
@@ -364,7 +364,8 @@ func (s *zmqSocket) RecvMultipart(flags SendRecvOption) (parts [][]byte, err os.
if err != nil {
return
}
- buffer.Push(data)
+ buffer=append(buffer,data)
+ //buffer.Push(data)
more, err = s.GetSockOptUInt64(RCVMORE)
if err != nil {
return
@@ -373,9 +374,12 @@ func (s *zmqSocket) RecvMultipart(flags SendRecvOption) (parts [][]byte, err os.
break
}
}
- parts = make([][]byte, buffer.Len())
- for i := 0; i < buffer.Len(); i++ {
- parts[i] = []byte(buffer.At(i).([]byte))
+// parts = make([][]byte, buffer.Len())
+ parts = make([][]byte, len(buffer))
+// for i := 0; i < buffer.Len(); i++ {
+ for i := 0; i < len(buffer); i++ {
+ //parts[i] = []byte(buffer[i]).([]byte)
+ parts[i] = buffer[i]
}
return
}
@@ -398,7 +402,7 @@ type PollItems []PollItem
// Poll ZmqSockets and file descriptors for I/O readiness. Timeout is in
// microseconds.
-func Poll(items []PollItem, timeout int64) (count int, err os.Error) {
+func Poll(items []PollItem, timeout int64) (count int, err error) {
zitems := make([]C.zmq_pollitem_t, len(items))
for i, pi := range items {
zitems[i].socket = pi.Socket.apiSocket()
@@ -418,11 +422,11 @@ func Poll(items []PollItem, timeout int64) (count int, err os.Error) {
}
// run a zmq_device passing messages between in and out
-func Device(t DeviceType, in, out Socket) os.Error {
+func Device(t DeviceType, in, out Socket) error {
if C.zmq_device(C.int(t), in.apiSocket(), out.apiSocket()) != 0 {
return errno()
}
- return os.NewError("zmq_device() returned unexpectedly.")
+ return errors.New("zmq_device() returned unexpectedly.")
}
// XXX For now, this library abstracts zmq_msg_t out of the API.
@@ -433,3 +437,8 @@ func Device(t DeviceType, in, out Socket) os.Error {
// void *zmq_msg_data (zmq_msg_t *msg);
// int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
// int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
+
+func (self zmqErrno) Error() string {
+ return self.String()
+}
+
diff --git a/zmq_test.go b/zmq_test.go
index 68161ed..3302a33 100644
--- a/zmq_test.go
+++ b/zmq_test.go
@@ -16,12 +16,12 @@
package gozmq
import (
+// "container/vector"
+
"log"
- "os"
"runtime"
"testing"
"time"
- "container/vector"
)
const ADDRESS1 = "tcp://127.0.0.1:23456"
@@ -46,7 +46,7 @@ func runServer(t *testing.T, c Context, callback func(s Socket)) chan bool {
s, _ := c.NewSocket(REP)
defer s.Close()
if rc := s.Bind(ADDRESS1); rc != nil {
- t.Errorf("Failed to bind to %s; %s", ADDRESS1, rc.String())
+ t.Errorf("Failed to bind to %s; %s", ADDRESS1, rc.Error())
}
callback(s)
finished <- true
@@ -125,7 +125,7 @@ func TestBindToLoopBack(t *testing.T) {
s, _ := c.NewSocket(REP)
defer s.Close()
if rc := s.Bind(ADDRESS1); rc != nil {
- t.Errorf("Failed to bind to %s; %s", ADDRESS1, rc.String())
+ t.Errorf("Failed to bind to %s; %s", ADDRESS1, rc.Error())
}
}
@@ -135,7 +135,7 @@ func TestSetSockOptString(t *testing.T) {
s, _ := c.NewSocket(SUB)
defer s.Close()
if rc := s.Bind(ADDRESS1); rc != nil {
- t.Errorf("Failed to bind to %s; %s", ADDRESS1, rc.String())
+ t.Errorf("Failed to bind to %s; %s", ADDRESS1, rc.Error())
}
if rc := s.SetSockOptString(SUBSCRIBE, "TEST"); rc != nil {
t.Errorf("Failed to subscribe; %v", rc)
@@ -148,7 +148,7 @@ func TestMultipart(t *testing.T) {
finished := runServer(t, c, func(s Socket) {
parts, rc := s.RecvMultipart(0)
if rc != nil {
- t.Errorf("Failed to receive multipart message; %s", rc.String())
+ t.Errorf("Failed to receive multipart message; %s", rc.Error())
}
if len(parts) != 2 {
t.Errorf("Invalid multipart message, not enough parts; %d", len(parts))
@@ -161,10 +161,10 @@ func TestMultipart(t *testing.T) {
s, _ := c.NewSocket(REQ)
defer s.Close()
if rc := s.Connect(ADDRESS1); rc != nil {
- t.Errorf("Failed to connect to %s; %s", ADDRESS1, rc.String())
+ t.Errorf("Failed to connect to %s; %s", ADDRESS1, rc.Error())
}
if rc := s.SendMultipart([][]byte{[]byte("part1"), []byte("part2")}, 0); rc != nil {
- t.Errorf("Failed to send multipart message; %s", rc.String())
+ t.Errorf("Failed to send multipart message; %s", rc.Error())
}
<-finished
}
@@ -212,8 +212,8 @@ func TestDevice(t *testing.T) {
}
func TestZmqErrorStr(t *testing.T) {
- var e os.Error = EFSM
- es := e.String()
+ var e error = EFSM
+ es := e.Error()
if es != "Operation cannot be accomplished in current state" {
t.Errorf("EFSM.String() returned unexpected result: %s", e)
}
@@ -296,7 +296,7 @@ func BenchmarkSendReceive1MBinproc(b *testing.B) {
// A helper to make tests less verbose
type testEnv struct {
context Context
- sockets vector.Vector
+ sockets []Socket
t *testing.T
}
@@ -309,7 +309,7 @@ func NewTestEnv(t *testing.T) *testEnv {
t.Errorf("failed to create context in testEnv: %v", err)
t.FailNow()
}
- return &testEnv{context: c, t: t}
+ return &testEnv{context: c, t: t, sockets:[]Socket{}}
}
func (te *testEnv) NewSocket(t SocketType) Socket {
@@ -325,7 +325,7 @@ func (te *testEnv) NewBoundSocket(t SocketType, bindAddr string) Socket {
if err := s.Bind(bindAddr); err != nil {
log.Panicf("Failed to connect to %v: %v", bindAddr, err)
}
- te.sockets.Push(s)
+ te.sockets=append(te.sockets,s)
return s
}
@@ -334,7 +334,7 @@ func (te *testEnv) NewConnectedSocket(t SocketType, connectAddr string) Socket {
if err := s.Connect(connectAddr); err != nil {
log.Panicf("Failed to connect to %v: %v", connectAddr, err)
}
- te.sockets.Push(s)
+ te.sockets=append(te.sockets,s)
return s
}
@@ -373,11 +373,9 @@ func (te *testEnv) Recv(sock Socket, flags SendRecvOption) []byte {
return data
}
-
// TODO Test various socket types. UDP, TCP, etc.
// TODO Test NOBLOCK mode.
// TODO Test getting/setting socket options. Probably sufficient to do just one
// int and one string test.
-
// TODO Test that closing a context underneath a socket behaves "reasonably" (ie. doesnt' crash).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment