Skip to content

Commit

Permalink
feat: better tracing support in Stl.Rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Sep 28, 2023
1 parent c82af31 commit 7bf5780
Show file tree
Hide file tree
Showing 27 changed files with 245 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using System.Globalization;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;

namespace Stl.Fusion.EntityFramework.Conversion;

public class UlidToStringValueConverter(ConverterMappingHints mappingHints = null!)
: ValueConverter<Ulid, string>(x => x.ToString(), x => Ulid.Parse(x), DefaultHints.With(mappingHints))
: ValueConverter<Ulid, string>(
x => x.ToString()!,
x => Ulid.Parse(x, CultureInfo.InvariantCulture),
DefaultHints.With(mappingHints))
{
private static readonly ConverterMappingHints DefaultHints = new(26, unicode: false);
}
2 changes: 0 additions & 2 deletions src/Stl.Fusion.EntityFramework/DbEntityResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,9 @@ public record Options

private ConcurrentDictionary<Symbol, BatchProcessor<TKey, TDbEntity?>>? _batchProcessors;
private ITransientErrorDetector<TDbContext>? _transientErrorDetector;
private ILogger? _log;

protected Options Settings { get; }
protected (Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>> Query, int BatchSize)[] Queries { get; init; }
protected ILogger Log => _log ??= Services.LogFor(GetType());

public Func<TDbEntity, TKey> KeyExtractor { get; init; }
public Expression<Func<TDbEntity, TKey>> KeyExtractorExpression { get; init; }
Expand Down
11 changes: 8 additions & 3 deletions src/Stl.Rpc/Configuration/RpcDefaultDelegates.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
using Stl.Generators;
using Stl.Interception;
using Stl.Rpc.Diagnostics;
using Stl.Rpc.Infrastructure;

namespace Stl.Rpc;

public delegate Symbol RpcServiceNameBuilder(Type serviceType);
public delegate Symbol RpcMethodNameBuilder(RpcMethodDef methodDef);
public delegate Symbol RpcMethodNameBuilder(RpcMethodDef method);
public delegate void RpcPeerTracker(RpcPeer peer);
public delegate RpcPeer RpcPeerFactory(RpcHub hub, RpcPeerRef peerRef);
public delegate RpcInboundContext RpcInboundContextFactory(RpcPeer peer, RpcMessage message, CancellationToken cancellationToken);
public delegate RpcPeer? RpcCallRouter(RpcMethodDef methodDef, ArgumentList arguments);
public delegate RpcPeer? RpcCallRouter(RpcMethodDef method, ArgumentList arguments);
public delegate Task<RpcConnection> RpcClientConnectionFactory(RpcClientPeer peer, CancellationToken cancellationToken);
public delegate Task<RpcConnection> RpcServerConnectionFactory(
RpcServerPeer peer, Channel<RpcMessage> channel, ImmutableOptionSet options, CancellationToken cancellationToken);
public delegate string RpcClientIdGenerator();
public delegate bool RpcBackendServiceDetector(Type serviceType, Symbol serviceName);
public delegate bool RpcUnrecoverableErrorDetector(Exception error, CancellationToken cancellationToken);
public delegate RpcMethodTracer? RpcMethodTracerFactory(RpcMethodDef method);

public static class RpcDefaultDelegates
{
Expand All @@ -26,7 +28,7 @@ public static class RpcDefaultDelegates
static methodDef => $"{methodDef.Method.Name}:{methodDef.ParameterTypes.Length}";

public static RpcCallRouter CallRouter { get; set; } =
static (methodDef, arguments) => methodDef.Hub.GetPeer(RpcPeerRef.Default);
static (method, arguments) => method.Hub.GetPeer(RpcPeerRef.Default);

public static RpcInboundContextFactory InboundContextFactory { get; set; } =
static (peer, message, cancellationToken) => new RpcInboundContext(peer, message, cancellationToken);
Expand Down Expand Up @@ -55,4 +57,7 @@ public static class RpcDefaultDelegates
static (error, cancellationToken)
=> cancellationToken.IsCancellationRequested
|| error is ConnectionUnrecoverableException;

public static RpcMethodTracerFactory MethodTracerFactory { get; set; } =
static method => null;
}
4 changes: 4 additions & 0 deletions src/Stl.Rpc/Configuration/RpcMethodDef.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Stl.Interception;
using Stl.Interception.Interceptors;
using Stl.Rpc.Diagnostics;

namespace Stl.Rpc;

