Skip to content

Instantly share code, notes, and snippets.

@bruth
Created June 8, 2017 21:48
Show Gist options
  • Save bruth/149c41ae40dee5931fea83054a33c704 to your computer and use it in GitHub Desktop.
Save bruth/149c41ae40dee5931fea83054a33c704 to your computer and use it in GitHub Desktop.
package transport
import (
"errors"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/nats-io/go-nats"
"github.com/nats-io/nuid"
"go.uber.org/zap"
)
// DefaultRequestTimeout is the timeout used by Request when no RequestTimeout
// option is supplied.
var DefaultRequestTimeout = 2 * time.Second
// PublishOptions holds the settings that apply to a single publication.
type PublishOptions struct {
	// Cause is copied onto the Cause field of the outgoing message.
	Cause string
}

// PublishOption mutates a PublishOptions value. Options are applied in the
// order they are passed to Publish.
type PublishOption func(*PublishOptions)

// PublishCause returns an option that sets the cause of the publication.
func PublishCause(s string) PublishOption {
	setCause := func(po *PublishOptions) {
		po.Cause = s
	}
	return setCause
}
// RequestOptions holds the settings that apply to a single request.
type RequestOptions struct {
	// Cause is copied onto the Cause field of the outgoing request message.
	Cause string
	// Timeout is how long Request waits for a reply.
	Timeout time.Duration
}

// RequestOption mutates a RequestOptions value. Options are applied in the
// order they are passed to Request.
type RequestOption func(*RequestOptions)

// RequestTimeout returns an option that sets the request timeout duration.
func RequestTimeout(t time.Duration) RequestOption {
	setTimeout := func(ro *RequestOptions) {
		ro.Timeout = t
	}
	return setTimeout
}

// RequestCause returns an option that sets the cause of the request.
func RequestCause(s string) RequestOption {
	setCause := func(ro *RequestOptions) {
		ro.Cause = s
	}
	return setCause
}
// SubscribeOptions holds the settings that apply to a subscriber.
type SubscribeOptions struct {
	// Queue is the queue group name. When it is non-empty, Subscribe creates a
	// queue subscription instead of a standalone one.
	Queue string
}

// SubscribeOption mutates a SubscribeOptions value. Options are applied in
// the order they are passed to Subscribe.
type SubscribeOption func(*SubscribeOptions)

