Created
June 8, 2017 21:48
-
-
Save bruth/149c41ae40dee5931fea83054a33c704 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package transport | |
import ( | |
"errors" | |
"fmt" | |
"sync" | |
"time" | |
"github.com/golang/protobuf/proto" | |
"github.com/nats-io/go-nats" | |
"github.com/nats-io/nuid" | |
"go.uber.org/zap" | |
) | |
// DefaultRequestTimeout is the timeout applied to a Request when the caller
// does not supply one through RequestTimeout.
var DefaultRequestTimeout = 2 * time.Second
// PublishOptions holds the optional settings of a single publication.
type PublishOptions struct {
	// Cause is copied into the Cause field of the published message.
	Cause string
}

// PublishOption mutates a PublishOptions value; pass any number of them to
// Publish.
type PublishOption func(*PublishOptions)

// PublishCause returns an option that sets the cause of the publication to s.
func PublishCause(s string) PublishOption {
	return func(opts *PublishOptions) {
		opts.Cause = s
	}
}
// RequestOptions holds the optional settings of a single request.
type RequestOptions struct {
	// Cause is copied into the Cause field of the request message.
	Cause string
	// Timeout bounds how long the request waits for a reply.
	Timeout time.Duration
}

// RequestOption mutates a RequestOptions value; pass any number of them to
// Request.
type RequestOption func(*RequestOptions)

// RequestTimeout returns an option that sets the request timeout to t.
func RequestTimeout(t time.Duration) RequestOption {
	return func(opts *RequestOptions) {
		opts.Timeout = t
	}
}

// RequestCause returns an option that sets the cause of the request to s.
func RequestCause(s string) RequestOption {
	return func(opts *RequestOptions) {
		opts.Cause = s
	}
}
// SubscribeOptions holds the optional settings of a subscriber.
type SubscribeOptions struct {
	// Queue is the queue group name. When empty, Subscribe creates a
	// standalone subscription instead of a queue subscription.
	Queue string
}

// SubscribeOption mutates a SubscribeOptions value; pass any number of them
// to Subscribe.
type SubscribeOption func(*SubscribeOptions)

