Skip to content

Commit

Permalink
refactor: make RpcPeerStateMonitor inheritable
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Dec 3, 2023
1 parent 97619e3 commit a63e4b9
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,45 @@ namespace Stl.Fusion.Extensions;

public class RpcPeerStateMonitor : WorkerBase
{
private readonly IMutableState<RpcPeerState> _peerState;
private IMutableState<RpcPeerState> _peerState = null!;
private ILogger? _log;

private IServiceProvider Services => RpcHub.Services;
private ILogger Log => _log ??= Services.LogFor(GetType());
private Moment Now => RpcHub.Clock.Now;
protected IServiceProvider Services => RpcHub.Services;
protected ILogger Log => _log ??= Services.LogFor(GetType());
protected Moment Now => RpcHub.Clock.Now;

public RpcHub RpcHub { get; }
public RpcPeerRef? PeerRef { get; }
public TimeSpan JustDisconnectedPeriod { get; init; } = TimeSpan.FromSeconds(3);
public TimeSpan MinReconnectsIn { get; init; } = TimeSpan.FromSeconds(1);

public IState<RpcPeerState> PeerState => _peerState;
public IState<Moment> LastReconnectDelayCancelledAt { get; }
public IState<RpcPeerComputedState> State { get; }

public RpcPeerStateMonitor(IServiceProvider services, RpcPeerRef? peerRef, bool mustStart = true)
public IState<RpcPeerState> PeerState {
get => _peerState;
protected set => _peerState = (IMutableState<RpcPeerState>)value;
}
public IState<Moment> LastReconnectDelayCancelledAt { get; protected set; } = null!;
public IState<RpcPeerComputedState> State { get; protected set; } = null!;

public RpcPeerStateMonitor(
IServiceProvider services,
RpcPeerRef? peerRef,
bool mustStart = true,
bool mustCreateStates = true)
{
RpcHub = services.RpcHub();
PeerRef = peerRef;
if (!mustCreateStates)
return;

var connectionState = peerRef == null ? null : RpcHub.GetPeer(peerRef).ConnectionState.Value;
var isConnected = connectionState?.IsConnected() ?? true;
RpcPeerState initialState = isConnected
RpcPeerState initialPeerState = isConnected
? new RpcPeerConnectedState(Now)
: new RpcPeerDisconnectedState(Now, default, connectionState?.Error);

var stateFactory = services.StateFactory();
_peerState = stateFactory.NewMutable(
initialState,
initialPeerState,
$"{GetType().Name}.{nameof(PeerState)}");
var stateCategory = $"{GetType().Name}.{nameof(LastReconnectDelayCancelledAt)}";
LastReconnectDelayCancelledAt = peerRef == null
Expand Down Expand Up @@ -129,7 +139,7 @@ protected override async Task OnRun(CancellationToken cancellationToken)
}
}

private Task<Moment> ComputeLastReconnectDelayCancelledAtState(
protected virtual Task<Moment> ComputeLastReconnectDelayCancelledAtState(
IComputedState<Moment> state, CancellationToken cancellationToken)
{
var reconnectDelayer = RpcHub.InternalServices.ClientPeerReconnectDelayer;
Expand All @@ -143,7 +153,7 @@ private Task<Moment> ComputeLastReconnectDelayCancelledAtState(
return Task.FromResult(Now);
}

private async Task<RpcPeerComputedState> ComputeState(
protected virtual async Task<RpcPeerComputedState> ComputeState(
IComputedState<RpcPeerComputedState> state, CancellationToken cancellationToken)
{
var s = await PeerState.Use(cancellationToken).ConfigureAwait(false);
Expand Down

0 comments on commit a63e4b9

Please sign in to comment.