Expand All @@ -19,6 +20,7 @@ public sealed class RpcMethodDef : MethodDef
public Func<ArgumentList> ArgumentListFactory { get; }
public Func<ArgumentList> ResultListFactory { get; }
public bool NoWait { get; }
public RpcMethodTracer? Tracer { get; }

public RpcMethodDef(RpcServiceDef service, MethodInfo method)
: base(service.Type, method)
Expand All @@ -43,6 +45,8 @@ public RpcMethodDef(RpcServiceDef service, MethodInfo method)

if (!IsAsyncMethod)
IsValid = false;

Tracer = Hub.MethodTracerFactory.Invoke(this);
}

public override string ToString()
Expand Down
8 changes: 2 additions & 6 deletions src/Stl.Rpc/Configuration/RpcServiceDef.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Diagnostics;
using Stl.Rpc.Infrastructure;
using Stl.Rpc.Internal;

Expand All @@ -16,11 +15,9 @@ public sealed class RpcServiceDef
public Symbol Name { get; }
public bool IsSystem { get; }
public bool IsBackend { get; }
public int MethodCount => _methods.Count;
public IEnumerable<RpcMethodDef> Methods => _methods.Values;
public bool HasServer => ServerResolver != null;
public object Server => _server ??= ServerResolver.Resolve(Hub.Services);
public ActivitySource ActivitySource { get; }
public IReadOnlyCollection<RpcMethodDef> Methods => _methodByName.Values;

public RpcMethodDef this[MethodInfo method] => Get(method) ?? throw Errors.NoMethod(Type, method);
public RpcMethodDef this[Symbol methodName] => Get(methodName) ?? throw Errors.NoMethod(Type, methodName);
Expand All @@ -33,7 +30,6 @@ public RpcServiceDef(RpcHub hub, Symbol name, RpcServiceBuilder source)
ServerResolver = source.ServerResolver;
IsSystem = typeof(IRpcSystemService).IsAssignableFrom(Type);
IsBackend = hub.BackendServiceDetector.Invoke(Type, name);
ActivitySource = Type.GetActivitySource();

_methods = new Dictionary<MethodInfo, RpcMethodDef>();
_methodByName = new Dictionary<Symbol, RpcMethodDef>();
Expand Down Expand Up @@ -63,7 +59,7 @@ public RpcServiceDef(RpcHub hub, Symbol name, RpcServiceBuilder source)
public override string ToString()
{
var serverInfo = HasServer ? "" : $", Serving: {ServerResolver}";
return $"{GetType().Name}({Type.GetName()}, Name: '{Name}', {MethodCount} method(s){serverInfo})";
return $"{GetType().Name}({Type.GetName()}, Name: '{Name}', {Methods.Count} method(s){serverInfo})";
}

public RpcMethodDef? Get(MethodInfo method) => _methods.GetValueOrDefault(method);
Expand Down
3 changes: 2 additions & 1 deletion src/Stl.Rpc/Configuration/RpcServiceRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void DumpTo(ILogger? log, LogLevel logLevel, string title, bool dumpMetho
var sb = StringBuilderExt.Acquire();
sb.AppendLine(title);
DumpTo(sb, dumpMethods);
// ReSharper disable once TemplateIsNotCompileTimeConstantProblem
log.Log(logLevel, sb.ToStringAndRelease().TrimEnd());
}

Expand All @@ -68,7 +69,7 @@ public void DumpTo(StringBuilder sb, bool dumpMethods = true, string indent = ""
#pragma warning disable MA0011
foreach (var serviceDef in _serviceByName.Values.OrderBy(s => s.Name)) {
var serverInfo = serviceDef.HasServer ? $" -> {serviceDef.ServerResolver}" : "";
sb.AppendLine($"{indent}'{serviceDef.Name}': {serviceDef.Type.GetName()}{serverInfo}, {serviceDef.MethodCount} method(s)");
sb.AppendLine($"{indent}'{serviceDef.Name}': {serviceDef.Type.GetName()}{serverInfo}, {serviceDef.Methods.Count} method(s)");
if (!dumpMethods)
continue;

Expand Down
20 changes: 20 additions & 0 deletions src/Stl.Rpc/Diagnostics/RpcMethodActivityTrace.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Diagnostics;
using Stl.Rpc.Infrastructure;

namespace Stl.Rpc.Diagnostics;

public class RpcMethodActivityTrace(RpcMethodActivityTracer tracer, Activity activity) : RpcMethodTrace
{
public override void OnResultTaskReady(RpcInboundCall call)
=> _ = call.UntypedResultTask.ContinueWith(Complete,
CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

protected void Complete(Task resultTask)
{
activity.Dispose();
tracer.CallCount?.Add(1);
if (!resultTask.IsCompletedSuccessfully())
tracer.ErrorCount?.Add(1);
tracer.CallDuration?.Record(activity.Duration.TotalMilliseconds);
}
}
45 changes: 45 additions & 0 deletions src/Stl.Rpc/Diagnostics/RpcMethodActivityTracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using Stl.Rpc.Infrastructure;

namespace Stl.Rpc.Diagnostics;

public class RpcMethodActivityTracer : RpcMethodTracer
{
protected readonly object Lock = new();

public string OperationName { get; init; }
public ActivitySource ActivitySource { get; init; }
public bool UseMeter { get; init; } = false;
public Meter? Meter { get; protected set; }
public Counter<long>? CallCount { get; protected set; }
public Counter<long>? ErrorCount { get; protected set; }
public Histogram<double>? CallDuration { get; protected set; }

public RpcMethodActivityTracer(RpcMethodDef method) : base(method)
{
OperationName = $"rpc:{method.Name.Value}@{method.Service.Name.Value}";
ActivitySource = GetType().GetActivitySource();
}

public override RpcMethodTrace? TryStartTrace(RpcInboundCall call)
{
if (UseMeter && Meter == null)
CreateMeter();
var activity = ActivitySource.StartActivity(OperationName);
return activity == null ? null : new RpcMethodActivityTrace(this, activity);
}

protected void CreateMeter()
{
if (Meter != null) return;
lock (Lock) {
if (Meter != null) return;

Meter = GetType().GetMeter();
CallCount = Meter.CreateCounter<long>("Call count: " + OperationName);
ErrorCount = Meter.CreateCounter<long>("Error count: " + OperationName);
CallDuration = Meter.CreateHistogram<double>("Call duration: " + OperationName, "ms");
}
}
}
8 changes: 8 additions & 0 deletions src/Stl.Rpc/Diagnostics/RpcMethodTrace.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Stl.Rpc.Infrastructure;

namespace Stl.Rpc.Diagnostics;

public abstract class RpcMethodTrace
{
public abstract void OnResultTaskReady(RpcInboundCall call);
}
11 changes: 11 additions & 0 deletions src/Stl.Rpc/Diagnostics/RpcMethodTracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Stl.Rpc.Infrastructure;

namespace Stl.Rpc.Diagnostics;

public abstract class RpcMethodTracer(RpcMethodDef method)
{
public RpcMethodDef Method { get; init; } = method;
public Sampler Sampler { get; init; } = Sampler.Always;

public abstract RpcMethodTrace? TryStartTrace(RpcInboundCall call);
}
25 changes: 21 additions & 4 deletions src/Stl.Rpc/Infrastructure/RpcInboundCall.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Stl.Interception;
using Stl.Rpc.Diagnostics;
using Stl.Rpc.Internal;

namespace Stl.Rpc.Infrastructure;
Expand All @@ -10,6 +11,7 @@ public abstract class RpcInboundCall : RpcCall
private static readonly ConcurrentDictionary<(byte, Type), Func<RpcInboundContext, RpcMethodDef, RpcInboundCall>> FactoryCache = new();

protected readonly CancellationTokenSource? CancellationTokenSource;
protected ILogger Log => Context.Peer.Log;

public readonly RpcInboundContext Context;
public readonly CancellationToken CancellationToken;
Expand Down Expand Up @@ -124,22 +126,31 @@ public override Task Run()
if (!PrepareToStart())
return Task.CompletedTask;

RpcMethodTrace? trace = null;
var inboundMiddlewares = Hub.InboundMiddlewares.NullIfEmpty();
try {
Arguments ??= DeserializeArguments();
if (Arguments == null)
return Task.CompletedTask; // No way to resolve argument list type -> the related call is already gone

// Before call
var peer = Context.Peer;
peer.CallLog?.Log(peer.CallLogLevel, "'{PeerRef}': <- {Call}", peer.Ref, this);
if (MethodDef.Tracer is { } tracer && tracer.Sampler.Next.Invoke())
trace = tracer.TryStartTrace(this);
inboundMiddlewares?.BeforeCall(this);

Hub.InboundMiddlewares.BeforeCall(this);
// Call
ResultTask = InvokeTarget();
trace?.OnResultTaskReady(this);
inboundMiddlewares?.OnResultTaskReady(this);
}
catch (Exception error) {
ResultTask = Task.FromException<TResult>(error);
trace?.OnResultTaskReady(this);
inboundMiddlewares?.OnResultTaskReady(this);
}
}

return ResultTask.IsCompleted
? Complete()
: CompleteEventually();
Expand Down Expand Up @@ -215,10 +226,16 @@ protected Task SendResult()
Result<TResult> result;
if (!resultTask.IsCompleted)
result = InvocationIsStillInProgressErrorResult();
else if (resultTask.Exception is { } error)
else if (resultTask.Exception is { } error) {
Log.IfEnabled(LogLevel.Error)
?.LogError(error, "Remote call completed with an error: {Call}", this);
result = new Result<TResult>(default!, error.GetBaseException());
else if (resultTask.IsCanceled)
}
else if (resultTask.IsCanceled) {
Log.IfEnabled(LogLevel.Warning)
?.LogWarning("Remote call cancelled on the server side: {Call}", this);
result = new Result<TResult>(default!, new TaskCanceledException());
}
else
result = resultTask.Result;

Expand Down
21 changes: 0 additions & 21 deletions src/Stl.Rpc/Infrastructure/RpcInboundCallActivityMiddleware.cs

This file was deleted.

3 changes: 2 additions & 1 deletion src/Stl.Rpc/Infrastructure/RpcInboundMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ namespace Stl.Rpc.Infrastructure;
public abstract class RpcInboundMiddleware(IServiceProvider services)
: RpcMiddleware(services)
{
public abstract void BeforeCall(RpcInboundCall call);
public virtual void BeforeCall(RpcInboundCall call) { }
public virtual void OnResultTaskReady(RpcInboundCall call) { }
}
10 changes: 10 additions & 0 deletions src/Stl.Rpc/Infrastructure/RpcInboundMiddlewares.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@ namespace Stl.Rpc.Infrastructure;
public sealed class RpcInboundMiddlewares(IServiceProvider services)
: RpcMiddlewares<RpcInboundMiddleware>(services)
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public RpcInboundMiddlewares? NullIfEmpty()
=> HasInstances ? this : null;

public void BeforeCall(RpcInboundCall call)
{
foreach (var m in Instances)
m.BeforeCall(call);
}

public void OnResultTaskReady(RpcInboundCall call)
{
foreach (var m in Instances)
m.OnResultTaskReady(call);
}
}
4 changes: 3 additions & 1 deletion src/Stl.Rpc/Infrastructure/RpcMiddlewares.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ namespace Stl.Rpc.Infrastructure;
public abstract class RpcMiddlewares<TMiddleware>
where TMiddleware : RpcMiddleware
{
public TMiddleware[] Instances { get; }
public readonly TMiddleware[] Instances;
public readonly bool HasInstances;

protected RpcMiddlewares(IServiceProvider services)
{
var instances = services.GetRequiredService<IEnumerable<TMiddleware>>();
Instances = instances.OrderByDescending(x => x.Priority).ToArray();
HasInstances = Instances.Length != 0;
}
}
2 changes: 1 addition & 1 deletion src/Stl.Rpc/Infrastructure/RpcOutboundContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Scope Activate()
// Call
Call = RpcOutboundCall.New(this);
if (!Call.NoWait)
hub.OutboundMiddlewares.PrepareCall(this);
hub.OutboundMiddlewares.NullIfEmpty()?.PrepareCall(this);
return Call;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Stl.Rpc/Infrastructure/RpcOutboundMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ namespace Stl.Rpc.Infrastructure;
public abstract class RpcOutboundMiddleware(IServiceProvider services)
: RpcMiddleware(services)
{
public abstract void PrepareCall(RpcOutboundContext context);
public virtual void PrepareCall(RpcOutboundContext context) { }
}
4 changes: 4 additions & 0 deletions src/Stl.Rpc/Infrastructure/RpcOutboundMiddlewares.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ namespace Stl.Rpc.Infrastructure;
public sealed class RpcOutboundMiddlewares(IServiceProvider services)
: RpcMiddlewares<RpcOutboundMiddleware>(services)
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public RpcOutboundMiddlewares? NullIfEmpty()
=> HasInstances ? this : null;

public void PrepareCall(RpcOutboundContext context)
{
foreach (var m in Instances)
Expand Down
Loading

0 comments on commit 7bf5780

Please sign in to comment.