// SubscribeQueue returns an option that sets the queue name of the
// subscriber to q.
func SubscribeQueue(q string) SubscribeOption {
	return func(opts *SubscribeOptions) {
		opts.Queue = q
	}
}
// Decode decodes the message payload into a proto message. | |
func (m *Message) Decode(pb proto.Message) error { | |
return proto.Unmarshal(m.Payload, pb) | |
} | |
// Handler is the handler used by a subscriber. The return value may be nil if | |
// no output is yielded. If this is a request, the reply will be sent automatically | |
// with the reply value or an error if one occurred. If a reply is not expected | |
// and error occurs, it will be logged. | |
type Handler func(msg *Message) (proto.Message, error) | |
// Transport describes the interface | |
type Transport interface { | |
// Publish publishes a message asynchronously to the specified subject. | |
// The wrapped message is returned or an error. The error would only be due to | |
// a connection issue, but does not reflect any consumer error. | |
Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error) | |
// Request publishes a message synchronously and waits for a response that | |
// is decoded into the Protobuf message supplied. The wrapped message is | |
// returned or an error. The error could be a connection error, timeout, | |
// or a consumer error. | |
Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error) | |
// Subscribe creates a subscription to a subject. | |
Subscribe(sub string, hdl Handler, opts ...SubscribeOption) (*nats.Subscription, error) | |
// Conn returns the underlying NATS connection. | |
Conn() *nats.Conn | |
// Close closes the transport connection and unsubscribes all subscribers. | |
Close() | |
// Set the logger. | |
SetLogger(*zap.Logger) | |
} | |
// Connect is a convenience function establishing a connection with | |
// NATS and returning a transport. | |
func Connect(opts *nats.Options) (Transport, error) { | |
conn, err := opts.Connect() | |
if err != nil { | |
return nil, err | |
} | |
logger, _ := zap.NewProduction() | |
return &transport{ | |
logger: logger, | |
conn: conn, | |
}, nil | |
} | |
// New returns a transport using an existing NATS connection. | |
func New(conn *nats.Conn) Transport { | |
logger, _ := zap.NewProduction() | |
return &transport{ | |
logger: logger, | |
conn: conn, | |
} | |
} | |
type transport struct { | |
logger *zap.Logger | |
conn *nats.Conn | |
subs []*nats.Subscription | |
mux sync.Mutex | |
} | |
func (c *transport) SetLogger(l *zap.Logger) { | |
c.logger = l | |
} | |
func (c *transport) Conn() *nats.Conn { | |
return c.conn | |
} | |
func (c *transport) Close() { | |
for _, sub := range c.subs { | |
sub.Unsubscribe() | |
} | |
c.conn.Close() | |
} | |
func (c *transport) wrap(payload proto.Message) (*Message, error) { | |
var ( | |
pb []byte | |
err error | |
) | |
if payload != nil { | |
pb, err = proto.Marshal(payload) | |
if err != nil { | |
return nil, err | |
} | |
} | |
id := nuid.Next() | |
ts := time.Now().UnixNano() | |
msg := Message{ | |
Id: id, | |
Timestamp: uint64(ts), | |
Payload: pb, | |
} | |
return &msg, nil | |
} | |
func (c *transport) unwrap(nmsg *nats.Msg) (*Message, error) { | |
var msg Message | |
if err := proto.Unmarshal(nmsg.Data, &msg); err != nil { | |
return nil, err | |
} | |
msg.Subject = nmsg.Subject | |
msg.Reply = nmsg.Reply | |
msg.Queue = nmsg.Sub.Queue | |
return &msg, nil | |
} | |
func (c *transport) Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error) { | |
pubOpts := &PublishOptions{} | |
// Apply options. | |
for _, opt := range opts { | |
opt(pubOpts) | |
} | |
m, err := c.wrap(msg) | |
if err != nil { | |
return nil, err | |
} | |
m.Subject = sub | |
m.Cause = pubOpts.Cause | |
mb, err := proto.Marshal(m) | |
if err != nil { | |
return nil, err | |
} | |
if err := c.conn.Publish(sub, mb); err != nil { | |
return nil, err | |
} | |
return m, nil | |
} | |
func (c *transport) Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error) { | |
reqOpts := &RequestOptions{ | |
Timeout: DefaultRequestTimeout, | |
} | |
// Apply options. | |
for _, opt := range opts { | |
opt(reqOpts) | |
} | |
m, err := c.wrap(req) | |
if err != nil { | |
return nil, err | |
} | |
m.Subject = sub | |
m.Cause = reqOpts.Cause | |
mb, err := proto.Marshal(m) | |
if err != nil { | |
return nil, err | |
} | |
nm, err := c.conn.Request(sub, mb, reqOpts.Timeout) | |
if err != nil { | |
return nil, err | |
} | |
m, err = c.unwrap(nm) | |
if err != nil { | |
return nil, err | |
} | |
// Error occurred in the handler. | |
if m.Error != "" { | |
return nil, errors.New(m.Error) | |
} | |
if rep != nil { | |
if err := proto.Unmarshal(m.Payload, rep); err != nil { | |
return nil, err | |
} | |
} | |
return m, nil | |
} | |
// Subscribe creates a subscription to a subject. | |
func (c *transport) Subscribe(sub string, hdlr Handler, opts ...SubscribeOption) (*nats.Subscription, error) { | |
subOpts := &SubscribeOptions{} | |
// Apply options. | |
for _, opt := range opts { | |
opt(subOpts) | |
} | |
// Replies to the recipient with an error if applicable. | |
replyWithError := func(logger *zap.Logger, msg *Message, srcErr error) { | |
rmsg, err := c.wrap(nil) | |
// If an error occurs, this is a bug since this only relies on the local | |
// Message protobuf definition. | |
if err != nil { | |
logger.Error("failed to create transport message", | |
zap.Error(err), | |
) | |
return | |
} | |
rmsg.Cause = msg.Id | |
rmsg.Subject = msg.Reply | |
rmsg.Error = srcErr.Error() | |
mb, err := proto.Marshal(rmsg) | |
if err != nil { | |
logger.Error("failed to marshal transport message", | |
zap.Error(err), | |
) | |
return | |
} | |
if err := c.conn.Publish(msg.Reply, mb); err != nil { | |
logger.Error("failed to publish nats message", | |
zap.Error(err), | |
) | |
} | |
} | |
// NATS message handler. | |
natsHandler := func(nmsg *nats.Msg) { | |
// Copy logger for this request. | |
logger := c.logger.With( | |
zap.String("msg.subject", nmsg.Subject), | |
zap.String("msg.reply", nmsg.Reply), | |
) | |
msg, err := c.unwrap(nmsg) | |
// Failed unwrap which means the message is likely in the wrong format. | |
// A reply is ignored if this occurs since if the sent message was invalid | |
// it is unlikely the requester will be able to parse the message in the | |
// same format. Instead we log this case. | |
if err != nil { | |
logger.Error("failed to decode nats message") | |
return | |
} | |
// Add more context. | |
logger = c.logger.With( | |
zap.String("trace.id", msg.Id), | |
zap.String("msg.id", msg.Id), | |
zap.String("msg.cause", msg.Cause), | |
) | |
// In case the handler panics, catch and log. | |
defer func() { | |
if rec := recover(); rec != nil { | |
err := fmt.Errorf("recovered subscription handler panic:\n%s", rec) | |
if msg.Reply == "" { | |
logger.Error("subscription handler panic", | |
zap.Error(err), | |
) | |
return | |
} | |
replyWithError(logger, msg, err) | |
} | |
}() | |
// Pass to handler. | |
resp, err := hdlr(msg) | |
// Log error only if no reply. | |
if msg.Reply == "" { | |
if err != nil { | |
logger.Error("subscription handler error", | |
zap.Error(err), | |
) | |
} | |
return | |
} | |
// Error occured while handling message. We assume this is an error in the | |
// business logic and will reply with the error. The handler itself should | |
// have logged the error if it occurred since it can provide context. | |
if err != nil { | |
replyWithError(logger, msg, err) | |
return | |
} | |
// This will only fail if the response itself cannot be marshaled which | |
// means the handler is likely at fault. The error should be logged | |
// and a reply with a faulty handler can be returned. | |
rmsg, err := c.wrap(resp) | |
if err != nil { | |
logger.Error("failed to marshal response message", | |
zap.Error(err), | |
) | |
replyWithError(logger, msg, err) | |
return | |
} | |
rmsg.Cause = msg.Id | |
rmsg.Subject = msg.Reply | |
// Bug. | |
mb, err := proto.Marshal(rmsg) | |
if err != nil { | |
logger.Error("failed to marshal transport message", | |
zap.Error(err), | |
) | |
return | |
} | |
if err := c.conn.Publish(msg.Reply, mb); err != nil { | |
logger.Error("failed to publish nats message", | |
zap.Error(err), | |
) | |
} | |
} | |
// Queue-based subscriber. | |
if subOpts.Queue != "" { | |
s, err := c.conn.QueueSubscribe(sub, subOpts.Queue, natsHandler) | |
if err != nil { | |
return nil, err | |
} | |
c.mux.Lock() | |
c.subs = append(c.subs, s) | |
c.mux.Unlock() | |
return s, nil | |
} | |
// Standalone subscriber. | |
s, err := c.conn.Subscribe(sub, natsHandler) | |
if err != nil { | |
return nil, err | |
} | |
c.mux.Lock() | |
c.subs = append(c.subs, s) | |
c.mux.Unlock() | |
return s, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment