From c54285cb858dc865c54b89b30f537fbf9b40c574 Mon Sep 17 00:00:00 2001 From: Alex Yakunin Date: Sun, 3 Dec 2023 20:06:35 -0800 Subject: [PATCH] feat: RpcPeer.ResetTryIndex() and Connector.ResetTryIndex() --- src/Stl.Rpc/RpcPeer.cs | 11 +++++++++++ src/Stl/Net/Connector.cs | 14 +++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/Stl.Rpc/RpcPeer.cs b/src/Stl.Rpc/RpcPeer.cs index 77c93ea84..97d24cd12 100644 --- a/src/Stl.Rpc/RpcPeer.cs +++ b/src/Stl.Rpc/RpcPeer.cs @@ -10,6 +10,7 @@ public abstract class RpcPeer : WorkerBase, IHasId private ILogger? _log; private readonly Lazy _callLogLazy; private AsyncState _connectionState = new(RpcPeerConnectionState.Disconnected, true); + private bool _resetTryIndex; private ChannelWriter? _sender; protected IServiceProvider Services => Hub.Services; @@ -102,6 +103,12 @@ public Task Disconnect( return connectionState.WhenNext(); } + public void ResetTryIndex() + { + lock (Lock) + _resetTryIndex = true; + } + // Protected methods protected abstract Task GetConnection( @@ -329,6 +336,10 @@ protected AsyncState SetConnectionState( } Exception? terminalError = null; try { + if (newState.TryIndex != 0 && _resetTryIndex) { + _resetTryIndex = false; + newState = newState with { TryIndex = 0 }; + } _connectionState = connectionState = connectionState.SetNext(newState); if (newState.Error != null && Hub.UnrecoverableErrorDetector.Invoke(newState.Error, StopToken)) { terminalError = newState.Error is ConnectionUnrecoverableException diff --git a/src/Stl/Net/Connector.cs b/src/Stl/Net/Connector.cs index a528175b3..7b37ba1c6 100644 --- a/src/Stl/Net/Connector.cs +++ b/src/Stl/Net/Connector.cs @@ -8,6 +8,7 @@ public sealed class Connector : WorkerBase private readonly Func> _connectionFactory; private volatile AsyncState _state = new(State.New(), true); private long _reconnectsAt; + private bool _resetTryIndex; public AsyncState> IsConnected { get; private set; } = new(false, true); public Moment? ReconnectsAt { // Relative to CpuClock.Now @@ -72,6 +73,12 @@ public void DropConnection(TConnection connection, Exception? error) prevState.Value.Dispose(); } + public void ResetTryIndex() + { + lock (Lock) + _resetTryIndex = true; + } + // Protected & private methods protected override async Task OnRun(CancellationToken cancellationToken) @@ -117,9 +124,14 @@ protected override async Task OnRun(CancellationToken cancellationToken) lock (Lock) { if (state == _state) { var oldState = state; + var newTryIndex = state.Value.TryIndex + 1; + if (_resetTryIndex) { + _resetTryIndex = false; + newTryIndex = 0; + } state = _state = oldState.SetNext(State.New() with { LastError = error, - TryIndex = state.Value.TryIndex + 1, + TryIndex = newTryIndex, }); oldState.Value.Dispose(); }