From dafb671b173a3a085fff6153658575cbd4f60142 Mon Sep 17 00:00:00 2001 From: Alex Yakunin Date: Sun, 1 Oct 2023 09:02:28 -0700 Subject: [PATCH] fix: more fixes related to Rpc & caching --- .../Client/Interception/AlreadySynchronized.cs | 2 +- .../Interception/ClientComputeMethodFunction.cs | 12 +++--------- .../Client/Interception/ClientComputed.cs | 15 +++++++++------ src/Stl.Rpc/Infrastructure/RpcOutboundCall.cs | 13 +++++++++++++ .../FusionRpcReconnectionTest.cs | 17 +++++++++++------ 5 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/Stl.Fusion/Client/Interception/AlreadySynchronized.cs b/src/Stl.Fusion/Client/Interception/AlreadySynchronized.cs index cd83ebf50..7cd1b9789 100644 --- a/src/Stl.Fusion/Client/Interception/AlreadySynchronized.cs +++ b/src/Stl.Fusion/Client/Interception/AlreadySynchronized.cs @@ -1,6 +1,6 @@ namespace Stl.Fusion.Client.Interception; -internal static class AlreadySynchronized +internal static class AlwaysSynchronized { public static readonly TaskCompletionSource Source = TaskCompletionSourceExt.New().WithResult(default); diff --git a/src/Stl.Fusion/Client/Interception/ClientComputeMethodFunction.cs b/src/Stl.Fusion/Client/Interception/ClientComputeMethodFunction.cs index fb651d7bd..8eeae5e66 100644 --- a/src/Stl.Fusion/Client/Interception/ClientComputeMethodFunction.cs +++ b/src/Stl.Fusion/Client/Interception/ClientComputeMethodFunction.cs @@ -50,14 +50,11 @@ private async Task> ComputeRpc( var cacheInfoCapture = cache != null ? new RpcCacheInfoCapture() : null; var call = SendRpcCall(input, cacheInfoCapture, cancellationToken); - var typedResultTask = (Task)call.ResultTask; - var result = typedResultTask.IsCompletedSuccessfully() - ? (Result)typedResultTask.Result - : await typedResultTask.ToResultAsync().ConfigureAwait(false); + var result = await call.UnwrapResult().ConfigureAwait(false); var cacheEntry = cacheInfoCapture == null ? null : await UpdateCache(cache!, cacheInfoCapture, result).ConfigureAwait(false); - var synchronizedSource = existing?.SynchronizedSource ?? AlreadySynchronized.Source; + var synchronizedSource = existing?.SynchronizedSource ?? AlwaysSynchronized.Source; var computed = new ClientComputed( input.MethodDef.ComputedOptions, input, result, VersionGenerator.NextVersion(), @@ -120,10 +117,7 @@ private async Task ApplyRpcUpdate( } // 3. Await for its completion - var typedResultTask = (Task)call.ResultTask; - var result = typedResultTask.IsCompletedSuccessfully() - ? (Result)typedResultTask.Result - : await typedResultTask.ToResultAsync().ConfigureAwait(false); + var result = await call.UnwrapResult().ConfigureAwait(false); var cacheEntry = await cacheInfoCapture.GetEntry().ConfigureAwait(false); // 4. Re-entering the lock & check if cached is still consistent diff --git a/src/Stl.Fusion/Client/Interception/ClientComputed.cs b/src/Stl.Fusion/Client/Interception/ClientComputed.cs index 9f05dabce..ead5ba6c8 100644 --- a/src/Stl.Fusion/Client/Interception/ClientComputed.cs +++ b/src/Stl.Fusion/Client/Interception/ClientComputed.cs @@ -90,12 +90,15 @@ protected override void OnInvalidated() BindToCall(null); // PseudoUnregister is used here just to trigger the // Unregistered event in ComputedRegistry. - // We want to keep this computed while it's possible: - // ClientComputed.Compute tries to use the existing - // one to update. If this computed instance is gone - // from registry, ClientComputed is going to be created - // anew and will hit the cache. - ComputedRegistry.Instance.PseudoUnregister(this); + // We want to keep this computed unless SynchronizedSource is + // AlwaysSynchronized.Source, which means it doesn't use cache. + // Otherwise (i.e. when SynchronizedSource is actually used) + // the next computed won't reuse the existing SynchronizedSource, + // which may render it as indefinitely incomplete. + if (ReferenceEquals(SynchronizedSource, AlwaysSynchronized.Source)) + ComputedRegistry.Instance.Unregister(this); + else + ComputedRegistry.Instance.PseudoUnregister(this); CancelTimeouts(); } } diff --git a/src/Stl.Rpc/Infrastructure/RpcOutboundCall.cs b/src/Stl.Rpc/Infrastructure/RpcOutboundCall.cs index 1a4164c42..83cfdda89 100644 --- a/src/Stl.Rpc/Infrastructure/RpcOutboundCall.cs +++ b/src/Stl.Rpc/Infrastructure/RpcOutboundCall.cs @@ -169,6 +169,19 @@ public RpcOutboundCall(RpcOutboundContext context) ResultTask = ResultSource.Task; } + public async ValueTask> UnwrapResult() + { + try { + return await ResultSource.Task.ConfigureAwait(false); + } + catch (OperationCanceledException) { + throw; + } + catch (Exception e) { + return Result.Error(e); + } + } + public override void SetResult(object? result, RpcInboundContext? context) { var typedResult = default(TResult)!; diff --git a/tests/Stl.Fusion.Tests/FusionRpcReconnectionTest.cs b/tests/Stl.Fusion.Tests/FusionRpcReconnectionTest.cs index 80ec9fe6b..0af1e10fd 100644 --- a/tests/Stl.Fusion.Tests/FusionRpcReconnectionTest.cs +++ b/tests/Stl.Fusion.Tests/FusionRpcReconnectionTest.cs @@ -138,12 +138,17 @@ private async Task Worker(int workerIndex, CpuTimestamp endAt) try { var rnd = new Random(); var callCount = 0L; - while (CpuTimestamp.Now < endAt) { - var delay = rnd.Next(10, 100); - var invDelay = rnd.Next(10, 100); - var result = await client.Delay(delay, invDelay).WaitAsync(timeout); - result.Should().Be((delay, invDelay)); - callCount++; + try { + while (CpuTimestamp.Now < endAt) { + var delay = rnd.Next(10, 100); + var invDelay = rnd.Next(10, 100); + var result = await client.Delay(delay, invDelay).WaitAsync(timeout); + result.Should().Be((delay, invDelay)); + callCount++; + } + } + catch (OperationCanceledException) { + // That's ok } disruptorCts.CancelAndDisposeSilently();