Skip to content

Commit

Permalink
feat: RpcPeer.ResetTryIndex() and Connector.ResetTryIndex()
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Dec 4, 2023
1 parent f31c964 commit c54285c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/Stl.Rpc/RpcPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public abstract class RpcPeer : WorkerBase, IHasId<Guid>
private ILogger? _log;
private readonly Lazy<ILogger?> _callLogLazy;
private AsyncState<RpcPeerConnectionState> _connectionState = new(RpcPeerConnectionState.Disconnected, true);
private bool _resetTryIndex;
private ChannelWriter<RpcMessage>? _sender;

protected IServiceProvider Services => Hub.Services;
Expand Down Expand Up @@ -102,6 +103,12 @@ public Task Disconnect(
return connectionState.WhenNext();
}

public void ResetTryIndex()
{
lock (Lock)
_resetTryIndex = true;
}

// Protected methods

protected abstract Task<RpcConnection> GetConnection(
Expand Down Expand Up @@ -329,6 +336,10 @@ protected AsyncState<RpcPeerConnectionState> SetConnectionState(
}
Exception? terminalError = null;
try {
if (newState.TryIndex != 0 && _resetTryIndex) {
_resetTryIndex = false;
newState = newState with { TryIndex = 0 };
}
_connectionState = connectionState = connectionState.SetNext(newState);
if (newState.Error != null && Hub.UnrecoverableErrorDetector.Invoke(newState.Error, StopToken)) {
terminalError = newState.Error is ConnectionUnrecoverableException
Expand Down
14 changes: 13 additions & 1 deletion src/Stl/Net/Connector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public sealed class Connector<TConnection> : WorkerBase
private readonly Func<CancellationToken, Task<TConnection>> _connectionFactory;
private volatile AsyncState<State> _state = new(State.New(), true);
private long _reconnectsAt;
private bool _resetTryIndex;

public AsyncState<Result<bool>> IsConnected { get; private set; } = new(false, true);
public Moment? ReconnectsAt { // Relative to CpuClock.Now
Expand Down Expand Up @@ -72,6 +73,12 @@ public void DropConnection(TConnection connection, Exception? error)
prevState.Value.Dispose();
}

public void ResetTryIndex()
{
lock (Lock)
_resetTryIndex = true;
}

// Protected & private methods

protected override async Task OnRun(CancellationToken cancellationToken)
Expand Down Expand Up @@ -117,9 +124,14 @@ protected override async Task OnRun(CancellationToken cancellationToken)
lock (Lock) {
if (state == _state) {
var oldState = state;
var newTryIndex = state.Value.TryIndex + 1;
if (_resetTryIndex) {
_resetTryIndex = false;
newTryIndex = 0;
}
state = _state = oldState.SetNext(State.New() with {
LastError = error,
TryIndex = state.Value.TryIndex + 1,
TryIndex = newTryIndex,
});
oldState.Value.Dispose();
}
Expand Down

0 comments on commit c54285c

Please sign in to comment.