// SubscribeQueue returns an option that sets the queue name of the subscriber.
func SubscribeQueue(q string) SubscribeOption {
	setQueue := func(so *SubscribeOptions) {
		so.Queue = q
	}
	return setQueue
}
// Decode decodes the message payload into the supplied proto message. It
// returns whatever error proto.Unmarshal reports.
func (m *Message) Decode(pb proto.Message) error {
	return proto.Unmarshal(m.Payload, pb)
}
// Handler is the handler used by a subscriber. The returned message may be nil
// if no output is yielded. If the incoming message is a request, the reply is
// sent automatically with the returned value, or with the error if one
// occurred. If a reply is not expected and an error occurs, it is logged.
type Handler func(msg *Message) (proto.Message, error)
// Transport describes the interface for publishing, requesting, and
// subscribing to protobuf messages over a NATS connection.
type Transport interface {
	// Publish publishes a message asynchronously to the specified subject.
	// The wrapped message is returned or an error. The error would only be due to
	// a connection issue, but does not reflect any consumer error.
	Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error)
	// Request publishes a message synchronously and waits for a response that
	// is decoded into the Protobuf message supplied. The wrapped message is
	// returned or an error. The error could be a connection error, timeout,
	// or a consumer error.
	Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error)
	// Subscribe creates a subscription to a subject.
	Subscribe(sub string, hdl Handler, opts ...SubscribeOption) (*nats.Subscription, error)
	// Conn returns the underlying NATS connection.
	Conn() *nats.Conn
	// Close closes the transport connection and unsubscribes all subscribers.
	Close()
	// SetLogger sets the logger used by the transport.
	SetLogger(*zap.Logger)
}
// Connect is a convenience function establishing a connection with
// NATS and returning a transport.
//
// It returns an error if opts is nil or if the connection cannot be
// established. The transport logs through a zap production logger; if that
// logger cannot be constructed, a no-op logger is used instead so the
// transport never holds a nil logger.
func Connect(opts *nats.Options) (Transport, error) {
	if opts == nil {
		return nil, errors.New("transport: nil nats options")
	}
	conn, err := opts.Connect()
	if err != nil {
		return nil, err
	}
	logger, err := zap.NewProduction()
	if err != nil {
		// Logging is best-effort; do not fail a working connection over it.
		logger = zap.NewNop()
	}
	return &transport{
		logger: logger,
		conn:   conn,
	}, nil
}
// New returns a transport using an existing NATS connection.
//
// The transport logs through a zap production logger; if that logger cannot
// be constructed, a no-op logger is used instead so the transport never holds
// a nil logger. Use SetLogger to replace it.
func New(conn *nats.Conn) Transport {
	logger, err := zap.NewProduction()
	if err != nil {
		// Logging is best-effort; fall back rather than store a nil logger.
		logger = zap.NewNop()
	}
	return &transport{
		logger: logger,
		conn:   conn,
	}
}
// transport is the Transport implementation backed by a NATS connection.
type transport struct {
	// logger receives errors raised while handling subscription messages.
	logger *zap.Logger
	// conn is the underlying NATS connection.
	conn *nats.Conn
	// subs holds every subscription created through Subscribe so that Close
	// can unsubscribe them.
	subs []*nats.Subscription
	// mux is held while appending to subs.
	mux sync.Mutex
}
// SetLogger replaces the logger used by the transport. Passing nil installs a
// no-op logger, which silences the transport's logging instead of leaving a
// nil logger to be dereferenced by subscription handlers.
//
// The assignment is not synchronized with running subscription handlers, so
// it should be called before subscribing.
func (c *transport) SetLogger(l *zap.Logger) {
	if l == nil {
		l = zap.NewNop()
	}
	c.logger = l
}
// Conn returns the underlying NATS connection.
func (c *transport) Conn() *nats.Conn {
	return c.conn
}
// Close unsubscribes every subscription created through Subscribe and then
// closes the NATS connection.
//
// The subscription list is detached under the mutex, so Close does not race
// with a concurrent Subscribe, and a second Close does not unsubscribe the
// same subscriptions again.
func (c *transport) Close() {
	c.mux.Lock()
	subs := c.subs
	c.subs = nil
	c.mux.Unlock()

	for _, sub := range subs {
		// Best effort: the caller may already have unsubscribed through the
		// *nats.Subscription returned by Subscribe, and the connection is
		// closed immediately afterwards either way.
		_ = sub.Unsubscribe()
	}
	c.conn.Close()
}
// wrap builds a transport Message around payload. A non-nil payload is
// marshaled into the Payload field; a nil payload leaves it empty. The message
// gets a fresh NUID as its Id and the current time in Unix nanoseconds as its
// Timestamp. An error is returned only if the payload fails to marshal.
func (c *transport) wrap(payload proto.Message) (*Message, error) {
	var body []byte
	if payload != nil {
		encoded, err := proto.Marshal(payload)
		if err != nil {
			return nil, err
		}
		body = encoded
	}
	return &Message{
		Id:        nuid.Next(),
		Timestamp: uint64(time.Now().UnixNano()),
		Payload:   body,
	}, nil
}
// unwrap decodes the data of a NATS message into a transport Message.
//
// The Subject and Reply fields are taken from the NATS message rather than
// from the decoded data. Queue is set from the subscription the message
// arrived on, when there is one. An error is returned if nmsg is nil or its
// data is not a valid Message.
func (c *transport) unwrap(nmsg *nats.Msg) (*Message, error) {
	if nmsg == nil {
		return nil, errors.New("transport: nil nats message")
	}
	msg := &Message{}
	if err := proto.Unmarshal(nmsg.Data, msg); err != nil {
		return nil, err
	}
	msg.Subject = nmsg.Subject
	msg.Reply = nmsg.Reply
	// A message that did not arrive through a subscription has no Sub;
	// dereferencing it unconditionally would panic.
	if nmsg.Sub != nil {
		msg.Queue = nmsg.Sub.Queue
	}
	return msg, nil
}
// Publish wraps msg in a transport Message, stamps it with the subject and the
// cause taken from the options, and publishes the encoded message to sub. It
// returns the wrapped message, or the error from wrapping, marshaling, or
// publishing.
func (c *transport) Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error) {
	var settings PublishOptions
	for _, apply := range opts {
		apply(&settings)
	}

	envelope, err := c.wrap(msg)
	if err != nil {
		return nil, err
	}
	envelope.Subject, envelope.Cause = sub, settings.Cause

	data, err := proto.Marshal(envelope)
	if err != nil {
		return nil, err
	}
	if err = c.conn.Publish(sub, data); err != nil {
		return nil, err
	}
	return envelope, nil
}
// Request wraps req in a transport Message, sends it to sub, and waits for a
// reply for at most the configured timeout (DefaultRequestTimeout unless a
// RequestTimeout option overrides it).
//
// If the reply carries a non-empty Error, that text is returned as an error.
// Otherwise, when rep is non-nil, the reply payload is decoded into it. On
// success the decoded reply message is returned.
func (c *transport) Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error) {
	settings := RequestOptions{Timeout: DefaultRequestTimeout}
	for _, apply := range opts {
		apply(&settings)
	}

	outgoing, err := c.wrap(req)
	if err != nil {
		return nil, err
	}
	outgoing.Subject, outgoing.Cause = sub, settings.Cause

	data, err := proto.Marshal(outgoing)
	if err != nil {
		return nil, err
	}

	raw, err := c.conn.Request(sub, data, settings.Timeout)
	if err != nil {
		return nil, err
	}

	reply, err := c.unwrap(raw)
	switch {
	case err != nil:
		return nil, err
	case reply.Error != "":
		// The remote handler reported a failure.
		return nil, errors.New(reply.Error)
	case rep == nil:
		// The caller is not interested in the payload.
		return reply, nil
	}

	if err := proto.Unmarshal(reply.Payload, rep); err != nil {
		return nil, err
	}
	return reply, nil
}
// Subscribe creates a subscription to a subject.
//
// Each incoming NATS message is decoded into a transport Message and passed to
// hdlr. If the message carries a reply subject, the handler's result (or its
// error, or a recovered panic) is published back to that subject; otherwise a
// handler error is only logged. When the SubscribeQueue option is set, a queue
// subscription is created instead of a standalone one.
//
// The subscription is recorded so that Close can unsubscribe it. An error is
// returned if hdlr is nil or if NATS rejects the subscription.
func (c *transport) Subscribe(sub string, hdlr Handler, opts ...SubscribeOption) (*nats.Subscription, error) {
	if hdlr == nil {
		return nil, errors.New("transport: nil subscription handler")
	}

	subOpts := &SubscribeOptions{}

	// Apply options.
	for _, opt := range opts {
		opt(subOpts)
	}

	// publishReply addresses rmsg as the reply to msg and publishes it.
	// Failures are logged because there is nobody left to report them to.
	publishReply := func(logger *zap.Logger, msg, rmsg *Message) {
		rmsg.Cause = msg.Id
		rmsg.Subject = msg.Reply

		mb, err := proto.Marshal(rmsg)
		if err != nil {
			logger.Error("failed to marshal transport message",
				zap.Error(err),
			)
			return
		}

		if err := c.conn.Publish(msg.Reply, mb); err != nil {
			logger.Error("failed to publish nats message",
				zap.Error(err),
			)
		}
	}

	// replyWithError replies to the requester with the text of srcErr.
	replyWithError := func(logger *zap.Logger, msg *Message, srcErr error) {
		rmsg, err := c.wrap(nil)
		// If an error occurs, this is a bug since this only relies on the local
		// Message protobuf definition.
		if err != nil {
			logger.Error("failed to create transport message",
				zap.Error(err),
			)
			return
		}

		rmsg.Error = srcErr.Error()
		publishReply(logger, msg, rmsg)
	}

	// NATS message handler.
	natsHandler := func(nmsg *nats.Msg) {
		// Never dereference a nil logger inside a message callback.
		base := c.logger
		if base == nil {
			base = zap.NewNop()
		}

		// Derive a logger scoped to this message.
		logger := base.With(
			zap.String("msg.subject", nmsg.Subject),
			zap.String("msg.reply", nmsg.Reply),
		)

		msg, err := c.unwrap(nmsg)
		// Failed unwrap which means the message is likely in the wrong format.
		// A reply is ignored if this occurs since if the sent message was invalid
		// it is unlikely the requester will be able to parse the message in the
		// same format. Instead we log this case.
		if err != nil {
			logger.Error("failed to decode nats message",
				zap.Error(err),
			)
			return
		}

		// Add more context on top of the subject/reply fields. Deriving from
		// logger (not from the base logger) keeps those fields attached.
		logger = logger.With(
			zap.String("trace.id", msg.Id),
			zap.String("msg.id", msg.Id),
			zap.String("msg.cause", msg.Cause),
		)

		// In case the handler panics, catch and log. If the sender expects a
		// reply, it is additionally told about the failure.
		defer func() {
			rec := recover()
			if rec == nil {
				return
			}

			// %v, not %s: the recovered value can be of any type.
			err := fmt.Errorf("recovered subscription handler panic:\n%v", rec)
			logger.Error("subscription handler panic",
				zap.Error(err),
			)

			if msg.Reply != "" {
				replyWithError(logger, msg, err)
			}
		}()

		// Pass to handler.
		resp, err := hdlr(msg)

		// Log error only if no reply.
		if msg.Reply == "" {
			if err != nil {
				logger.Error("subscription handler error",
					zap.Error(err),
				)
			}
			return
		}

		// Error occurred while handling message. We assume this is an error in the
		// business logic and will reply with the error. The handler itself should
		// have logged the error if it occurred since it can provide context.
		if err != nil {
			replyWithError(logger, msg, err)
			return
		}

		// This will only fail if the response itself cannot be marshaled which
		// means the handler is likely at fault. The error is logged and sent
		// back to the requester.
		rmsg, err := c.wrap(resp)
		if err != nil {
			logger.Error("failed to marshal response message",
				zap.Error(err),
			)
			replyWithError(logger, msg, err)
			return
		}

		publishReply(logger, msg, rmsg)
	}

	var (
		s   *nats.Subscription
		err error
	)
	if subOpts.Queue != "" {
		// Queue-based subscriber.
		s, err = c.conn.QueueSubscribe(sub, subOpts.Queue, natsHandler)
	} else {
		// Standalone subscriber.
		s, err = c.conn.Subscribe(sub, natsHandler)
	}
	if err != nil {
		return nil, err
	}

	// Remember the subscription so Close can unsubscribe it.
	c.mux.Lock()
	c.subs = append(c.subs, s)
	c.mux.Unlock()

	return s, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment