Skip to content

Commit

Permalink
feat: First part of wiring up actual tracing (unary only)
Browse files Browse the repository at this point in the history
There's just a single test so far - more required. We should discuss this commit in detail before going further.
Streaming calls are trickier, because we don't want to stop the activity before the call has actually completed. (We may not even know that in some cases.)
  • Loading branch information
jskeet committed Mar 13, 2024
1 parent ec04ce1 commit c8cb76d
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 6 deletions.
46 changes: 46 additions & 0 deletions Google.Api.Gax.Grpc.Tests/ApiCallTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -217,6 +220,49 @@ public async Task WithLogging_Async()
Assert.All(entries, entry => Assert.Contains("SimpleMethod", entry.Message));
}

[Fact]
public void WithTracing_Sync()
{
using var helper = new ActivityHelper();
var call = new ApiCall<SimpleRequest, SimpleResponse>(
"SimpleMethod",
(req, cs) => Task.FromResult(default(SimpleResponse)),
(req, cs) => null,
null).WithTracing(helper.Source);

call.Sync(new SimpleRequest(), null);

var activity = helper.CapturedActivity;
Assert.NotNull(activity);
Assert.Contains("SimpleMethod", activity.OperationName);
Assert.Equal(ActivityStatusCode.Ok, activity.Status);
}

private class ActivityHelper : IDisposable
{
public ActivitySource Source { get; }
public Activity CapturedActivity { get; private set; }
private readonly ActivityListener _listener;

internal ActivityHelper([CallerMemberName] string name = null)
{
Source = new ActivitySource(name);
_listener = new ActivityListener
{
ShouldListenTo = candidate => candidate == Source,
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = activity => CapturedActivity = activity
};
ActivitySource.AddActivityListener(_listener);
}

public void Dispose()
{
_listener.Dispose();
Source.Dispose();
}
}

internal class ExtractedRequestParamRequest : IMessage<ExtractedRequestParamRequest>
{
public string TableName { get; set; }
Expand Down
91 changes: 85 additions & 6 deletions Google.Api.Gax.Grpc/ApiCallTracingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

namespace Google.Api.Gax.Grpc;
Expand All @@ -19,15 +17,64 @@ namespace Google.Api.Gax.Grpc;
/// </summary>
internal static class ApiCallTracingExtensions
{
internal const string AttributeExceptionEventName = "exception";
internal const string AttributeExceptionType = "exception.type";
internal const string AttributeExceptionMessage = "exception.message";
internal const string AttributeExceptionStacktrace = "exception.stacktrace";

internal const string GrpcCallTypeTag = "grpc.call.type";
internal const string UnaryCallType = "unary";
internal const string ServerStreamingCallType = "server_streaming";
internal const string ClientStreamingCallType = "client_streaming";
internal const string BidiStreamingCallType = "bidi_streaming";

// Unary async
internal static Func<TRequest, CallSettings, Task<TResponse>> WithRetry<TRequest, TResponse>(
this Func<TRequest, CallSettings, Task<TResponse>> fn, ActivitySource activitySource, string methodName) =>
fn;
this Func<TRequest, CallSettings, Task<TResponse>> fn, ActivitySource activitySource, string methodName)
{
GaxPreconditions.CheckNotNull(activitySource, nameof(activitySource));
var activityName = FormatActivityName(fn, methodName);
return async (request, callSettings) =>
{
using var activity = activitySource.StartActivity(activityName, ActivityKind.Client);
activity?.SetTag(GrpcCallTypeTag, UnaryCallType);
try
{
var response = await fn(request, callSettings).ConfigureAwait(false);
activity?.SetStatus(ActivityStatusCode.Ok);
return response;
}
catch (Exception ex) when (SetActivityException(activity, ex))
{
// We'll never actually get here, because SetActivityException always returns false.
throw;
}
};
}

// Unary sync
internal static Func<TRequest, CallSettings, TResponse> WithTracing<TRequest, TResponse>(
this Func<TRequest, CallSettings, TResponse> fn, ActivitySource activitySource, string methodName) =>
fn;
this Func<TRequest, CallSettings, TResponse> fn, ActivitySource activitySource, string methodName)
{
GaxPreconditions.CheckNotNull(activitySource, nameof(activitySource));
var activityName = FormatActivityName(fn, methodName);
return (request, callSettings) =>
{
using var activity = activitySource.StartActivity(activityName, ActivityKind.Client);
activity?.SetTag(GrpcCallTypeTag, UnaryCallType);
try
{
var response = fn(request, callSettings);
activity?.SetStatus(ActivityStatusCode.Ok);
return response;
}
catch (Exception ex) when (SetActivityException(activity, ex))
{
// We'll never actually get here, because SetActivityException always returns false.
throw;
}
};
}

// Server-streaming async
internal static Func<TRequest, CallSettings, Task<AsyncServerStreamingCall<TResponse>>> WithTracing<TRequest, TResponse>(
Expand All @@ -48,4 +95,36 @@ internal static Func<CallSettings, AsyncClientStreamingCall<TRequest, TResponse>
internal static Func<CallSettings, AsyncDuplexStreamingCall<TRequest, TResponse>> WithTracing<TRequest, TResponse>(
this Func<CallSettings, AsyncDuplexStreamingCall<TRequest, TResponse>> fn, ActivitySource activitySource, string methodName) =>
fn;

// This is still very much up in the air, and may even require changes to the parameters, so that we get more information
// (e.g. the full RPC name, the client name etc).
private static string FormatActivityName(Delegate fn, string methodName) => $"{fn.Method.Name}/{methodName}";

// TODO: See if there's a standard way of doing this. It seems odd to have to do it ourselves.
/// <summary>
/// Sets an exception within an activity. We may wish to expose this publicly for integration purposes.
/// This always returns false, so that it can be used as an exception filter.
/// </summary>
private static bool SetActivityException(Activity activity, Exception ex)
{
if (ex is null || activity is null)
{
return false;
}

var tagsCollection = new ActivityTagsCollection
{
{ AttributeExceptionType, ex.GetType().FullName },
{ AttributeExceptionStacktrace, ex.ToString() },
};

if (!string.IsNullOrWhiteSpace(ex.Message))
{
tagsCollection[AttributeExceptionMessage] = ex.Message;
}

activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.AddEvent(new ActivityEvent(AttributeExceptionEventName, default, tagsCollection));
return false;
}
}

0 comments on commit c8cb76d

Please sign in to comment.