Skip to content

Commit

Permalink
fix: more fixes related to Rpc & caching
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Oct 1, 2023
1 parent 8ea98ac commit dafb671
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/Stl.Fusion/Client/Interception/AlreadySynchronized.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Stl.Fusion.Client.Interception;

internal static class AlreadySynchronized
internal static class AlwaysSynchronized
{
public static readonly TaskCompletionSource<Unit> Source
= TaskCompletionSourceExt.New<Unit>().WithResult(default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,11 @@ private async Task<Computed<T>> ComputeRpc(
var cacheInfoCapture = cache != null ? new RpcCacheInfoCapture() : null;
var call = SendRpcCall(input, cacheInfoCapture, cancellationToken);

var typedResultTask = (Task<T>)call.ResultTask;
var result = typedResultTask.IsCompletedSuccessfully()
? (Result<T>)typedResultTask.Result
: await typedResultTask.ToResultAsync().ConfigureAwait(false);
var result = await call.UnwrapResult().ConfigureAwait(false);
var cacheEntry = cacheInfoCapture == null ? null :
await UpdateCache(cache!, cacheInfoCapture, result).ConfigureAwait(false);

var synchronizedSource = existing?.SynchronizedSource ?? AlreadySynchronized.Source;
var synchronizedSource = existing?.SynchronizedSource ?? AlwaysSynchronized.Source;
var computed = new ClientComputed<T>(
input.MethodDef.ComputedOptions,
input, result, VersionGenerator.NextVersion(),
Expand Down Expand Up @@ -120,10 +117,7 @@ private async Task ApplyRpcUpdate(
}

// 3. Await for its completion
var typedResultTask = (Task<T>)call.ResultTask;
var result = typedResultTask.IsCompletedSuccessfully()
? (Result<T>)typedResultTask.Result
: await typedResultTask.ToResultAsync().ConfigureAwait(false);
var result = await call.UnwrapResult().ConfigureAwait(false);
var cacheEntry = await cacheInfoCapture.GetEntry().ConfigureAwait(false);

// 4. Re-entering the lock & check if cached is still consistent
Expand Down
15 changes: 9 additions & 6 deletions src/Stl.Fusion/Client/Interception/ClientComputed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@ protected override void OnInvalidated()
BindToCall(null);
// PseudoUnregister is used here just to trigger the
// Unregistered event in ComputedRegistry.
// We want to keep this computed while it's possible:
// ClientComputed.Compute tries to use the existing
// one to update. If this computed instance is gone
// from registry, ClientComputed is going to be created
// anew and will hit the cache.
ComputedRegistry.Instance.PseudoUnregister(this);
// We want to keep this computed unless SynchronizedSource is
// AlwaysSynchronized.Source, which means it doesn't use cache.
// Otherwise (i.e. when SynchronizedSource is actually used)
// the next computed won't reuse the existing SynchronizedSource,
// which may render it as indefinitely incomplete.
if (ReferenceEquals(SynchronizedSource, AlwaysSynchronized.Source))
ComputedRegistry.Instance.Unregister(this);
else
ComputedRegistry.Instance.PseudoUnregister(this);
CancelTimeouts();
}
}
13 changes: 13 additions & 0 deletions src/Stl.Rpc/Infrastructure/RpcOutboundCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ public RpcOutboundCall(RpcOutboundContext context)
ResultTask = ResultSource.Task;
}

public async ValueTask<Result<TResult>> UnwrapResult()
{
try {
return await ResultSource.Task.ConfigureAwait(false);
}
catch (OperationCanceledException) {
throw;
}
catch (Exception e) {
return Result.Error<TResult>(e);
}
}

public override void SetResult(object? result, RpcInboundContext? context)
{
var typedResult = default(TResult)!;
Expand Down
17 changes: 11 additions & 6 deletions tests/Stl.Fusion.Tests/FusionRpcReconnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,17 @@ private async Task<long> Worker(int workerIndex, CpuTimestamp endAt)
try {
var rnd = new Random();
var callCount = 0L;
while (CpuTimestamp.Now < endAt) {
var delay = rnd.Next(10, 100);
var invDelay = rnd.Next(10, 100);
var result = await client.Delay(delay, invDelay).WaitAsync(timeout);
result.Should().Be((delay, invDelay));
callCount++;
try {
while (CpuTimestamp.Now < endAt) {
var delay = rnd.Next(10, 100);
var invDelay = rnd.Next(10, 100);
var result = await client.Delay(delay, invDelay).WaitAsync(timeout);
result.Should().Be((delay, invDelay));
callCount++;
}
}
catch (OperationCanceledException) {
// That's ok
}

disruptorCts.CancelAndDisposeSilently();
Expand Down

0 comments on commit dafb671

Please sign in to comment.