Created
September 5, 2024 07:08
-
-
Save McMlok/df4e83b5ccf0bf0fda9539167168bbe2 to your computer and use it in GitHub Desktop.
OTEL MassTransit observer using Propagators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.Threading.Tasks; | |
using MassTransit; | |
using OpenTelemetry.Context.Propagation; | |
using OpenTelemetry; | |
using System.Collections.Generic; | |
using MassTransit.Transports; | |
using MassTransit.Logging; | |
using MassTransit.Util; | |
using Activity = System.Diagnostics.Activity; | |
namespace MassTransit.Artemis; | |
/// <summary>
/// MassTransit receive/send observer that bridges message headers and
/// <see cref="Activity"/> instances using the OpenTelemetry default text-map propagator.
/// On receive it extracts the propagated context from the transport headers and starts a
/// consumer activity from the supplied <see cref="ActivitySource"/>; on send it injects the
/// context of <see cref="Activity.Current"/> into the outgoing message headers.
/// </summary>
/// <remarks>
/// The observer only stops activities that it started itself (the receive activity).
/// The ambient activity seen while sending belongs to whoever started it and is never stopped here.
/// </remarks>
internal class TracingObserver(ActivitySource source) : IReceiveObserver, ISendObserver
{
    /// <summary>
    /// Records the consumer exception on the receive activity (if one is attached to the
    /// context) and stops that activity.
    /// </summary>
    public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)
        where T : class
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            AddExceptionEvent(activity, exception);
            activity.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>No-op; the receive activity is completed in <see cref="PostReceive"/>.</summary>
    public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>Stops the receive activity attached to the context, if any.</summary>
    public Task PostReceive(ReceiveContext context)
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            activity.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Extracts the propagated trace context from the transport headers, starts a
    /// <see cref="ActivityKind.Consumer"/> activity parented to it and attaches the activity
    /// to the receive context as a payload.
    /// </summary>
    public Task PreReceive(ReceiveContext context)
    {
        var emptyContext = new PropagationContext(default, Baggage.Current);
        var propagationContext = Propagators.DefaultTextMapPropagator.Extract(emptyContext, context, GetHeaderFromContext);

        var newActivity = source.CreateActivity(
            $"{context.InputAddress.GetDiagnosticEndpointName()} receive",
            ActivityKind.Consumer,
            propagationContext.ActivityContext);

        // CreateActivity returns null when the source has no listeners or the activity is
        // not sampled; in that case there is nothing to start or attach.
        if (newActivity is null)
        {
            return Task.CompletedTask;
        }

        newActivity.Start();
        context.AddOrUpdatePayload(() => newActivity, _ => newActivity);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Records the exception on the receive activity and stops it, unless the activity has
    /// already been stopped (for example by <see cref="ConsumeFault{T}"/>), so the exception
    /// event is not added twice.
    /// </summary>
    public Task ReceiveFault(ReceiveContext context, Exception exception)
    {
        if (context.TryGetPayload<Activity>(out var activity) && !activity.IsStopped)
        {
            AddExceptionEvent(activity, exception);
            activity.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Injects the context of the ambient activity (and the current baggage) into the
    /// outgoing message headers. The activity is also attached to the send context so that
    /// <see cref="SendFault{T}"/> can annotate the same activity later.
    /// </summary>
    public Task PreSend<T>(SendContext<T> context) where T : class
    {
        var activity = Activity.Current;
        if (activity is not null)
        {
            Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), context, SetHeaderToContext);
            context.AddOrUpdatePayload(() => activity, _ => activity);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op. The activity captured in <see cref="PreSend{T}"/> is the caller's ambient
    /// activity, not one started by this observer, so it must not be stopped here.
    /// </summary>
    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Records the send exception on the activity captured in <see cref="PreSend{T}"/>.
    /// The activity is left running because this observer did not start it.
    /// </summary>
    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        if (context.TryGetPayload<Activity>(out var activity))
        {
            AddExceptionEvent(activity, exception);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Propagator getter: returns the transport header stored under <paramref name="key"/>
    /// as a single-element sequence, or an empty sequence when the header is missing or
    /// has no string representation.
    /// </summary>
    private static IEnumerable<string> GetHeaderFromContext(ReceiveContext context, string key)
    {
        if (context.TransportHeaders.TryGetHeader(key, out var headerValue)
            && headerValue?.ToString() is { } text)
        {
            return [text];
        }

        return [];
    }

    /// <summary>
    /// Propagator setter: writes the header only when the message does not already carry
    /// a header with the same key, so existing values are never overwritten.
    /// </summary>
    private static void SetHeaderToContext(SendContext<object> context, string key, string value)
    {
        if (!context.Headers.TryGetHeader(key, out _))
        {
            context.Headers.Set(key, value);
        }
    }

    /// <summary>
    /// Adds an exception event (escaped flag, message, type and stack trace) for the base
    /// exception of <paramref name="exception"/> to <paramref name="activity"/> and sets the
    /// activity status to <see cref="ActivityStatusCode.Error"/>.
    /// </summary>
    /// <param name="activity">Activity to annotate.</param>
    /// <param name="exception">Exception whose base exception is recorded.</param>
    /// <param name="escaped">Value stored under the "escaped" tag of the event.</param>
    public static void AddExceptionEvent(Activity activity, Exception exception, bool escaped = true)
    {
        // GetBaseException returns the exception itself when there is no inner exception.
        exception = exception.GetBaseException();
        var exceptionMessage = ExceptionUtil.GetMessage(exception);

        var tags = new ActivityTagsCollection
        {
            { DiagnosticHeaders.Exceptions.Escaped, escaped },
            { DiagnosticHeaders.Exceptions.Message, exceptionMessage },
            { DiagnosticHeaders.Exceptions.Type, TypeCache.GetShortName(exception.GetType()) },
            { DiagnosticHeaders.Exceptions.Stacktrace, ExceptionUtil.GetStackTrace(exception) }
        };

        activity.AddEvent(new ActivityEvent(DiagnosticHeaders.Exceptions.EventName, DateTimeOffset.UtcNow, tags));
        activity.SetStatus(ActivityStatusCode.Error, exceptionMessage);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment