Skip to content

Instantly share code, notes, and snippets.

@McMlok
Created September 5, 2024 07:08
Show Gist options
  • Save McMlok/df4e83b5ccf0bf0fda9539167168bbe2 to your computer and use it in GitHub Desktop.
Save McMlok/df4e83b5ccf0bf0fda9539167168bbe2 to your computer and use it in GitHub Desktop.
OTEL MassTransit observer using Propagators
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using MassTransit;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry;
using System.Collections.Generic;
using MassTransit.Transports;
using MassTransit.Logging;
using MassTransit.Util;
using Activity = System.Diagnostics.Activity;
namespace MassTransit.Artemis;
/// <summary>
/// MassTransit receive/send observer that carries trace context through message headers using
/// <see cref="Propagators.DefaultTextMapPropagator"/>.
/// </summary>
/// <remarks>
/// Receive side: a consumer activity is created from the context extracted out of the transport headers,
/// stored as a payload on the <see cref="ReceiveContext"/>, and stopped when the receive completes or faults.
/// Send side: the context of the ambient <see cref="Activity.Current"/> is injected into the outgoing headers.
/// The observer only ever stops activities that it started itself.
/// </remarks>
internal class TracingObserver(ActivitySource source) : IReceiveObserver, ISendObserver
{
    /// <summary>
    /// Records the consumer exception on the receive activity (if one is attached) and stops it.
    /// </summary>
    public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)
        where T : class
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            AddExceptionEvent(activity, exception);
            activity.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op: the receive activity is completed in <see cref="PostReceive"/>.
    /// </summary>
    public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the receive activity attached to the context in <see cref="PreReceive"/>, if any.
    /// </summary>
    public Task PostReceive(ReceiveContext context)
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            activity.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Extracts the propagated trace context from the transport headers and starts a consumer activity
    /// parented to it. The activity is stored as a payload on <paramref name="context"/>.
    /// </summary>
    public Task PreReceive(ReceiveContext context)
    {
        var parentContext = Propagators.DefaultTextMapPropagator.Extract(
            new PropagationContext(new ActivityContext(), Baggage.Current),
            context,
            GetHeaderFromContext);

        var activity = source.CreateActivity(
            $"{context.InputAddress.GetDiagnosticEndpointName()} receive",
            ActivityKind.Consumer,
            parentContext.ActivityContext);

        // CreateActivity returns null when no listener is interested in (or samples) this source;
        // in that case there is nothing to start or attach.
        if (activity is null)
        {
            return Task.CompletedTask;
        }

        activity.Start();
        context.AddOrUpdatePayload(() => activity, _ => activity);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Records the receive failure on the receive activity (if one is attached) and stops it, so the
    /// activity started in <see cref="PreReceive"/> is always completed.
    /// </summary>
    public Task ReceiveFault(ReceiveContext context, Exception exception)
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            // Skip the event when a fault was already recorded (e.g. by ConsumeFault) to avoid duplicates.
            if (activity.Status != ActivityStatusCode.Error)
            {
                AddExceptionEvent(activity, exception);
            }

            activity.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Injects the context of the ambient activity into the outgoing message headers and remembers that
    /// activity on the send context so a later send failure can be recorded on it.
    /// </summary>
    public Task PreSend<T>(SendContext<T> context) where T : class
    {
        var activity = Activity.Current;
        if (activity is null)
        {
            return Task.CompletedTask;
        }

        Propagators.DefaultTextMapPropagator.Inject<SendContext>(
            new PropagationContext(activity.Context, Baggage.Current),
            context,
            SetHeaderToContext);
        context.AddOrUpdatePayload(() => activity, _ => activity);
        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op. The activity captured in <see cref="PreSend{T}"/> is the caller's ambient activity, which this
    /// observer did not start and therefore must not stop.
    /// </summary>
    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Records the send failure on the activity captured in <see cref="PreSend{T}"/>, if any. The activity
    /// is left running because it is owned by whoever started it.
    /// </summary>
    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            AddExceptionEvent(activity, exception);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Propagator getter: returns the transport header named <paramref name="key"/> as a single-element
    /// sequence, or an empty sequence when the header is missing or has no value.
    /// </summary>
    private static IEnumerable<string> GetHeaderFromContext(ReceiveContext context, string key)
    {
        if (context.TransportHeaders.TryGetHeader(key, out var headerValue) && headerValue?.ToString() is { } text)
        {
            return [text];
        }

        return [];
    }

    /// <summary>
    /// Propagator setter: writes the header only when it is not already present, so an existing value on
    /// the message is never overwritten.
    /// </summary>
    private static void SetHeaderToContext(SendContext context, string key, string value)
    {
        if (!context.Headers.TryGetHeader(key, out _))
        {
            context.Headers.Set(key, value);
        }
    }

    /// <summary>
    /// Adds an exception event describing the base exception of <paramref name="exception"/> to
    /// <paramref name="activity"/> and sets the activity status to <see cref="ActivityStatusCode.Error"/>.
    /// </summary>
    /// <param name="activity">Activity that receives the event and the error status.</param>
    /// <param name="exception">Exception to record; its base exception is the one described.</param>
    /// <param name="escaped">Value written to the "escaped" tag of the exception event.</param>
    public static void AddExceptionEvent(Activity activity, Exception exception, bool escaped = true)
    {
        // GetBaseException never returns null: it yields the exception itself when there is no inner one.
        exception = exception.GetBaseException();

        var exceptionMessage = ExceptionUtil.GetMessage(exception);

        var tags = new ActivityTagsCollection
        {
            { DiagnosticHeaders.Exceptions.Escaped, escaped },
            { DiagnosticHeaders.Exceptions.Message, exceptionMessage },
            { DiagnosticHeaders.Exceptions.Type, TypeCache.GetShortName(exception.GetType()) },
            { DiagnosticHeaders.Exceptions.Stacktrace, ExceptionUtil.GetStackTrace(exception) }
        };

        activity.AddEvent(new ActivityEvent(DiagnosticHeaders.Exceptions.EventName, DateTimeOffset.UtcNow, tags));
        activity.SetStatus(ActivityStatusCode.Error, exceptionMessage);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment