diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml index a20cd5e31..370d130aa 100644 --- a/.github/workflows/package.yml +++ b/.github/workflows/package.yml @@ -18,8 +18,8 @@ jobs: with: dotnet-version: | 3.1.x - 6.0.x - 7.0.x + 6.0.403 + 7.0.100 - name: Download fake-cli run: dotnet tool restore - name: Package @@ -45,8 +45,8 @@ jobs: with: dotnet-version: | 3.1.x - 6.0.x - 7.0.x + 6.0.403 + 7.0.100 - name: Download fake-cli run: dotnet tool install fake-cli --version 5.20.4 --tool-path . - name: Restore packages @@ -73,7 +73,7 @@ jobs: with: dotnet-version: | 2.x.x - 6.x.x + 6.0.403 # Checkout unity packager - name: Checkout unity-packager repo diff --git a/.github/workflows/run-tests-linux.yml b/.github/workflows/run-tests-linux.yml index 2c26b9003..a73ea8cb7 100644 --- a/.github/workflows/run-tests-linux.yml +++ b/.github/workflows/run-tests-linux.yml @@ -24,8 +24,8 @@ jobs: uses: actions/setup-dotnet@v3 with: dotnet-version: | - 6.0.x - 7.0.x + 6.0.403 + 7.0.100 - name: Download dotnet build-script tools run: dotnet tool restore diff --git a/.github/workflows/run-tests-macos.yml b/.github/workflows/run-tests-macos.yml index 2c9ac1edd..4170caffc 100644 --- a/.github/workflows/run-tests-macos.yml +++ b/.github/workflows/run-tests-macos.yml @@ -24,8 +24,8 @@ jobs: uses: actions/setup-dotnet@v3 with: dotnet-version: | - 6.0.x - 7.0.x + 6.0.403 + 7.0.100 - name: Download dotnet build-script tools run: dotnet tool restore diff --git a/.github/workflows/run-tests-windows.yml b/.github/workflows/run-tests-windows.yml index d40cf1501..161ede25e 100644 --- a/.github/workflows/run-tests-windows.yml +++ b/.github/workflows/run-tests-windows.yml @@ -24,8 +24,8 @@ jobs: uses: actions/setup-dotnet@v3 with: dotnet-version: | - 6.0.x - 7.0.x + 6.0.403 + 7.0.100 - name: Download dotnet build-script tools run: dotnet tool restore diff --git a/src/IO.Ably.Shared/AblyRealtime.cs b/src/IO.Ably.Shared/AblyRealtime.cs index 945a2dfe4..c5d53c991 100644 --- a/src/IO.Ably.Shared/AblyRealtime.cs +++ b/src/IO.Ably.Shared/AblyRealtime.cs @@ -67,7 +67,7 @@ internal AblyRealtime(ClientOptions options, Func /// A connection recovery string, specified by a client when initializing the library /// with the intention of inheriting the state of an earlier connection. See the Ably - /// Realtime API documentation for further information on connection state recovery. + /// Realtime API documentation for further information on connection state recovery. (RTN16i) /// Default: null. /// public string Recover { get; set; } @@ -358,7 +358,7 @@ public bool UseBinaryProtocol /// Default before 1.2: false. /// Default from 1.2: true. /// - public bool IdempotentRestPublishing { get; set; } = Defaults.ProtocolVersionNumber >= 1.2; + public bool IdempotentRestPublishing { get; set; } = true; /// /// Additional parameters to be sent in the querystring when initiating a realtime connection. diff --git a/src/IO.Ably.Shared/Defaults.cs b/src/IO.Ably.Shared/Defaults.cs index f712d890f..e0fbbc06a 100644 --- a/src/IO.Ably.Shared/Defaults.cs +++ b/src/IO.Ably.Shared/Defaults.cs @@ -6,10 +6,8 @@ namespace IO.Ably { - internal static class Defaults + internal class Defaults { - internal static readonly float ProtocolVersionNumber = 1.2F; - internal static readonly string LibraryVersion = GetVersion(); internal static string GetVersion() @@ -18,7 +16,7 @@ internal static string GetVersion() return version.Split('.').Take(3).JoinStrings("."); } - public static string ProtocolVersion { get; } = ProtocolVersionNumber.ToString(CultureInfo.InvariantCulture); + public const string ProtocolVersion = "2"; // CSV2 public const int QueryLimit = 100; diff --git a/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs b/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs index f151e30c0..49d13126b 100644 --- a/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs +++ b/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs @@ -7,13 +7,16 @@ public static bool IsSynthesized(this PresenceMessage msg) return msg.Id == null || !msg.Id.StartsWith(msg.ConnectionId); } + // RTP2b, RTP2c public static bool IsNewerThan(this PresenceMessage thisMessage, PresenceMessage thatMessage) { + // RTP2b1 if (thisMessage.IsSynthesized() || thatMessage.IsSynthesized()) { return thisMessage.Timestamp > thatMessage.Timestamp; } + // RTP2b2 var thisValues = thisMessage.Id.Split(':'); var thatValues = thatMessage.Id.Split(':'); diff --git a/src/IO.Ably.Shared/Http/AblyHttpClient.cs b/src/IO.Ably.Shared/Http/AblyHttpClient.cs index 05a33337c..ea482174d 100644 --- a/src/IO.Ably.Shared/Http/AblyHttpClient.cs +++ b/src/IO.Ably.Shared/Http/AblyHttpClient.cs @@ -73,7 +73,7 @@ internal void SetPreferredHost(string currentHost) internal void CreateInternalHttpClient(TimeSpan timeout, HttpMessageHandler messageHandler) { Client = messageHandler != null ? new HttpClient(messageHandler) : new HttpClient(); - Client.DefaultRequestHeaders.Add("X-Ably-Version", Defaults.ProtocolVersion); + Client.DefaultRequestHeaders.Add("X-Ably-Version", Defaults.ProtocolVersion); // RSC7a Client.DefaultRequestHeaders.Add(Agent.AblyAgentHeader, Agent.AblyAgentIdentifier(Options.Agents)); // RSC7d Client.Timeout = timeout; } diff --git a/src/IO.Ably.Shared/IO.Ably.Shared.projitems b/src/IO.Ably.Shared/IO.Ably.Shared.projitems index 4b7ee91e1..c48adcd05 100644 --- a/src/IO.Ably.Shared/IO.Ably.Shared.projitems +++ b/src/IO.Ably.Shared/IO.Ably.Shared.projitems @@ -55,6 +55,7 @@ + diff --git a/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs b/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs index 5f232cf87..001dac60b 100644 --- a/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs +++ b/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs @@ -39,13 +39,24 @@ public Task MessageReceived(ProtocolMessage protocolMessage, RealtimeState return Task.FromResult(false); } + // RTL15b + if (protocolMessage.ChannelSerial.IsNotEmpty() && + (protocolMessage.Action == ProtocolMessage.MessageAction.Message || + protocolMessage.Action == ProtocolMessage.MessageAction.Presence || + protocolMessage.Action == ProtocolMessage.MessageAction.Attached)) + { + Logger.Debug($"Setting channel serial for channelName - {channel.Name}," + + $"previous - {channel.Properties.ChannelSerial}, current - {protocolMessage.ChannelSerial}"); + channel.Properties.ChannelSerial = protocolMessage.ChannelSerial; + } + switch (protocolMessage.Action) { case ProtocolMessage.MessageAction.Error: channel.SetChannelState(ChannelState.Failed, protocolMessage); break; case ProtocolMessage.MessageAction.Attached: - channel.Properties.AttachSerial = protocolMessage.ChannelSerial; + channel.Properties.AttachSerial = protocolMessage.ChannelSerial; // RTL15a if (protocolMessage.Flags.HasValue) { @@ -58,17 +69,15 @@ public Task MessageReceived(ProtocolMessage protocolMessage, RealtimeState channel.Params = new ReadOnlyChannelParams(protocolMessage.Params); } - if (channel.State == ChannelState.Attached) + // RTL12 + if (channel.State == ChannelState.Attached && !protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed)) { - // RTL12 - if (!protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed)) - { - channel.Presence.ChannelAttached(protocolMessage); - channel.EmitUpdate(protocolMessage.Error, false, protocolMessage); - } + channel.Presence.ChannelAttached(protocolMessage, false); + channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage); } else { + channel.Presence.ChannelAttached(protocolMessage); channel.SetChannelState(ChannelState.Attached, protocolMessage); } @@ -139,11 +148,14 @@ public Task MessageReceived(ProtocolMessage protocolMessage, RealtimeState channel.OnError(presenceDecodeResult.Error); } - string syncSerial = protocolMessage.Action == ProtocolMessage.MessageAction.Sync - ? protocolMessage.ChannelSerial - : null; - - channel.Presence.OnPresence(protocolMessage.Presence, syncSerial); + if (protocolMessage.Action == ProtocolMessage.MessageAction.Sync) + { + channel.Presence.OnSyncMessage(protocolMessage); + } + else + { + channel.Presence.OnPresence(protocolMessage.Presence); + } break; } diff --git a/src/IO.Ably.Shared/Realtime/ChannelProperties.cs b/src/IO.Ably.Shared/Realtime/ChannelProperties.cs index c7bb2c44d..8c2282137 100644 --- a/src/IO.Ably.Shared/Realtime/ChannelProperties.cs +++ b/src/IO.Ably.Shared/Realtime/ChannelProperties.cs @@ -6,8 +6,13 @@ public class ChannelProperties { /// - /// contains the last channelSerial received in an ATTACHED ProtocolMessage for the channel, see RTL15a. + /// contains the channelSerial from latest ATTACHED ProtocolMessage received on the channel, see CP2a, RTL15a. /// public string AttachSerial { get; internal set; } + + /// + /// contains the channelSerial from latest ProtocolMessage of action type Message/PresenceMessage received on the channel, see CP2b, RTL15b. + /// + public string ChannelSerial { get; internal set; } } } diff --git a/src/IO.Ably.Shared/Realtime/Connection.cs b/src/IO.Ably.Shared/Realtime/Connection.cs index f2bf3f4f8..5eae16db9 100644 --- a/src/IO.Ably.Shared/Realtime/Connection.cs +++ b/src/IO.Ably.Shared/Realtime/Connection.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using IO.Ably.Realtime.Workflow; +using IO.Ably.Shared.Realtime; using IO.Ably.Transport; using IO.Ably.Transport.States.Connection; @@ -156,12 +157,6 @@ private void HandleNetworkStateChange(NetworkState state) /// public string Id => InnerState.Id; - /// - /// The serial number of the last message received on this connection. - /// The serial number may be used when recovering connection state. - /// - public long? Serial => InnerState.Serial; - internal long MessageSerial => InnerState.MessageSerial; /// @@ -172,18 +167,35 @@ private void HandleNetworkStateChange(NetworkState state) /// /// Indicates whether the current connection can be resumed. /// - public bool ConnectionResumable => Key.IsNotEmpty() && Serial.HasValue; + public bool ConnectionResumable => Key.IsNotEmpty(); + + /// + /// Connection#recoveryKey is an attribute composed of the connectionKey, channelSerials, and the current msgSerial (RTN16m). + /// + [Obsolete("This property is deprecated, use CreateRecoveryKey method instead")] + public string RecoveryKey => CreateRecoveryKey(); /// - /// - (RTN16b) Connection#recoveryKey is an attribute composed of the connectionKey, and the latest connectionSerial received on the connection, and the current msgSerial. + /// Connection#CreateRecoveryKey is an attribute composed of the connectionKey, messageSerial and channelSerials (RTN16g, RTN16g1, RTN16h). /// - public string RecoveryKey + /// recoveryKey. + public string CreateRecoveryKey() { - get + if (Key.IsEmpty() || InnerState.State == Realtime.ConnectionState.Closing + || InnerState.State == Realtime.ConnectionState.Closed + || InnerState.State == Realtime.ConnectionState.Failed + || InnerState.State == Realtime.ConnectionState.Suspended) { - Debug.Assert(Serial.HasValue, "Expected a Value, found none"); - return ConnectionResumable ? $"{Key}:{Serial.Value}:{MessageSerial}" : string.Empty; + return string.Empty; } + + var recoveryContext = new RecoveryKeyContext() + { + MsgSerial = MessageSerial, + ConnectionKey = Key, + ChannelSerials = RealtimeClient.Channels.GetChannelSerials(), + }; + return recoveryContext.Encode(); } /// @@ -197,7 +209,7 @@ public string RecoveryKey /// message and, in the failed state in particular, provides diagnostic /// error information. /// - public ErrorInfo ErrorReason => InnerState.ErrorReason; + public ErrorInfo ErrorReason => InnerState.ErrorReason; // RTN15c7 /// /// Gets the currently used Host. diff --git a/src/IO.Ably.Shared/Realtime/Presence.cs b/src/IO.Ably.Shared/Realtime/Presence.cs index 7a45266f9..90faf4edb 100644 --- a/src/IO.Ably.Shared/Realtime/Presence.cs +++ b/src/IO.Ably.Shared/Realtime/Presence.cs @@ -21,72 +21,51 @@ public sealed partial class Presence : IDisposable private readonly Handlers _handlers = new Handlers(); private readonly IConnectionManager _connection; private string _currentSyncChannelSerial; - private bool _initialSyncCompleted; private bool _disposedValue; internal Presence(IConnectionManager connection, RealtimeChannel channel, string clientId, ILogger logger) { Logger = logger; - Map = new PresenceMap(channel.Name, logger); - InternalMap = new PresenceMap(channel.Name, logger); + MembersMap = new PresenceMap(channel.Name, logger); + InternalMembersMap = new InternalPresenceMap(channel.Name, logger); PendingPresenceQueue = new ConcurrentQueue(); _connection = connection; _channel = channel; _clientId = clientId; } - private event EventHandler InitialSyncCompleted; - - internal event EventHandler SyncCompleted; + internal event EventHandler SyncCompletedEventHandler; internal ILogger Logger { get; private set; } /// /// Has the sync completed. /// - public bool SyncComplete - { - get => Map.InitialSyncCompleted | _initialSyncCompleted; - - private set - { - _initialSyncCompleted = value; - if (_initialSyncCompleted) - { - OnInitialSyncCompleted(); - } - } - } + public bool IsSyncComplete => MembersMap.SyncCompleted && !IsSyncInProgress; /// /// Indicates whether there is currently a sync in progress. /// - public bool IsSyncInProgress => Map.IsSyncInProgress; + public bool IsSyncInProgress => MembersMap.SyncInProgress; - internal bool InternalSyncComplete => !Map.IsSyncInProgress && SyncComplete; - - internal PresenceMap Map { get; } - - internal PresenceMap InternalMap { get; } - - internal ConcurrentQueue PendingPresenceQueue { get; } + /// + /// Indicates all members present on the channel. + /// + internal PresenceMap MembersMap { get; } // RTP2 /// - /// Called when a protocol message HasPresenceFlag == false. The presence map should be considered in sync immediately - /// with no members present on the channel. See [RTP1] for more detail. + /// Indicates members belonging to current connectionId. /// - internal void SkipSync() - { - SyncComplete = true; - } + internal PresenceMap InternalMembersMap { get; } // RTP17 + + internal ConcurrentQueue PendingPresenceQueue { get; } /// /// Disposes the current Presence instance. Removes all listening handlers. /// internal void RemoveAllListeners() { - InitialSyncCompleted = null; - SyncCompleted = null; + SyncCompletedEventHandler = null; _handlers.RemoveAll(); } @@ -123,7 +102,7 @@ public async Task> GetAsync(GetParams options) _ = await WaitForSyncAsync(); } - var result = Map.Values.Where(x => (getOptions.ClientId.IsEmpty() || x.ClientId == getOptions.ClientId) + var result = MembersMap.Values.Where(x => (getOptions.ClientId.IsEmpty() || x.ClientId == getOptions.ClientId) && (getOptions.ConnectionId.IsEmpty() || x.ConnectionId == getOptions.ConnectionId)); return result; } @@ -174,7 +153,7 @@ private async Task WaitForSyncAsync() // The InternalSync should be completed and the channels Attached or Attaching void CheckAndSet() { - if (InternalSyncComplete + if (IsSyncComplete && (_channel.State == ChannelState.Attached || _channel.State == ChannelState.Attaching)) { tsc.TrySetResult(true); @@ -182,7 +161,7 @@ void CheckAndSet() } // if the channel state changes and is not Attached or Attaching then we should exit - void OnChannelOnStateChanged(object sender, ChannelStateChange args) + void OnChannelStateChanged(object sender, ChannelStateChange args) { if (_channel.State != ChannelState.Attached && _channel.State != ChannelState.Attaching) { @@ -192,18 +171,16 @@ void OnChannelOnStateChanged(object sender, ChannelStateChange args) void OnSyncEvent(object sender, EventArgs args) => CheckAndSet(); - _channel.StateChanged += OnChannelOnStateChanged; - InitialSyncCompleted += OnSyncEvent; - Map.SyncNoLongerInProgress += OnSyncEvent; + _channel.StateChanged += OnChannelStateChanged; + SyncCompletedEventHandler += OnSyncEvent; // Do a manual check in case we are already in the desired state CheckAndSet(); bool syncIsComplete = await tsc.Task; // unsubscribe from events - _channel.StateChanged -= OnChannelOnStateChanged; - InitialSyncCompleted -= OnSyncEvent; - Map.SyncNoLongerInProgress -= OnSyncEvent; + _channel.StateChanged -= OnChannelStateChanged; + SyncCompletedEventHandler -= OnSyncEvent; if (!syncIsComplete) { @@ -476,6 +453,7 @@ internal async Task UpdatePresenceAsync(PresenceMessage msg) internal void UpdatePresence(PresenceMessage msg, Action callback) { + // RTP16a, RTL6c switch (_connection.Connection.State) { case ConnectionState.Initialized: @@ -493,27 +471,28 @@ internal void UpdatePresence(PresenceMessage msg, Action callba return; } + // RTP16 switch (_channel.State) { - case ChannelState.Initialized: + case ChannelState.Initialized: // RTP16b if (PendingPresenceEnqueue(new QueuedPresenceMessage(msg, callback))) { _channel.Attach(); } break; - case ChannelState.Attaching: + case ChannelState.Attaching: // RTP16b PendingPresenceEnqueue(new QueuedPresenceMessage(msg, callback)); break; - case ChannelState.Attached: + case ChannelState.Attached: // RTP16a var message = new ProtocolMessage(ProtocolMessage.MessageAction.Presence, _channel.Name) { Presence = new[] { msg }, }; _connection.Send(message, callback); break; - default: + default: // RTP16c var error = new ErrorInfo($"Unable to enter presence channel in {_channel.State} state", ErrorCodes.UnableToEnterPresenceChannelInvalidState); Logger.Warning(error.ToString()); ActionUtils.SafeExecute(() => callback?.Invoke(false, error), Logger, nameof(UpdatePresence)); @@ -521,6 +500,7 @@ internal void UpdatePresence(PresenceMessage msg, Action callba } } + // RTP16b private bool PendingPresenceEnqueue(QueuedPresenceMessage msg) { if (!_connection.Options.QueueMessages) @@ -536,64 +516,81 @@ private bool PendingPresenceEnqueue(QueuedPresenceMessage msg) return true; } - internal void OnPresence(PresenceMessage[] messages, string syncChannelSerial) + // RTP18 + internal void OnSyncMessage(ProtocolMessage protocolMessage) { - try + string syncCursor = null; + var syncChannelSerial = protocolMessage.ChannelSerial; + + // RTP18a + if (syncChannelSerial.IsNotEmpty()) { - string syncCursor = null; + var serials = syncChannelSerial.Split(':'); + var syncSequenceId = serials[0]; + syncCursor = serials.Length > 1 ? serials[1] : string.Empty; - // if we got here from SYNC message - if (syncChannelSerial != null) + /* If a new sequence identifier is sent from Ably, then the client library + * must consider that to be the start of a new sync sequence + * and any previous in-flight sync should be discarded. (part of RTP18)*/ + if (IsSyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId) { - int colonPos = syncChannelSerial.IndexOf(':'); - string serial = colonPos >= 0 ? syncChannelSerial.Substring(0, colonPos) : syncChannelSerial; - - /* If a new sequence identifier is sent from Ably, then the client library - * must consider that to be the start of a new sync sequence - * and any previous in-flight sync should be discarded. (part of RTP18)*/ - if (Map.IsSyncInProgress && _currentSyncChannelSerial != null - && _currentSyncChannelSerial != serial) - { - EndSync(); - } + EndSync(); + } - StartSync(); + StartSync(); - syncCursor = syncChannelSerial.Substring(colonPos); - if (syncCursor.Length > 1) - { - _currentSyncChannelSerial = serial; - } + if (syncCursor.IsNotEmpty()) + { + _currentSyncChannelSerial = syncSequenceId; } + } + + OnPresence(protocolMessage.Presence); + // RTP18b, RTP18c + if (syncChannelSerial.IsEmpty() || syncCursor.IsEmpty()) + { + EndSync(); + _currentSyncChannelSerial = null; + } + } + + internal void OnPresence(PresenceMessage[] messages) + { + try + { if (messages != null) { foreach (var message in messages) { - bool updateInternalPresence = message.ConnectionId == _channel.RealtimeClient.Connection.Id; + bool updateInternalPresence = message.ConnectionId == _channel.RealtimeClient.Connection.Id; // RTP17 var broadcast = true; switch (message.Action) { + // RTP2d case PresenceAction.Enter: case PresenceAction.Update: case PresenceAction.Present: - broadcast &= Map.Put(message); + broadcast &= MembersMap.Put(message); if (updateInternalPresence) { - InternalMap.Put(message); + InternalMembersMap.Put(message); // RTP17b } break; + + // RTP2e case PresenceAction.Leave: - broadcast &= Map.Remove(message); + broadcast &= MembersMap.Remove(message); if (updateInternalPresence && !message.IsSynthesized()) { - InternalMap.Remove(message); + InternalMembersMap.Remove(message); } break; } + // RTP2g if (broadcast) { Publish(message); @@ -604,13 +601,6 @@ internal void OnPresence(PresenceMessage[] messages, string syncChannelSerial) { Logger.Debug("Sync with no presence"); } - - // if this is the last message in a sequence of sync updates, end the sync - if (syncChannelSerial == null || syncCursor.Length <= 1) - { - EndSync(); - _currentSyncChannelSerial = null; - } } catch (Exception ex) { @@ -625,7 +615,7 @@ internal void StartSync() { if (!IsSyncInProgress) { - Map.StartSync(); + MembersMap.StartSync(); } } @@ -636,76 +626,50 @@ private void EndSync() return; } - var residualMembers = Map.EndSync(); - - /* - * RTP19: ... The PresenceMessage published should contain the original attributes of the presence - * member with the action set to LEAVE, PresenceMessage#id set to null, and the timestamp set - * to the current time ... - */ - foreach (var presenceMessage in residualMembers) + // RTP19 + var localNonUpdatedMembersDuringSync = MembersMap.EndSync(); + foreach (var presenceMember in localNonUpdatedMembersDuringSync) { - presenceMessage.Action = PresenceAction.Leave; - presenceMessage.Id = null; - presenceMessage.Timestamp = DateTimeOffset.UtcNow; + presenceMember.Action = PresenceAction.Leave; + presenceMember.Id = null; + presenceMember.Timestamp = DateTimeOffset.UtcNow; } - Publish(residualMembers); + Publish(localNonUpdatedMembersDuringSync); - /* - * (RTP5c2) If a SYNC is initiated as part of the attach, then once the SYNC is complete, - * all members not present in the PresenceMap but present in the internal PresenceMap must - * be re-entered automatically by the client using the clientId and data attributes from - * each. The members re-entered automatically must be removed from the internal PresenceMap - * ensuring that members present on the channel are constructed from presence events sent - * from Ably since the channel became ATTACHED - */ - EnsureLocalPresenceEntered(); - - OnSyncCompleted(); + NotifySyncCompleted(); } - private void EnsureLocalPresenceEntered() + private void EnterMembersFromInternalPresenceMap() { - foreach (var item in InternalMap.Values) + // RTP17g + foreach (var item in InternalMembersMap.Values) { - if (!Map.Members.ContainsKey(item.MemberKey)) + try { - var clientId = item.ClientId; - try + var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data); + UpdatePresence(itemToSend, (success, info) => { - /* Message is new to presence map, send it */ - var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data); - UpdatePresence(itemToSend, (success, info) => + if (!success) { - if (!success) - { - /* - * (RTP5c3) If any of the automatic ENTER presence messages published - * in RTP5c2 fail, then an UPDATE event should be emitted on the channel - * with resumed set to true and reason set to an ErrorInfo object with error - * code value 91004 and the error message string containing the message - * received from Ably (if applicable), the code received from Ably - * (if applicable) and the explicit or implicit client_id of the PresenceMessage - */ - var errorString = - $"Cannot automatically re-enter {clientId} on channel {_channel.Name} ({info.Message})"; - Logger.Error(errorString); - _channel.EmitUpdate(new ErrorInfo(errorString, 91004), true); - } - }); - - InternalMap.Remove(item); - } - catch (AblyException e) - { - var errorString = - $"Cannot automatically re-enter {clientId} on channel {_channel.Name} ({e.ErrorInfo.Message})"; - Logger.Error(errorString); - _channel.EmitUpdate(new ErrorInfo(errorString, 91004), true); - } + EmitErrorUpdate(item.ClientId, _channel.Name, info.Message); + } + }); + } + catch (AblyException e) + { + EmitErrorUpdate(item.ClientId, _channel.Name, e.ErrorInfo.Message); } } + + // (RTP17e) + void EmitErrorUpdate(string clientId, string channelName, string errorMessage) + { + var errorString = + $"Cannot automatically re-enter {clientId} on channel {channelName} ({errorMessage})"; + Logger.Error(errorString); + _channel.EmitErrorUpdate(new ErrorInfo(errorString, 91004), true); + } } private void Publish(params PresenceMessage[] messages) @@ -743,54 +707,49 @@ private void NotifySubscribers(PresenceMessage message) } } + // RTP5a internal void ChannelDetachedOrFailed(ErrorInfo error) { FailQueuedMessages(error); - Map.Clear(); - InternalMap.Clear(); + MembersMap.Clear(); + InternalMembersMap.Clear(); } + // RTP5f internal void ChannelSuspended(ErrorInfo error) { - /* - * (RTP5f) If the channel enters the SUSPENDED state then all queued presence messages will fail - * immediately, and the PresenceMap is maintained - */ FailQueuedMessages(error); } - internal void ChannelAttached(ProtocolMessage attachMessage) + internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWithoutMessageLoss = true) { - /* Start sync, if hasPresence is not set end sync immediately dropping all the current presence members */ + // RTP19 StartSync(); - var hasPresence = attachMessage != null && - attachMessage.HasFlag(ProtocolMessage.Flag.HasPresence); + // RTP1 + var hasPresence = attachedMessage != null && attachedMessage.HasFlag(ProtocolMessage.Flag.HasPresence); if (hasPresence) { - // RTP1 If [HAS_PRESENCE] flag is 1, should set presence sync as active (Doesn't necessarily mean members are available) if (Logger.IsDebug) { Logger.Debug( - $"Protocol message has presence flag. Starting Presence SYNC. Flag: {attachMessage.Flags}"); + $"Protocol message has presence flag. Starting Presence SYNC. Flag: {attachedMessage.Flags}"); } StartSync(); - SendQueuedMessages(); } else { - /* RTP1 If [HAS_PRESENCE] flag is 0 or there is no flags field, - * the presence map should be considered in sync immediately - * with no members present on the channel - * - * RTP19a If the PresenceMap has existing members when an ATTACHED message is received without a - * HAS_PRESENCE flag, the client library should emit a LEAVE event for each existing member ... - */ - EndSync(); - SendQueuedMessages(); + EndSync(); // RTP19 + } - // TODO: Missing sending my members if any + // RTP5b + SendQueuedMessages(); + + // RTP17f + if (isAttachWithoutMessageLoss) + { + EnterMembersFromInternalPresenceMap(); } } @@ -879,23 +838,18 @@ public Task> HistoryAsync(PaginatedRequestParam return _channel.RestChannel.Presence.HistoryAsync(query); } - private void OnSyncCompleted() + private void NotifySyncCompleted() { - SyncCompleted?.Invoke(this, EventArgs.Empty); + SyncCompletedEventHandler?.Invoke(this, EventArgs.Empty); } internal JToken GetState() => new JObject { ["handlers"] = _handlers.GetState(), - ["members"] = Map.GetState(), + ["members"] = MembersMap.GetState(), ["pendingQueue"] = new JArray(PendingPresenceQueue.Select(x => JObject.FromObject(x.Message))), }; - private void OnInitialSyncCompleted() - { - InitialSyncCompleted?.Invoke(this, EventArgs.Empty); - } - /// /// Dispose(bool disposing) executes in two distinct scenarios. If disposing equals true, the method has /// been called directly or indirectly by a user's code. Managed and unmanaged resources can be disposed. diff --git a/src/IO.Ably.Shared/Realtime/PresenceMap.cs b/src/IO.Ably.Shared/Realtime/PresenceMap.cs index d3f302a23..a6fc275bb 100644 --- a/src/IO.Ably.Shared/Realtime/PresenceMap.cs +++ b/src/IO.Ably.Shared/Realtime/PresenceMap.cs @@ -6,15 +6,16 @@ namespace IO.Ably.Realtime { - internal sealed class PresenceMap + internal class PresenceMap { private readonly object _lock = new object(); private readonly ILogger _logger; private readonly string _channelName; private readonly ConcurrentDictionary _members; - private ICollection _residualMembers; + private ICollection _beforeSyncMembers; private bool _isSyncInProgress; + private bool _isSyncCompleted; public PresenceMap(string channelName, ILogger logger) { @@ -23,12 +24,15 @@ public PresenceMap(string channelName, ILogger logger) _members = new ConcurrentDictionary(); } - internal event EventHandler SyncNoLongerInProgress; + internal virtual string GetKey(PresenceMessage presence) + { + return presence.MemberKey; + } // Exposed internally to allow for testing. internal ConcurrentDictionary Members => _members; - public bool IsSyncInProgress + public bool SyncInProgress { get { @@ -42,19 +46,29 @@ private set { lock (_lock) { - var previous = _isSyncInProgress; _isSyncInProgress = value; - - // if we have gone from true to false then fire SyncNoLongerInProgress - if (previous && !_isSyncInProgress) - { - OnSyncNoLongerInProgress(); - } } } } - public bool InitialSyncCompleted { get; private set; } + public bool SyncCompleted + { + get + { + lock (_lock) + { + return _isSyncCompleted; + } + } + + private set + { + lock (_lock) + { + _isSyncCompleted = value; + } + } + } public PresenceMessage[] Values { @@ -70,12 +84,13 @@ public bool Put(PresenceMessage item) lock (_lock) { // we've seen this member, so do not remove it at the end of sync - _residualMembers?.Remove(item.MemberKey); + _beforeSyncMembers?.Remove(GetKey(item)); } try { - if (_members.TryGetValue(item.MemberKey, out var existingItem) && existingItem.IsNewerThan(item)) + // RTP2a, RTP2b + if (_members.TryGetValue(GetKey(item), out var existingItem) && existingItem.IsNewerThan(item)) { return false; } @@ -95,7 +110,7 @@ public bool Put(PresenceMessage item) break; } - _members[item.MemberKey] = item; + _members[GetKey(item)] = item; return true; } @@ -103,12 +118,14 @@ public bool Put(PresenceMessage item) public bool Remove(PresenceMessage item) { PresenceMessage existingItem; - if (_members.TryGetValue(item.MemberKey, out existingItem) && existingItem.IsNewerThan(item)) + + // RTP2a, RTP2b + if (_members.TryGetValue(GetKey(item), out existingItem) && existingItem.IsNewerThan(item)) { return false; } - _members.TryRemove(item.MemberKey, out PresenceMessage _); + _members.TryRemove(GetKey(item), out PresenceMessage _); if (existingItem?.Action == PresenceAction.Absent) { return false; @@ -121,15 +138,16 @@ public void StartSync() { if (_logger.IsDebug) { - _logger.Debug($"StartSync | Channel: {_channelName}, SyncInProgress: {IsSyncInProgress}"); + _logger.Debug($"StartSync | Channel: {_channelName}, SyncInProgress: {SyncInProgress}"); } - if (!IsSyncInProgress) + if (!SyncInProgress) { lock (_lock) { - _residualMembers = new HashSet(_members.Keys); - IsSyncInProgress = true; + _beforeSyncMembers = new HashSet(_members.Keys); // RTP19 + SyncInProgress = true; + SyncCompleted = false; } } } @@ -138,19 +156,19 @@ public PresenceMessage[] EndSync() { if (_logger.IsDebug) { - _logger.Debug($"EndSync | Channel: {_channelName}, SyncInProgress: {IsSyncInProgress}"); + _logger.Debug($"EndSync | Channel: {_channelName}, SyncInProgress: {SyncInProgress}"); } List removed = new List(); try { - if (!IsSyncInProgress) + if (!SyncInProgress) { + SyncCompleted = true; return removed.ToArray(); } - // We can now strip out the ABSENT members, as we have - // received all of the out-of-order sync messages + // RTP2f foreach (var member in _members.ToArray()) { if (member.Value.Action == PresenceAction.Absent) @@ -161,11 +179,10 @@ public PresenceMessage[] EndSync() lock (_lock) { - if (_residualMembers != null) + // RTP19 + if (_beforeSyncMembers != null) { - // Any members that were present at the start of the sync, - // and have not been seen in sync, can be removed - foreach (var member in _residualMembers) + foreach (var member in _beforeSyncMembers) { if (_members.TryRemove(member, out PresenceMessage pm)) { @@ -173,7 +190,7 @@ public PresenceMessage[] EndSync() } } - _residualMembers = null; + _beforeSyncMembers = null; } } } @@ -186,8 +203,8 @@ public PresenceMessage[] EndSync() { lock (_lock) { - InitialSyncCompleted = true; - IsSyncInProgress = false; + SyncCompleted = true; + SyncInProgress = false; } } @@ -199,7 +216,7 @@ public void Clear() lock (_lock) { _members?.Clear(); - _residualMembers?.Clear(); + _beforeSyncMembers?.Clear(); } } @@ -210,17 +227,27 @@ internal JObject GetState() var state = new JObject { ["channelName"] = _channelName, - ["syncInProgress"] = _isSyncInProgress, - ["initialSyncComplete"] = InitialSyncCompleted, + ["syncInProgress"] = SyncInProgress, + ["syncCompleted"] = SyncCompleted, ["members"] = new JArray(matchingMembers), }; return state; } + } + + // RTP17 + internal class InternalPresenceMap : PresenceMap + { + public InternalPresenceMap(string channelName, ILogger logger) + : base(channelName, logger) + { + } - private void OnSyncNoLongerInProgress() + // RTP17h + internal override string GetKey(PresenceMessage presence) { - SyncNoLongerInProgress?.Invoke(this, EventArgs.Empty); + return presence.ClientId; } } } diff --git a/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs b/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs index d040c7eb7..c9159ab2b 100644 --- a/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs +++ b/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs @@ -228,7 +228,7 @@ public void Attach(Action callback = null) Attach(null, null, callback); } - private void Attach( + internal void Attach( ErrorInfo error, ProtocolMessage msg = null, Action callback = null, @@ -286,9 +286,13 @@ bool IsInStateThatShouldFailAttach() ProtocolMessage CreateAttachMessage() { var message = new ProtocolMessage(ProtocolMessage.MessageAction.Attach, Name); + + // RTL4c1 + message.ChannelSerial = Properties.ChannelSerial; + if (DecodeRecovery && LastSuccessfulMessageIds != LastMessageIds.Empty) { - message.ChannelSerial = LastSuccessfulMessageIds.ProtocolMessageChannelSerial; + message.ChannelSerial = LastSuccessfulMessageIds.ProtocolMessageChannelSerial; // Excludes PresenceMessage ChannelSerial (not included in backlogs anyways) } if (Options.Params.Any()) @@ -660,6 +664,12 @@ private void HandleStateChange(ChannelState state, ErrorInfo error, ProtocolMess Logger.Debug($"HandleStateChange state change from {State} to {state}"); } + // RTP5a1 + if (state == ChannelState.Detached || state == ChannelState.Suspended || state == ChannelState.Failed) + { + Properties.ChannelSerial = null; + } + var previousState = State; State = state; @@ -675,7 +685,6 @@ private void HandleStateChange(ChannelState state, ErrorInfo error, ProtocolMess case ChannelState.Attached: _retryCount = 0; AttachResume = true; - Presence.ChannelAttached(protocolMessage); break; case ChannelState.Detached: /* RTL13a check for unexpected detach */ @@ -813,15 +822,12 @@ private void SendMessage(ProtocolMessage protocolMessage, Action serials) + { + foreach (var keyValuePair in serials) + { + var channelName = keyValuePair.Key; + var channelSerial = keyValuePair.Value; + var channel = (RealtimeChannel)Get(channelName); + channel.Properties.ChannelSerial = channelSerial; + } + } + + internal IDictionary GetChannelSerials() + { + var channelSerials = new Dictionary(); + foreach (var realtimeChannel in this) + { + channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial; + } + + return channelSerials; + } } } diff --git a/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs b/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs new file mode 100644 index 000000000..db1da65ac --- /dev/null +++ b/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs @@ -0,0 +1,36 @@ +using System; +using Newtonsoft.Json; +using System.Collections.Generic; + +namespace IO.Ably.Shared.Realtime +{ + internal class RecoveryKeyContext + { + [JsonProperty("connectionKey")] + public string ConnectionKey { get; set; } + + [JsonProperty("msgSerial")] + public long MsgSerial { get; set; } + + [JsonProperty("channelSerials")] + public IDictionary ChannelSerials { get; set; } + + public string Encode() + { + return JsonHelper.Serialize(this); + } + + public static RecoveryKeyContext Decode(string recover, ILogger logger = null) + { + try + { + return JsonHelper.Deserialize(recover); + } + catch (Exception) + { + logger?.Warning($"Error deserializing recover - {recover}, setting it as null"); + return null; + } + } + } +} diff --git a/src/IO.Ably.Shared/Realtime/Workflows/RealtimeState.cs b/src/IO.Ably.Shared/Realtime/Workflows/RealtimeState.cs index a8641dcb9..07515f861 100644 --- a/src/IO.Ably.Shared/Realtime/Workflows/RealtimeState.cs +++ b/src/IO.Ably.Shared/Realtime/Workflows/RealtimeState.cs @@ -27,12 +27,6 @@ public ConnectionData(List fallbackHosts) /// public string Id { get; set; } - /// - /// The serial number of the last message received on this connection. - /// The serial number may be used when recovering connection state. - /// - public long? Serial { get; set; } - public string Host { get; set; } public bool IsFallbackHost => FallbackHosts.Contains(Host); @@ -87,16 +81,12 @@ public void Update(ConnectionInfo info) { Id = info.ConnectionId; Key = info.ConnectionKey; - Serial = info.ConnectionSerial; if (info.ConnectionStateTtl.HasValue) { ConnectionStateTtl = info.ConnectionStateTtl.Value; } } - public bool IsResumed(ConnectionInfo info) => - Key.IsNotEmpty() && Id == info.ConnectionId; - public void ClearKeyAndId() { Id = string.Empty; @@ -108,14 +98,6 @@ public void SetConfirmedAlive(DateTimeOffset now) ConfirmedAliveAt = now; } - public void UpdateSerial(ProtocolMessage message) - { - if (message.ConnectionSerial.HasValue) - { - Serial = message.ConnectionSerial.Value; - } - } - public void ClearKey() { Key = string.Empty; diff --git a/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs b/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs index 13572e063..fbf3dc3c3 100644 --- a/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs +++ b/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs @@ -93,7 +93,6 @@ private void SetInitialConnectionState() { var initialState = new ConnectionInitializedState(ConnectionManager, Logger); State.Connection.CurrentStateObject = initialState; - SetRecoverKeyIfPresent(Client.Options.Recover); } public void Start() @@ -520,7 +519,6 @@ async Task ProcessMessage(ProtocolMessage message) { try { - State.Connection.UpdateSerial(message); State.Connection.SetConfirmedAlive(Now()); foreach (var (name, handler) in ProtocolMessageProcessors) @@ -597,30 +595,16 @@ private Result SendMessage(ProtocolMessage message, Action call return ConnectionManager.SendToTransport(message); } - private void SetRecoverKeyIfPresent(string recover) - { - if (recover.IsNotEmpty()) - { - var match = TransportParams.RecoveryKeyRegex.Match(recover); - if (match.Success && long.TryParse(match.Groups[3].Value, out long messageSerial)) - { - State.Connection.MessageSerial = messageSerial; - } - else - { - Logger.Error($"Recovery Key '{recover}' could not be parsed."); - } - } - } - private void HandleConnectedCommand(SetConnectedStateCommand cmd) { var info = new ConnectionInfo(cmd.Message); - bool resumed = State.Connection.IsResumed(info); - bool hadPreviousConnection = State.Connection.Key.IsNotEmpty(); + // recover is used when set via clientOptions#recover initially, resume will be used for all subsequent requests. + var isConnectionResumeOrRecoverAttempt = State.Connection.Key.IsNotEmpty() || Client.Options.Recover.IsNotEmpty(); - State.Connection.Update(info); + var failedResumeOrRecover = State.Connection.Id != info.ConnectionId && cmd.Message.Error != null; // RTN15c7, RTN16d + + State.Connection.Update(info); // RTN16d, RTN15e if (info.ClientId.IsNotEmpty()) { @@ -633,25 +617,29 @@ private void HandleConnectedCommand(SetConnectedStateCommand cmd) cmd.IsUpdate, Logger); - SetState(connectedState); + SetState(connectedState); // RTN15c7 - if error, set on connection and part of emitted connected event - if (hadPreviousConnection && resumed == false) - { - ClearAckQueueAndFailMessages(null); + Client.Options.Recover = null; // RTN16k, explicitly setting null so it won't be used for subsequent connection requests - Logger.Warning( - "Force detaching all attached channels because the connection did not resume successfully!"); + // RTN15c7 + if (isConnectionResumeOrRecoverAttempt && failedResumeOrRecover) + { + State.Connection.MessageSerial = 0; + } + // RTN15g3, RTN15c6, RTN15c7, RTN16l - for resume/recovered or when connection ttl passed, re-attach channels + if (State.Connection.HasConnectionStateTtlPassed(Now) || isConnectionResumeOrRecoverAttempt) + { foreach (var channel in Channels) { - if (channel.State == ChannelState.Attached || channel.State == ChannelState.Attaching) + if (channel.State == ChannelState.Attaching || channel.State == ChannelState.Attached || channel.State == ChannelState.Suspended) { - ((RealtimeChannel)channel).SetChannelState(ChannelState.Detached, cmd.Message.Error); + ((RealtimeChannel)channel).Attach(null, null, null, true); // state changes as per RTL2g } } } - SendPendingMessages(resumed); + SendPendingMessagesOnConnected(failedResumeOrRecover); // RTN19a } private void HandlePingTimer(PingTimerCommand cmd) @@ -775,12 +763,12 @@ private async Task HandleSetStateCommand(RealtimeCommand comman case SetFailedStateCommand cmd: - State.Connection.ClearKeyAndId(); ClearAckQueueAndFailMessages(ErrorInfo.ReasonFailed); var error = TransformIfTokenErrorAndNotRetryable(); var failedState = new ConnectionFailedState(ConnectionManager, error, Logger); SetState(failedState); + State.Connection.ClearKeyAndId(); // RTN8c, RTN9c ConnectionManager.DestroyTransport(); @@ -813,6 +801,7 @@ ErrorInfo TransformIfTokenErrorAndNotRetryable() SetState(disconnectedState, skipTimer: cmd.SkipAttach); + // RTN7d if (Client.Options.QueueMessages == false) { var failAckMessages = new ErrorInfo( @@ -852,12 +841,13 @@ ErrorInfo TransformIfTokenErrorAndNotRetryable() break; case SetClosingStateCommand _: + var transport = ConnectionManager.Transport; var connectedTransport = transport?.State == TransportState.Connected; var closingState = new ConnectionClosingState(ConnectionManager, connectedTransport, Logger); - SetState(closingState); + State.Connection.ClearKeyAndId(); // RTN8c, RTN9c if (connectedTransport) { @@ -879,11 +869,12 @@ ErrorInfo TransformIfTokenErrorAndNotRetryable() var suspendedState = new ConnectionSuspendedState(ConnectionManager, cmd.Error, Logger); SetState(suspendedState); + State.Connection.ClearKeyAndId(); // RTN8c, RTN9c + break; case SetClosedStateCommand cmd: - State.Connection.ClearKeyAndId(); ClearAckQueueAndFailMessages(ErrorInfo.ReasonClosed); var closedState = new ConnectionClosedState(ConnectionManager, cmd.Error, Logger) @@ -892,6 +883,7 @@ ErrorInfo TransformIfTokenErrorAndNotRetryable() }; SetState(closedState); + State.Connection.ClearKeyAndId(); // RTN8c, RTN9c ConnectionManager.DestroyTransport(); @@ -974,11 +966,22 @@ private void UpdateStateAndNotifyConnection(ConnectionStateBase newState) } } - private void SendPendingMessages(bool resumed) + private void SendPendingMessagesOnConnected(bool failedResumeOrRecover) { - if (resumed) + // RTN19a1 + if (failedResumeOrRecover) + { + foreach (var messageAndCallback in State.WaitingForAck) + { + State.PendingMessages.Add(new MessageAndCallback( + messageAndCallback.Message, + messageAndCallback.Callback, + messageAndCallback.Logger)); + } + } + else { - // Resend any messages waiting an Ack Queue + // RTN19a2 - successful resume, msgSerial doesn't change foreach (var message in State.WaitingForAck.Select(x => x.Message)) { ConnectionManager.SendToTransport(message); diff --git a/src/IO.Ably.Shared/Transport/ConnectionInfo.cs b/src/IO.Ably.Shared/Transport/ConnectionInfo.cs index a5f5de4d9..d7f47269b 100644 --- a/src/IO.Ably.Shared/Transport/ConnectionInfo.cs +++ b/src/IO.Ably.Shared/Transport/ConnectionInfo.cs @@ -31,7 +31,6 @@ public ConnectionInfo(ProtocolMessage message) } ConnectionId = message.ConnectionId; - ConnectionSerial = message.ConnectionSerial ?? -1; ClientId = message.ConnectionDetails?.ClientId; ConnectionStateTtl = message.ConnectionDetails?.ConnectionStateTtl; ConnectionKey = message.ConnectionDetails?.ConnectionKey; @@ -52,11 +51,6 @@ public ConnectionInfo(ProtocolMessage message) /// public string ConnectionId { get; private set; } - /// - /// the connection serial. - /// - public long ConnectionSerial { get; private set; } - /// /// the connection secret key string that is used to resume a connection and its state. /// diff --git a/src/IO.Ably.Shared/Transport/ConnectionManager.cs b/src/IO.Ably.Shared/Transport/ConnectionManager.cs index 5daef36a0..d2a2869f2 100644 --- a/src/IO.Ably.Shared/Transport/ConnectionManager.cs +++ b/src/IO.Ably.Shared/Transport/ConnectionManager.cs @@ -4,6 +4,7 @@ using IO.Ably.MessageEncoders; using IO.Ably.Realtime; using IO.Ably.Realtime.Workflow; +using IO.Ably.Shared.Realtime; using IO.Ably.Transport.States.Connection; using IO.Ably.Types; using IO.Ably.Utils; @@ -65,7 +66,18 @@ public async Task CreateTransport(string host) try { - var transport = GetTransportFactory().CreateTransport(await CreateTransportParameters(host)); + var transportParams = await CreateTransportParameters(host); + if (transportParams.RecoverValue.IsNotEmpty()) + { + var recoveryKeyContext = RecoveryKeyContext.Decode(transportParams.RecoverValue, Logger); + if (recoveryKeyContext != null) + { + Connection.RealtimeClient.Channels.SetChannelSerialsFromRecoverOption(recoveryKeyContext.ChannelSerials); + Connection.InnerState.MessageSerial = recoveryKeyContext.MsgSerial; + } + } + + var transport = GetTransportFactory().CreateTransport(transportParams); transport.Listener = this; Transport = transport; Transport.Connect(); @@ -296,8 +308,7 @@ internal async Task CreateTransportParameters(string host) host, RestClient.AblyAuth, Options, - Connection.Key, - Connection.Serial); + Connection.Key); } public void HandleNetworkStateChange(NetworkState state) diff --git a/src/IO.Ably.Shared/Transport/TransportParams.cs b/src/IO.Ably.Shared/Transport/TransportParams.cs index e43eecf1a..357a0943b 100644 --- a/src/IO.Ably.Shared/Transport/TransportParams.cs +++ b/src/IO.Ably.Shared/Transport/TransportParams.cs @@ -2,8 +2,8 @@ using System.Collections.Generic; using System.Linq; using System.Net; -using System.Text.RegularExpressions; using System.Threading.Tasks; +using IO.Ably.Shared.Realtime; namespace IO.Ably.Transport { @@ -12,8 +12,6 @@ namespace IO.Ably.Transport /// public class TransportParams { - internal static Regex RecoveryKeyRegex { get; set; } = new Regex(@"^([\w!-]+):(-?\d+):(-?\d+)$"); - internal ILogger Logger { get; private set; } /// @@ -41,11 +39,6 @@ public class TransportParams /// public string ConnectionKey { get; private set; } - /// - /// Connection serial. - /// - public long? ConnectionSerial { get; set; } - /// /// Whether to use the binary protocol. /// @@ -87,7 +80,7 @@ private TransportParams() { } - internal static async Task Create(string host, AblyAuth auth, ClientOptions options, string connectionKey = null, long? connectionSerial = null, ILogger logger = null) + internal static async Task Create(string host, AblyAuth auth, ClientOptions options, string connectionKey = null, ILogger logger = null) { var result = new TransportParams { @@ -96,7 +89,6 @@ internal static async Task Create(string host, AblyAuth auth, C Port = options.Tls ? options.TlsPort : options.Port, ClientId = options.GetClientId(), ConnectionKey = connectionKey, - ConnectionSerial = connectionSerial, EchoMessages = options.EchoMessages, FallbackHosts = options.GetFallbackHosts(), UseBinaryProtocol = options.UseBinaryProtocol, @@ -195,21 +187,19 @@ public Dictionary GetParams() result["format"] = UseBinaryProtocol ? "msgpack" : "json"; result["echo"] = EchoMessages.ToString().ToLower(); + // RTN15b - resume connection using connectionKey if (ConnectionKey.IsNotEmpty()) { result["resume"] = ConnectionKey; - if (ConnectionSerial.HasValue) - { - result["connection_serial"] = ConnectionSerial.Value.ToString(); - } } + + // RTN16k - recover connection using clientOptions#recover connectionKey else if (RecoverValue.IsNotEmpty()) { - var match = RecoveryKeyRegex.Match(RecoverValue); - if (match.Success) + var recoveryKeyContext = RecoveryKeyContext.Decode(RecoverValue, Logger); + if (recoveryKeyContext != null) { - result["recover"] = match.Groups[1].Value; - result["connection_serial"] = match.Groups[2].Value; + result["recover"] = recoveryKeyContext.ConnectionKey; } } diff --git a/src/IO.Ably.Shared/Types/ErrorCodes.cs b/src/IO.Ably.Shared/Types/ErrorCodes.cs index cfd364e24..c1f303d5b 100644 --- a/src/IO.Ably.Shared/Types/ErrorCodes.cs +++ b/src/IO.Ably.Shared/Types/ErrorCodes.cs @@ -80,7 +80,6 @@ internal static class ErrorCodes public const int ConnectionNotEstablishedNoTransportHandle = 80009; public const int InvalidTransportHandle = 80010; public const int UnableToRecoverIncompatibleAuthParams = 80011; - public const int UnableToRecoverInvalidOrUnspecifiedConnectionSerial = 80012; public const int ProtocolError = 80013; public const int ConnectionTimedOut = 80014; public const int IncompatibleConnectionParams = 80015; diff --git a/src/IO.Ably.Shared/Types/PresenceMessage.cs b/src/IO.Ably.Shared/Types/PresenceMessage.cs index 38f9abe09..10f1ba86a 100644 --- a/src/IO.Ably.Shared/Types/PresenceMessage.cs +++ b/src/IO.Ably.Shared/Types/PresenceMessage.cs @@ -53,7 +53,7 @@ public PresenceMessage() /// presence action. /// id of client. public PresenceMessage(PresenceAction action, string clientId) - : this(action, clientId, null) + : this(action, clientId, null, null) { } @@ -63,11 +63,13 @@ public PresenceMessage(PresenceAction action, string clientId) /// presence action. /// id of client. /// custom data object passed with the presence message. - public PresenceMessage(PresenceAction action, string clientId, object data) + /// ably message id. + public PresenceMessage(PresenceAction action, string clientId, object data, string id = null) { Action = action; ClientId = clientId; Data = data; + Id = id; } /// diff --git a/src/IO.Ably.Shared/Types/ProtocolMessage.cs b/src/IO.Ably.Shared/Types/ProtocolMessage.cs index 8712fcafe..0083046d2 100644 --- a/src/IO.Ably.Shared/Types/ProtocolMessage.cs +++ b/src/IO.Ably.Shared/Types/ProtocolMessage.cs @@ -41,7 +41,7 @@ public enum MessageAction Presence = 14, Message = 15, Sync = 16, - Auth = 17 + Auth = 17, #pragma warning restore SA1602 // Enumeration items should be documented #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member } @@ -170,12 +170,6 @@ internal ProtocolMessage(MessageAction action, string channel) [JsonProperty("connectionId")] public string ConnectionId { get; set; } - /// - /// Current connection serial. - /// - [JsonProperty("connectionSerial")] - public long? ConnectionSerial { get; set; } - /// /// Current message serial. /// diff --git a/src/IO.Ably.Tests.Shared/IO.Ably.Tests.Shared.projitems b/src/IO.Ably.Tests.Shared/IO.Ably.Tests.Shared.projitems index 69a5c73b1..bfbf3889a 100644 --- a/src/IO.Ably.Tests.Shared/IO.Ably.Tests.Shared.projitems +++ b/src/IO.Ably.Tests.Shared/IO.Ably.Tests.Shared.projitems @@ -100,7 +100,6 @@ - @@ -122,6 +121,7 @@ + diff --git a/src/IO.Ably.Tests.Shared/Infrastructure/AblyRealtimeSpecs.cs b/src/IO.Ably.Tests.Shared/Infrastructure/AblyRealtimeSpecs.cs index 0037d5296..916bc7a47 100644 --- a/src/IO.Ably.Tests.Shared/Infrastructure/AblyRealtimeSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Infrastructure/AblyRealtimeSpecs.cs @@ -32,8 +32,7 @@ protected AblyRealtimeSpecs(ITestOutputHelper output) new ProtocolMessage(ProtocolMessage.MessageAction.Connected) { ConnectionDetails = new ConnectionDetails { ConnectionKey = "connectionKey" }, - ConnectionId = "1", - ConnectionSerial = 100 + ConnectionId = "1" }; public void Dispose() diff --git a/src/IO.Ably.Tests.Shared/Infrastructure/ConnectionAwaiter.cs b/src/IO.Ably.Tests.Shared/Infrastructure/ConnectionAwaiter.cs index bb80eccc3..3f288526d 100644 --- a/src/IO.Ably.Tests.Shared/Infrastructure/ConnectionAwaiter.cs +++ b/src/IO.Ably.Tests.Shared/Infrastructure/ConnectionAwaiter.cs @@ -78,7 +78,7 @@ public async Task Wait(TimeSpan timeout) DefaultLogger.Debug($"[{_id} Timeout exceeded. Throwing TimeoutException"); RemoveListener(); throw new TimeoutException( - $"Expected ''{_awaitedStates.Select(x => x.ToString()).JoinStrings()}' but current state was '{_connection.State}'"); + $"Expected '{_awaitedStates.Select(x => x.ToString()).JoinStrings()}' but current state was '{_connection.State}'"); } } } diff --git a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs index 9485b5dcc..30c153552 100644 --- a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs +++ b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs @@ -4,6 +4,10 @@ namespace IO.Ably.Tests.Infrastructure { + /// + /// This class is used for controlling wrapped TestTranport externally. + /// Can only be passed as a clientOption. + /// public class TestTransportFactory : ITransportFactory { private readonly Action _onWrappedTransportCreated; diff --git a/src/IO.Ably.Tests.Shared/JsonMessageSerializerTests.cs b/src/IO.Ably.Tests.Shared/JsonMessageSerializerTests.cs index 317fa8c1d..bd0b430c3 100644 --- a/src/IO.Ably.Tests.Shared/JsonMessageSerializerTests.cs +++ b/src/IO.Ably.Tests.Shared/JsonMessageSerializerTests.cs @@ -290,27 +290,6 @@ public void DeserializesMessageCorrectly_Id(string id) Assert.Equal(id, target.Id); } - [Theory] - [InlineData(123)] - [InlineData("123")] - [InlineData(0)] - [InlineData("0")] - [InlineData(-1)] - [InlineData("-1")] - public void DeserializesMessageCorrectly_ConnectionSerial(object connectionSerial) - { - // Arrange - string message = $"{{\"connectionSerial\":{connectionSerial}}}"; - - // Act - ProtocolMessage target = JsonHelper.Deserialize(message); - - // Assert - target.Should().NotBeNull(); - Debug.Assert(target.ConnectionSerial.HasValue, $"'{nameof(target.ConnectionSerial)}' should have a value"); - target.ConnectionSerial.Value.Should().Be(ToLong(connectionSerial)); - } - [Theory] [InlineData(123)] [InlineData("123")] diff --git a/src/IO.Ably.Tests.Shared/MsgPackMessageSerializerTests.cs b/src/IO.Ably.Tests.Shared/MsgPackMessageSerializerTests.cs index 854471a3b..e41243fb3 100644 --- a/src/IO.Ably.Tests.Shared/MsgPackMessageSerializerTests.cs +++ b/src/IO.Ably.Tests.Shared/MsgPackMessageSerializerTests.cs @@ -375,26 +375,6 @@ public void DeserializesMessageCorrectly_Id(string id) Assert.Equal(id, target.Id); } - [Theory] - [InlineData(123)] - [InlineData(0)] - [InlineData(-1)] - public void DeserializesMessageCorrectly_ConnectionSerial(long connectionSerial) - { - // Arrange - List expectedMessage = new List(); - expectedMessage.Add(0x81); - expectedMessage.AddRange(SerializeString("connectionSerial")); - expectedMessage.Add(BitConverter.GetBytes(connectionSerial).First()); - - // Act - ProtocolMessage target = MsgPackHelper.Deserialise(expectedMessage.ToArray()); - - // Assert - target.Should().NotBeNull(); - Assert.Equal(connectionSerial, target.ConnectionSerial.Value); - } - [Theory] [InlineData(123)] [InlineData(0)] diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs index 7e872b37a..7ed10bfd5 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs @@ -8,6 +8,7 @@ using FluentAssertions; using IO.Ably.Realtime; using IO.Ably.Realtime.Workflow; +using IO.Ably.Shared.Realtime; using IO.Ably.Tests.Infrastructure; using IO.Ably.Tests.Shared.Utils; using IO.Ably.Transport; @@ -419,163 +420,137 @@ public async Task ShouldDisconnectIfConnectionIsNotEstablishedWithInDefaultTimeo [Theory] [ProtocolData] - [Trait("spec", "RTN15e")] - public async Task ShouldUpdateConnectionKeyWhenConnectionIsResumed(Protocol protocol) + [Trait("spec", "RTN15d")] + public async Task ResumeRequest_ShouldReceivePendingMessagesOnceConnectionResumed(Protocol protocol) { - var client = await GetRealtimeClient(protocol); - await WaitForState(client, ConnectionState.Connected, TimeSpan.FromSeconds(10)); - var initialConnectionKey = client.Connection.Key; - var initialConnectionId = client.Connection.Id; - client.ConnectionManager.Transport.Close(false); - await WaitForState(client, ConnectionState.Disconnected); - await WaitForState(client, ConnectionState.Connected, TimeSpan.FromSeconds(10)); - client.Connection.Id.Should().Be(initialConnectionId); - client.Connection.Key.Should().NotBe(initialConnectionKey); - } + var client1 = await GetRealtimeClient(protocol); + await client1.WaitForState(ConnectionState.Connected); - [Theory(Skip = "Intermittently fails")] - [ProtocolData] - [Trait("spec", "RTN15c1")] - public async Task ResumeRequest_ConnectedProtocolMessageWithSameConnectionId_WithNoError(Protocol protocol) - { - var client = await GetRealtimeClient(protocol); - var channel = (RealtimeChannel)client.Channels.Get("RTN15c1".AddRandomSuffix()); - await client.WaitForState(ConnectionState.Connected); - var connectionId = client.Connection.Id; - await channel.AttachAsync(); - channel.State.Should().Be(ChannelState.Attached); - - // kill the transport so the connection becomes DISCONNECTED - client.ConnectionManager.Transport.Close(false); - await client.WaitForState(ConnectionState.Disconnected); - - var awaiter = new TaskCompletionAwaiter(15000); - client.Connection.Once(ConnectionEvent.Connected, change => + var client2TransportFactory = new TestTransportFactory(); + var client2 = await GetRealtimeClient(protocol, (options, _) => { - change.HasError.Should().BeFalse(); - awaiter.SetCompleted(); + options.TransportFactory = client2TransportFactory; }); + await client2.WaitForState(ConnectionState.Connected); + var client2InitialConnectionId = client2.Connection.Id; - channel.Publish(null, "foo"); - - await client.ProcessCommands(); + var channelName = "RTN15d".AddRandomSuffix(); + var channelsCount = 5; + for (var i = 0; i < channelsCount; i++) + { + await client1.Channels.Get($"{channelName}_{i}").AttachAsync(); + await client2.Channels.Get($"{channelName}_{i}").AttachAsync(); + } - // currently disconnected so message is queued - // client.State.PendingMessages.Should().HaveCount(1); + var client2MessageAwaiter = new TaskCompletionAwaiter(15000, channelsCount); + var channel2Messages = new List(); + foreach (var realtimeChannel in client2.Channels) + { + realtimeChannel.Subscribe("data", message => + { + channel2Messages.Add(message); + client2MessageAwaiter.Tick(); + }); + } - // wait for reconnection - var didConnect = await awaiter.Task; - didConnect.Should().BeTrue(); + var client2ConnectedAwaiter = new TaskCompletionAwaiter(15000, channelsCount); + client2TransportFactory.BeforeMessageSent += message => + { + if (message.Action == ProtocolMessage.MessageAction.Connect) + { + client2ConnectedAwaiter.Task.Wait(); + } + }; + client2.ConnectionManager.Transport.Close(false); + await client2.WaitForState(ConnectionState.Disconnected); - // we should have received a CONNECTED Protocol message with a corresponding connectionId - client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); - var connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected); - connectedProtocolMessage.ConnectionId.Should().Be(connectionId); + // Publish messages on client1 channels + foreach (var client1Channel in client1.Channels) + { + await client1Channel.PublishAsync("data", "hello"); + } - await client.ProcessCommands(); + client2ConnectedAwaiter.SetCompleted(); + await client2.WaitForState(ConnectionState.Connected); + client2.Connection.ErrorReason.Should().BeNull(); + client2.Connection.Id.Should().Be(client2InitialConnectionId); // connection is successfully resumed/recovered - // channel should be attached and pending messages sent - channel.State.Should().Be(ChannelState.Attached); - client.State.PendingMessages.Should().HaveCount(0); + var client2MessagesReceived = await client2MessageAwaiter.Task; + client2MessagesReceived.Should().BeTrue(); - var history = await channel.HistoryAsync(); - history.Items.Should().HaveCount(1); - history.Items[0].Data.Should().Be("foo"); + channel2Messages.Should().HaveCount(channelsCount); + foreach (var channel2Message in channel2Messages) + { + channel2Message.Data.Should().Be("hello"); + } } - [Theory(Skip = "Intermittently fails")] + [Theory] [ProtocolData] - [Trait("spec", "RTN15c2")] - public async Task ResumeRequest_ConnectedProtocolMessageWithSameConnectionId_WithError(Protocol protocol) + [Trait("spec", "RTN16d")] + public async Task RecoverRequest_ShouldInitializeRecoveryContextAndReceiveSameConnectionIdOnRecoverSuccess(Protocol protocol) { - var client = await GetRealtimeClient(protocol); - var channel = (RealtimeChannel)client.Channels.Get("RTN15c1".AddRandomSuffix()); - await client.WaitForState(ConnectionState.Connected); - var connectionId = client.Connection.Id; - - // inject fake error messages into protocol messages - var transportFactory = (TestTransportFactory)client.Options.TransportFactory; - transportFactory.OnTransportCreated += wrapper => + var client1 = await GetRealtimeClient(protocol); + await client1.WaitForState(ConnectionState.Connected); + var client1ConnectionId = client1.Connection.Id; + for (var i = 0; i < 5; i++) { - wrapper.BeforeDataProcessed = message => - { - // inject an error before the protocol message is processed - if (message.Action == ProtocolMessage.MessageAction.Connected) - { - message.Error = new ErrorInfo("Faked error", 0); - } + var channel = client1.Channels.Get("RTN16d".AddRandomSuffix()); + await channel.AttachAsync(); + } - if (message.Action == ProtocolMessage.MessageAction.Attached) - { - message.Error = new ErrorInfo("Faked channel error", 0); - } - }; - }; + var recoveryKey = client1.Connection.CreateRecoveryKey(); + var recoveryKeyContext = RecoveryKeyContext.Decode(recoveryKey); - // kill the transport so the connection becomes DISCONNECTED - client.ConnectionManager.Transport.Close(false); - await client.WaitForState(ConnectionState.Disconnected); + recoveryKeyContext.ConnectionKey.Should().Be(client1.Connection.Key); + recoveryKeyContext.ChannelSerials.Count.Should().Be(client1.Channels.Count()); + recoveryKeyContext.MsgSerial.Should().Be(client1.Connection.MessageSerial); - // track connection state change - ConnectionStateChange stateChange = null; - var connectedAwaiter = new TaskCompletionAwaiter(15000); - client.Connection.Once(ConnectionEvent.Connected, change => - { - stateChange = change; - connectedAwaiter.SetCompleted(); - }); + client1.ExecuteCommand(SetDisconnectedStateCommand.Create(null)); - // track channel stage change - ChannelStateChange channelStateChange = null; - var attachedAwaiter = new TaskCompletionAwaiter(30000); - channel.Once(ChannelEvent.Attached, change => + var client2 = await GetRealtimeClient(protocol, (options, _) => { - channelStateChange = change; - attachedAwaiter.SetCompleted(); + options.Recover = recoveryKey; }); - // publish - channel.Attach(); - channel.Publish(null, "foo"); - - // wait for connection - var didConnect = await connectedAwaiter.Task; - didConnect.Should().BeTrue(); - - // it should have the injected error - stateChange.HasError.Should().BeTrue(); - stateChange.Reason.Message.Should().Be("Faked error"); - - // we should have received a CONNECTED Protocol message with a corresponding connectionId - client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); - var connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected); - connectedProtocolMessage.ConnectionId.Should().Be(connectionId); - client.Connection.ErrorReason.Should().Be(stateChange.Reason); - - // wait for the channel to attach - await attachedAwaiter.Task; - - // it chanel state change event should have the injected error - channelStateChange.Error.Message.Should().Be("Faked channel error"); + await client2.WaitForState(ConnectionState.Connected); + client2.Connection.Id.Should().Be(client1ConnectionId); + client2.Connection.MessageSerial.Should().Be(recoveryKeyContext.MsgSerial); + client2.Connection.Key.Should().NotBe(recoveryKeyContext.ConnectionKey); + foreach (var realtimeChannel in client1.Channels) + { + realtimeChannel.Properties.ChannelSerial.Should().Be(recoveryKeyContext.ChannelSerials[realtimeChannel.Name]); + } - // queued messages should now have been sent - client.State.PendingMessages.Should().HaveCount(0); + client2.Options.Recover.Should().BeNull(); - var history = await channel.HistoryAsync(); - history.Items.Should().HaveCount(1); - history.Items[0].Data.Should().Be("foo"); + client1.Close(); + client2.Close(); + } - // clean up - client.Close(); + [Theory] + [ProtocolData] + [Trait("spec", "RTN15e")] + public async Task ResumeRequest_ShouldUpdateConnectionKeyWhenConnectionIsResumed(Protocol protocol) + { + var client = await GetRealtimeClient(protocol); + await WaitForState(client, ConnectionState.Connected, TimeSpan.FromSeconds(10)); + var initialConnectionKey = client.Connection.Key; + var initialConnectionId = client.Connection.Id; + client.ConnectionManager.Transport.Close(false); + await WaitForState(client, ConnectionState.Disconnected); + await WaitForState(client, ConnectionState.Connected, TimeSpan.FromSeconds(10)); + client.Connection.Id.Should().Be(initialConnectionId); + client.Connection.Key.Should().NotBe(initialConnectionKey); } [Theory] [ProtocolData] - [Trait("spec", "RTN15c3")] - public async Task ResumeRequest_ConnectedProtocolMessageWithNewConnectionId_WithErrorInError(Protocol protocol) + [Trait("spec", "RTN15c7")] + public async Task ResumeRequest_ConnectedProtocolMessageWithResumeFailedShouldEmitErrorOnConnection(Protocol protocol) { var client = await GetRealtimeClient(protocol); - var channel = (RealtimeChannel)client.Channels.Get("RTN15c3".AddRandomSuffix()); + var channel = (RealtimeChannel)client.Channels.Get("RTN15c7".AddRandomSuffix()); await client.WaitForState(ConnectionState.Connected); channel.Attach(); await channel.WaitForAttachedState(); @@ -584,7 +559,9 @@ public async Task ResumeRequest_ConnectedProtocolMessageWithNewConnectionId_With var oldConnectionId = client.Connection.Id; var oldKey = client.Connection.Key; - client.SimulateLostConnectionAndState(); + client.State.Connection.Id = string.Empty; + client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request + client.GetTestTransport().Close(false); ConnectionStateChange stateChange = null; await WaitFor(done => @@ -598,7 +575,7 @@ await WaitFor(done => stateChange.Should().NotBeNull(); stateChange.HasError.Should().BeTrue(); - stateChange.Reason.Code.Should().Be(80008); + stateChange.Reason.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId); stateChange.Reason.Should().Be(client.Connection.ErrorReason); var protocolMessage = client.GetTestTransport().ProtocolMessagesReceived.FirstOrDefault(x => x.Action == ProtocolMessage.MessageAction.Connected); @@ -610,88 +587,118 @@ await WaitFor(done => client.Connection.Id.Should().NotBe(oldConnectionId); client.Connection.Key.Should().NotBe(oldKey); client.Connection.MessageSerial.Should().Be(0); + + client.Close(); } [Theory] [ProtocolData] - [Trait("spec", "RTN15c3")] - public async Task ResumeRequest_ConnectedProtocolMessageWithNewConnectionId_WithErrorInError_DetachesAllChannels(Protocol protocol) + [Trait("spec", "RTN15c6")] + [Trait("spec", "RTN15c7")] + public async Task ResumeRequest_ConnectedProtocolMessage_AttachAllChannelsAndProcessPendingMessages( + Protocol protocol) { var client = await GetRealtimeClient(protocol); - var channelName = "RTN15c3".AddRandomSuffix(); + var channelName = "RTN15c6.RTN15c7.".AddRandomSuffix(); const int channelCount = 5; await client.WaitForState(ConnectionState.Connected); + var connectionId = client.Connection.Id; - List channels = new List(); + var channels = new List(); for (var i = 0; i < channelCount; i++) { - channels.Add(client.Channels.Get($"{channelName}_{i}") as RealtimeChannel); + var channel = client.Channels.Get($"{channelName}_{i}"); + await channel.AttachAsync(); + channels.Add(channel as RealtimeChannel); } - List detachedChannels = new List(); - List detachedStateChanges = new List(); + TaskCompletionAwaiter connectedAwaiter = null; - var detachAwaiter = new TaskCompletionAwaiter(10000, channelCount); - await WaitForMultiple(channelCount, partialDone => + // Should add messages to connection-wide pending queue + async Task PublishMessagesWhileNotConnected() { + // Make sure new transport is created to apply BeforeProtocolMessageProcessed method + await client.WaitForState(ConnectionState.Connecting); + connectedAwaiter = new TaskCompletionAwaiter(15000); + client.BeforeProtocolMessageProcessed(message => + { + if (message.Action == ProtocolMessage.MessageAction.Connected) + { + connectedAwaiter.Task.Wait(); // Keep in connecting state + } + }); + foreach (var channel in channels) { - channel.Attach(); - channel.Once(ChannelEvent.Attached, _ => + channel.Publish("eventName", "foo"); + } + + await client.ProcessCommands(); + client.State.PendingMessages.Should().HaveCount(channelCount); + } + + async Task CheckForAttachedChannelsAfterResume() + { + var attachedChannels = new List(); + await WaitForMultiple(channelCount, partialDone => + { + foreach (var channel in channels) { - channel.Once(ChannelEvent.Detached, change => + channel.Once(ChannelEvent.Attaching, _ => { - detachedChannels.Add(channel); - detachedStateChanges.Add(change); - detachAwaiter.Tick(); + channel.Once(ChannelEvent.Attached, _ => + { + attachedChannels.Add(channel); + partialDone(); + }); }); - partialDone(); - }); - } - }); + } - client.SimulateLostConnectionAndState(); + connectedAwaiter.SetCompleted(); + }); + attachedChannels.Should().HaveCount(channelCount); + } - var didDetach = await detachAwaiter.Task; - didDetach.Should().BeTrue(); - detachedChannels.Should().HaveCount(channelCount); - detachedStateChanges.Should().HaveCount(channelCount); - foreach (var change in detachedStateChanges) + async Task CheckForProcessedPendingMessages() { - change.Error.Message.Should().StartWith("Unable to recover connection"); + client.State.PendingMessages.Should().HaveCount(0); + foreach (var channel in channels) + { + var history = await channel.HistoryAsync(); + history.Items.Count.Should().BeGreaterOrEqualTo(1); + history.Items[0].Data.Should().Be("foo"); + } } - } - [Theory] - [ProtocolData] - [Trait("spec", "RTN15c3")] - public async Task ResumeRequest_ConnectedProtocolMessageWithNewConnectionId_WithErrorInError_EmitsErrorOnChannel(Protocol protocol) - { - var client = await GetRealtimeClient(protocol); - var channel = (RealtimeChannel)client.Channels.Get("RTN15c3".AddRandomSuffix()); - await client.WaitForState(ConnectionState.Connected); - channel.Attach(); - channel.Once(ChannelEvent.Attached, _ => - { - client.SimulateLostConnectionAndState(); - }); + // resume successful - RTN15c6 + client.GetTestTransport().Close(false); + await PublishMessagesWhileNotConnected(); + await CheckForAttachedChannelsAfterResume(); + await CheckForProcessedPendingMessages(); + client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset + var connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected); + connectedProtocolMessage.ConnectionId.Should().Be(connectionId); + connectedProtocolMessage.Error.Should().BeNull(); + client.Connection.ErrorReason.Should().BeNull(); - ChannelErrorEventArgs err = null; - await WaitFor(done => - { - channel.Error += (sender, args) => - { - err = args; - done(); - }; - }); + // resume unsuccessful - RTN15c7 + client.State.Connection.Id = string.Empty; + client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request + client.GetTestTransport().Close(false); + await PublishMessagesWhileNotConnected(); + await CheckForAttachedChannelsAfterResume(); + await CheckForProcessedPendingMessages(); + client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset + connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected); + connectedProtocolMessage.ConnectionId.Should().NotBe(connectionId); + connectedProtocolMessage.Error.Should().NotBeNull(); + client.Connection.ErrorReason.ToString().Should().Be(connectedProtocolMessage.Error.ToString()); + client.Connection.ErrorReason.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId); - err.Reason.Message.Should().StartWith("Unable to recover connection"); - err.Reason.Code.Should().Be(80008); - err.Reason.Should().Be(channel.ErrorReason); + client.Close(); } - [Theory(Skip = "Keeps failing")] + [Theory] [ProtocolData] [Trait("spec", "RTN15c4")] public async Task ResumeRequest_WithFatalErrorInConnection_ClientAndChannelsShouldBecomeFailed(Protocol protocol) @@ -700,48 +707,61 @@ public async Task ResumeRequest_WithFatalErrorInConnection_ClientAndChannelsShou { options.DisconnectedRetryTimeout = TimeSpan.FromSeconds(2); }); - var channel = (RealtimeChannel)client.Channels.Get("RTN15c4".AddRandomSuffix()); - channel.Attach(); await client.WaitForState(ConnectionState.Connected); + var channels = new List(); + for (var i = 0; i < 4; i++) + { + var channel = (RealtimeChannel)client.Channels.Get("RTN15c4".AddRandomSuffix()); + channel.Attach(); + await channel.WaitForAttachedState(); + channels.Add(channel); + } + client.GetTestTransport().Close(false); await client.WaitForState(ConnectionState.Disconnected); var errInfo = new ErrorInfo("faked error", 0); - client.Connection.Once(ConnectionEvent.Connecting, change => + var connectedAwaiter = new TaskCompletionAwaiter(15000); + await client.WaitForState(ConnectionState.Connecting); + client.BeforeProtocolMessageProcessed(message => { - client.BeforeProtocolMessageProcessed(message => + if (message.Action == ProtocolMessage.MessageAction.Connected) { - if (message.Action == ProtocolMessage.MessageAction.Connected) - { - message.Action = ProtocolMessage.MessageAction.Error; - message.Error = errInfo; - } - }); + message.Action = ProtocolMessage.MessageAction.Error; + message.Error = errInfo; + connectedAwaiter.Task.Wait(); + } }); - ConnectionStateChange stateChange = null; + ConnectionStateChange failedStateChange = null; await WaitFor(done => { client.Connection.Once(ConnectionEvent.Failed, change => { - stateChange = change; + failedStateChange = change; done(); }); + connectedAwaiter.SetCompleted(); }); - stateChange.Reason.Code.Should().Be(errInfo.Code); - stateChange.Reason.Message.Should().Be(errInfo.Message); - - await channel.WaitForState(ChannelState.Failed); - channel.State.Should().Be(ChannelState.Failed); - channel.ErrorReason.Code.Should().Be(errInfo.Code); - channel.ErrorReason.Message.Should().Be(errInfo.Message); + failedStateChange.Previous.Should().Be(ConnectionState.Connecting); + failedStateChange.Current.Should().Be(ConnectionState.Failed); + failedStateChange.Reason.Code.Should().Be(errInfo.Code); + failedStateChange.Reason.Message.Should().Be(errInfo.Message); client.Connection.ErrorReason.Code.Should().Be(errInfo.Code); client.Connection.ErrorReason.Message.Should().Be(errInfo.Message); client.Connection.State.Should().Be(ConnectionState.Failed); + foreach (var channel in channels) + { + await channel.WaitForState(ChannelState.Failed); + channel.State.Should().Be(ChannelState.Failed); + channel.ErrorReason.Code.Should().Be(errInfo.Code); + channel.ErrorReason.Message.Should().Be(errInfo.Message); + } + client.Close(); } @@ -751,39 +771,49 @@ await WaitFor(done => public async Task ResumeRequest_WithTokenAuthError_TransportWillBeClosed(Protocol protocol) { var authClient = await GetRestClient(protocol); - var tokenDetails = await authClient.AblyAuth.RequestTokenAsync(new TokenParams { ClientId = "123", Ttl = TimeSpan.FromSeconds(10) }); - tokenDetails.Expires = DateTimeOffset.UtcNow.AddMinutes(1); // Cheat to make sure the client uses the token - var client = await GetRealtimeClient(protocol, (options, settings) => { - options.TokenDetails = tokenDetails; + options.AuthCallback = async @params => + { + var tokenDetails = await authClient.AblyAuth.RequestTokenAsync(new TokenParams { ClientId = "123", Ttl = TimeSpan.FromSeconds(5) }); + tokenDetails.Expires = DateTimeOffset.UtcNow.AddMinutes(1); // Cheat to make sure the client uses the token + return tokenDetails; + }; options.DisconnectedRetryTimeout = TimeSpan.FromSeconds(1); }); - await client.WaitForState(ConnectionState.Connected); - var channel = client.Channels.Get("RTN15c5".AddRandomSuffix()); channel.Attach(); - - var initialConnectionId = client.Connection.Id; - var initialTransport = client.GetTestTransport(); - + await channel.WaitForAttachedState(); channel.Once(ChannelEvent.Detached, change => throw new Exception("channel should not detach")); - client.Connection.Once(ConnectionEvent.Disconnected, change => - { - change.Reason.Code.Should().Be(ErrorCodes.TokenExpired); - }); await client.WaitForState(ConnectionState.Disconnected); - await client.WaitForState(ConnectionState.Connected); + await client.WaitForState(ConnectionState.Connected); // Connected with a resume request + var prevConnectionId = client.Connection.Id; + var prevTransport = client.GetTestTransport(); + + ConnectionStateChange stateChange = null; + await WaitFor(done => + { + client.Connection.Once(ConnectionEvent.Disconnected, change => + { + stateChange = change; + done(); + }); + }); + stateChange.Should().NotBeNull(); + stateChange.Current.Should().Be(ConnectionState.Disconnected); // Disconnected due to token expired + stateChange.HasError.Should().BeTrue(); + stateChange.Reason.Code.Should().Be(ErrorCodes.TokenExpired); - // transport should have been closed and the client should have a new transport instanced - var secondTransport = client.GetTestTransport(); - initialTransport.Should().NotBe(secondTransport); - initialTransport.State.Should().Be(TransportState.Closed); + await client.WaitForState(ConnectionState.Connecting); + await client.WaitForState(ConnectionState.Connected); + prevTransport.State.Should().Be(TransportState.Closed); // transport should have been closed and the client should have a new transport instanced + var newTransport = client.GetTestTransport(); + newTransport.Should().NotBe(prevTransport); + client.Connection.Id.Should().Be(prevConnectionId); // connection should be resumed, connectionId should be unchanged - // connection should be resumed, connectionId should be unchanged - client.Connection.Id.Should().Be(initialConnectionId); + client.Close(); } [Theory] @@ -812,7 +842,6 @@ public async Task WhenDisconnectedMessageContainsTokenError_IfTokenIsNotRenewabl { var authClient = await GetRestClient(protocol); var tokenDetails = await authClient.AblyAuth.RequestTokenAsync(new TokenParams { ClientId = "123", Ttl = TimeSpan.FromSeconds(2) }); - tokenDetails.Expires = DateTimeOffset.UtcNow.AddMinutes(10); // Cheat the client var client = await GetRealtimeClient(protocol, (options, settings) => { @@ -839,10 +868,9 @@ public async Task WhenDisconnectedMessageContainsTokenError_IfTokenIsNotRenewabl [Trait("spec", "RTN15h2")] public async Task WhenDisconnectedMessageContainsTokenError_IfTokenIsRenewable_ShouldNotEmitError(Protocol protocol) { - var awaiter = new TaskCompletionAwaiter(); var authClient = await GetRestClient(protocol); var tokenDetails = await authClient.AblyAuth.RequestTokenAsync(new TokenParams { ClientId = "123", Ttl = TimeSpan.FromSeconds(2) }); - tokenDetails.Expires = DateTimeOffset.UtcNow.AddMinutes(10); // Cheat the client + var client = await GetRealtimeClient(protocol, (options, settings) => { options.TokenDetails = tokenDetails; @@ -851,25 +879,20 @@ public async Task WhenDisconnectedMessageContainsTokenError_IfTokenIsRenewable_S await client.WaitForState(ConnectionState.Connected); + var awaiter = new TaskCompletionAwaiter(); var stateChanges = new List(); - client.Connection.Once(ConnectionEvent.Disconnected, state => + client.Connection.On(state => { stateChanges.Add(state); - client.Connection.Once(ConnectionEvent.Connecting, state2 => + if (state.Current == ConnectionState.Connected) { - stateChanges.Add(state2); - client.Connection.Once(ConnectionEvent.Connected, state3 => - { - client.Connection.State.Should().Be(ConnectionState.Connected); - client.Connection.ErrorReason.Should().BeNull(); - stateChanges.Add(state3); - awaiter.SetCompleted(); - }); - }); + client.Connection.State.Should().Be(ConnectionState.Connected); + client.Connection.ErrorReason.Should().BeNull(); + awaiter.SetCompleted(); + } }); await awaiter.Task; - stateChanges.Should().HaveCount(3); stateChanges[0].HasError.Should().BeTrue(); stateChanges[0].Reason.Code.Should().Be(ErrorCodes.TokenExpired); @@ -1067,55 +1090,52 @@ Additionally the Connection#errorReason should be set with the error received fr [Theory] [ProtocolData] - [Trait("spec", "RTN16d")] - public async Task WhenConnectionFailsToRecover_ShouldEmmitDetachedMessageToChannels(Protocol protocol) + [Trait("spec", "RTN16l")] + [Trait("spec", "RTN15c7")] + public async Task WithDummyRecoverData_ShouldConnectAndSetAReasonOnTheConnection(Protocol protocol) { - var stateChanges = new List(); + var client = await GetRealtimeClient(protocol, (opts, _) => + { + opts.Recover = "{\"connectionKey\":\"c17a8!WeXvJum2pbuVYZtF-1b63c17a8\",\"msgSerial\":5,\"channelSerials\":{\"channel1\":\"98\",\"channel2\":\"32\",\"channel3\":\"09\"}}"; + opts.AutoConnect = false; + }); - var client = await GetRealtimeClient(protocol); + ErrorInfo err = null; + client.Connection.On((args) => + { + err = args.Reason; + if (args.Current == ConnectionState.Connected) + { + ResetEvent.Set(); + } + }); client.Connect(); - await WaitForConnectedState(client); - - var channel1 = client.Channels.Get("test"); - channel1.On(x => stateChanges.Add(x)); - - channel1.Attach(); - await channel1.PublishAsync("test", "best"); - await channel1.PublishAsync("test", "best"); - - await Task.Delay(2000); - - client.State.Connection.Key = "e02789NdQA86c7!inI5Ydc-ytp7UOm3-3632e02789NdQA86c7"; - - // Kill the transport - client.ConnectionManager.Transport.Close(false); - await Task.Delay(1000); - - await WaitForConnectedState(client); - - stateChanges.Should().Contain(x => x.Current == ChannelState.Detached); - - Task WaitForConnectedState(AblyRealtime rt) - { - return WaitForState(rt); - } + var result = ResetEvent.WaitOne(10000); + result.Should().BeTrue("Timeout"); + err.Should().NotBeNull(); + err.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId); + client.Connection.MessageSerial.Should().Be(0); + client.Connection.ErrorReason.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId); } [Theory] [ProtocolData] - [Trait("spec", "RTN16e")] - public async Task WithDummyRecoverData_ShouldConnectAndSetAReasonOnTheConnection(Protocol protocol) + [Trait("spec", "RTN16l")] + [Trait("spec", "RTN15c4")] + public async Task WithInvalidRecoverData_ShouldFailAndSetAReasonOnTheConnection(Protocol protocol) { var client = await GetRealtimeClient(protocol, (opts, _) => { - opts.Recover = "c17a8!WeXvJum2pbuVYZtF-1b63c17a8:-1:-1"; + opts.Recover = "{\"connectionKey\":\"random_key\",\"msgSerial\":5,\"channelSerials\":{\"channel1\":\"98\",\"channel2\":\"32\",\"channel3\":\"09\"}}"; opts.AutoConnect = false; }); + ErrorInfo err = null; client.Connection.On((args) => { - if (args.Current == ConnectionState.Connected) + err = args.Reason; + if (args.Current == ConnectionState.Failed) { ResetEvent.Set(); } @@ -1124,7 +1144,10 @@ public async Task WithDummyRecoverData_ShouldConnectAndSetAReasonOnTheConnection var result = ResetEvent.WaitOne(10000); result.Should().BeTrue("Timeout"); - client.Connection.ErrorReason.Code.Should().Be(80008); + err.Should().NotBeNull(); + err.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId); + client.Connection.ErrorReason.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId); + client.Connection.State.Should().Be(ConnectionState.Failed); } [Theory] diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandboxTransportSideEffectsSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandboxTransportSideEffectsSpecs.cs index 265c94673..f370b82ec 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandboxTransportSideEffectsSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandboxTransportSideEffectsSpecs.cs @@ -15,112 +15,197 @@ namespace IO.Ably.Tests.Realtime [Trait("type", "integration")] public class ConnectionSandboxTransportSideEffectsSpecs : SandboxSpecs { - /* - * (RTN19b) If there are any pending channels i.e. in the ATTACHING or DETACHING state, - * the respective ATTACH or DETACH message should be resent to Ably - */ - [Theory(Skip = "Intermittently fails")] + [Theory] [ProtocolData] [Trait("spec", "RTN19b")] public async Task WithChannelInAttachingState_WhenTransportIsDisconnected_ShouldResendAttachMessageOnConnectionResumed(Protocol protocol) { - var channelName = "test-channel".AddRandomSuffix(); - var sentMessages = new List(); + var client = await GetRealtimeClient(protocol); + await client.WaitForState(ConnectionState.Connected); - int attachMessageCount = 0; + // Will be unblocked on new transport/connection + client.BlockActionFromSending(ProtocolMessage.MessageAction.Attach); - AblyRealtime client = null; - client = await GetRealtimeClient(protocol, (options, settings) => + var channel = client.Channels.Get("RTN19b".AddRandomSuffix()); + channel.Once(ChannelEvent.Attaching, change => { - options.TransportFactory = new TestTransportFactory - { - OnMessageSent = OnMessageSent, - }; + client.GetTestTransport().Close(suppressClosedEvent: false); }); + await channel.AttachAsync(); + + await client.WaitForState(ConnectionState.Disconnected); + channel.State.Should().Be(ChannelState.Attaching); + await client.WaitForState(ConnectionState.Connected); + + await channel.WaitForAttachedState(); + + client.Close(); + } + + [Theory] + [ProtocolData] + [Trait("spec", "RTN19b")] + [Trait("description", "detached only works if detach message is received on old transport")] + public async Task WithChannelInDetachingState_WhenTransportIsDisconnected_ShouldResendDetachMessageOnConnectionResumed(Protocol protocol) + { + var client = await GetRealtimeClient(protocol); + await client.WaitForState(ConnectionState.Connected); + + var channel = client.Channels.Get("RTN19b".AddRandomSuffix()); + await channel.AttachAsync(); - void OnMessageSent(ProtocolMessage message) + // Will be reset on new transport/connection + client.GetTestTransport().AfterMessageSent += message => { - sentMessages.Add(message); - if (message.Action == ProtocolMessage.MessageAction.Attach) + if (message.Action == ProtocolMessage.MessageAction.Detach) { - if (attachMessageCount == 0) - { - attachMessageCount++; - client.GetTestTransport().Close(suppressClosedEvent: false); - } + client.BlockActionFromReceiving(ProtocolMessage.MessageAction.Detached); + client.GetTestTransport().Close(suppressClosedEvent: false); } - } + }; - bool didDisconnect = false; - client.Connection.Once(ConnectionEvent.Disconnected, change => - { - didDisconnect = true; - }); + channel.Detach(); + await channel.WaitForState(ChannelState.Detaching); + + await client.WaitForState(ConnectionState.Disconnected); + channel.State.Should().Be(ChannelState.Detaching); + await client.WaitForState(ConnectionState.Connected); + + await channel.WaitForState(ChannelState.Detached, TimeSpan.FromSeconds(10)); + } + [Theory] + [ProtocolData] + [Trait("spec", "RTN19")] + [Trait("spec", "RTN19a1")] + public async Task OnConnected_ShouldResendAckWithConnectionMessageSerialIfResumeFailed(Protocol protocol) + { + var client = await GetRealtimeClient(protocol); await client.WaitForState(ConnectionState.Connected); + var initialConnectionId = client.Connection.Id; + var channel = client.Channels.Get("RTN19a".AddRandomSuffix()); + + var client2 = await GetRealtimeClient(protocol); + var channel2 = client2.Channels.Get(channel.Name); + await channel2.AttachAsync(); + var channel2Messages = new List(); + channel2.Subscribe(message => channel2Messages.Add(message)); + + // Sending dummy messages to increment messageSerial + await channel.PublishAsync("dummy1", "data1"); + await channel.PublishAsync("dummy2", "data2"); + + // This will be unblocked on new connection/transport. + client.BlockActionFromReceiving(ProtocolMessage.MessageAction.Ack); + client.BlockActionFromReceiving(ProtocolMessage.MessageAction.Nack); + + var noOfMessagesSent = 10; + var messageAckAwaiter = new TaskCompletionAwaiter(15000, noOfMessagesSent); + for (var i = 0; i < noOfMessagesSent; i++) + { + channel.Publish("eventName" + i, "data" + i, (success, error) => + { + if (success) + { + messageAckAwaiter.Tick(); + } + }); + } - var channel = client.Channels.Get(channelName); - channel.Attach(); + await client.ProcessCommands(); + client.State.WaitingForAck.Count.Should().Be(noOfMessagesSent); + var initialMessagesIdToSerialMap = client.GetTestTransport() + .ProtocolMessagesSent.FindAll(message => message.Channel == channel.Name).ToDictionary(m => m.Messages.First().Name, m => m.MsgSerial); - await channel.WaitForState(ChannelState.Attaching); + client.State.Connection.Id = string.Empty; + client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request + client.GetTestTransport().Close(false); - await client.WaitForState(ConnectionState.Disconnected); - await client.WaitForState(ConnectionState.Connecting); await client.WaitForState(ConnectionState.Connected); + client.Connection.Id.Should().NotBe(initialConnectionId); // resume not successful - client.Connection.State.Should().Be(ConnectionState.Connected); - didDisconnect.Should().BeTrue(); + // Ack received for all messages + var messagePublishSuccess = await messageAckAwaiter.Task; + messagePublishSuccess.Should().BeTrue(); - await channel.WaitForAttachedState(); + var newMessagesIdToSerialMap = client.GetTestTransport() + .ProtocolMessagesSent.FindAll(message => message.Channel == channel.Name).ToDictionary(m => m.Messages.First().Name, m => m.MsgSerial); - var attachCount = sentMessages.Count(x => x.Channel == channelName && x.Action == ProtocolMessage.MessageAction.Attach); - attachCount.Should().Be(2); + // Check for new messageSerial + foreach (var keyValuePair in newMessagesIdToSerialMap) + { + initialMessagesIdToSerialMap[keyValuePair.Key].Should().NotBe(keyValuePair.Value); + } + + // Duplicate messages received on second channel + channel2Messages.Count.Should().Be((noOfMessagesSent * 2) + 2); // add first 2 dummy messages client.Close(); } [Theory] [ProtocolData] - [Trait("spec", "RTN19b")] - [Trait("intermittent", "true")] // I think the logic behind resending the detach message has an issue - public async Task WithChannelInDetachingState_WhenTransportIsDisconnected_ShouldResendDetachMessageOnConnectionResumed(Protocol protocol) + [Trait("spec", "RTN19")] + [Trait("spec", "RTN19a2")] + public async Task OnConnected_ShouldResendAckWithSameMessageSerialIfResumeSuccessful(Protocol protocol) { - int detachMessageCount = 0; - AblyRealtime client = null; - var channelName = "test-channel".AddRandomSuffix(); + var client = await GetRealtimeClient(protocol); + await client.WaitForState(ConnectionState.Connected); + var initialConnectionId = client.State.Connection.Id; + var channel = client.Channels.Get("RTN19a".AddRandomSuffix()); - client = await GetRealtimeClient(protocol, (options, settings) => - { - options.TransportFactory = new TestTransportFactory - { - OnMessageSent = OnMessageSent, - }; - }); + var client2 = await GetRealtimeClient(protocol); + var channel2 = client2.Channels.Get(channel.Name); + await channel2.AttachAsync(); + var channel2Messages = new List(); + channel2.Subscribe(message => channel2Messages.Add(message)); + + await channel.PublishAsync("dummy1", "data1"); + await channel.PublishAsync("dummy2", "data2"); + + // This will be unblocked on new connection/transport. + client.BlockActionFromReceiving(ProtocolMessage.MessageAction.Ack); + client.BlockActionFromReceiving(ProtocolMessage.MessageAction.Nack); - void OnMessageSent(ProtocolMessage message) + var noOfMessagesSent = 10; + var messageAckAwaiter = new TaskCompletionAwaiter(15000, noOfMessagesSent); + for (var i = 0; i < noOfMessagesSent; i++) { - if (message.Action == ProtocolMessage.MessageAction.Detach) + channel.Publish("eventName" + i, "data" + i, (success, error) => { - if (detachMessageCount == 0) + if (success) { - detachMessageCount++; - client.GetTestTransport().Close(suppressClosedEvent: false); + messageAckAwaiter.Tick(); } - } + }); } + await client.ProcessCommands(); + client.State.WaitingForAck.Count.Should().Be(noOfMessagesSent); + var initialMessagesIdToSerialMap = client.GetTestTransport() + .ProtocolMessagesSent.FindAll(message => message.Channel == channel.Name).ToDictionary(m => m.Messages.First().Name, m => m.MsgSerial); + + client.GetTestTransport().Close(false); // same connectionKey for next request await client.WaitForState(ConnectionState.Connected); + client.Connection.Id.Should().Be(initialConnectionId); // resume success - var channel = client.Channels.Get(channelName); - await channel.AttachAsync(); - channel.Detach(); - await channel.WaitForState(ChannelState.Detaching); + // Ack received for all messages after reconnection + var messagePublishSuccess = await messageAckAwaiter.Task; + messagePublishSuccess.Should().BeTrue(); - await client.WaitForState(ConnectionState.Disconnected); - channel.State.Should().Be(ChannelState.Detaching); - await client.WaitForState(ConnectionState.Connected); + var newMessagesIdToSerialMap = client.GetTestTransport() + .ProtocolMessagesSent.FindAll(message => message.Channel == channel.Name).ToDictionary(m => m.Messages.First().Name, m => m.MsgSerial); - await channel.WaitForState(ChannelState.Detached, TimeSpan.FromSeconds(10)); + // Check for same messageSerial + foreach (var keyValuePair in newMessagesIdToSerialMap) + { + initialMessagesIdToSerialMap[keyValuePair.Key].Should().Be(keyValuePair.Value); + } + + // No duplicates found on client2 channel + channel2Messages.Count.Should().Be(noOfMessagesSent + 2); // first 2 dummy messages + + client.Close(); } public ConnectionSandboxTransportSideEffectsSpecs(AblySandboxFixture fixture, ITestOutputHelper output) diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/AblyRealtimeTestExtensions.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/AblyRealtimeTestExtensions.cs index 8724d2ba2..8950eb467 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/AblyRealtimeTestExtensions.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/AblyRealtimeTestExtensions.cs @@ -37,8 +37,7 @@ public static async Task ConnectClient(this AblyRealtime client) client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Connected) { ConnectionDetails = new ConnectionDetails { ConnectionKey = "connectionKey" }, - ConnectionId = "1", - ConnectionSerial = 100 + ConnectionId = "1" }); await client.WaitForState(ConnectionState.Connected); diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailureSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailureSpecs.cs index 417f0a58f..32794695d 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailureSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailureSpecs.cs @@ -302,12 +302,12 @@ public async Task WhenInDisconnectedState_ReconnectUsingIncrementalBackoffTimeou // Upper bound = min((retryAttempt + 2) / 3, 2) * initialTimeout // Lower bound = 0.8 * Upper bound - disconnectedRetryTimeouts[0].Should().BeInRange(4, 5); - disconnectedRetryTimeouts[1].Should().BeInRange(5.33, 6.66); - disconnectedRetryTimeouts[2].Should().BeInRange(6.66, 8.33); + disconnectedRetryTimeouts[0].Should().BeInRange(4, 5 + 0.20); + disconnectedRetryTimeouts[1].Should().BeInRange(5.33, 6.66 + 0.20); + disconnectedRetryTimeouts[2].Should().BeInRange(6.66, 8.33 + 0.20); for (var i = 3; i < disconnectedRetryTimeouts.Count; i++) { - disconnectedRetryTimeouts[i].Should().BeInRange(8, 10); + disconnectedRetryTimeouts[i].Should().BeInRange(8, 10 + 0.20); } } diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailuresOnceConnectedSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailuresOnceConnectedSpecs.cs index 8df44792a..e0da14480 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailuresOnceConnectedSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFailuresOnceConnectedSpecs.cs @@ -200,8 +200,6 @@ public async Task WhenTransportCloses_ShouldResumeConnection() var firstTransport = LastCreatedTransport; var connectionKey = client.Connection.Key; - Debug.Assert(client.Connection.Serial.HasValue, "Expected a serial number, got null"); - var serial = client.Connection.Serial.Value; LastCreatedTransport.Listener.OnTransportEvent(LastCreatedTransport.Id, TransportState.Closed); await client.WaitForState(ConnectionState.Connecting); @@ -213,19 +211,16 @@ public async Task WhenTransportCloses_ShouldResumeConnection() var urlParams = LastCreatedTransport.Parameters.GetParams(); urlParams.Should().ContainKey("resume") .WhoseValue.Should().Be(connectionKey); - urlParams.Should().ContainKey("connection_serial") - .WhoseValue.Should().Be(serial.ToString()); LastCreatedTransport.Should().NotBeSameAs(firstTransport); } [Fact] - [Trait("spec", "RTN15f")] - public async Task AckMessagesAreFailedWhenConnectionIsDroppedAndNotResumed() + [Trait("spec", "RTN15a")] + public async Task AckMessagesAreSentWhenConnectionIsDroppedAndNotResumed() { var client = await SetupConnectedClient(); List callbackResults = new List(); - void Callback(bool b, ErrorInfo info) => callbackResults.Add(b); client.ConnectionManager.Send(new ProtocolMessage(ProtocolMessage.MessageAction.Message), Callback); @@ -236,16 +231,12 @@ public async Task AckMessagesAreFailedWhenConnectionIsDroppedAndNotResumed() client.State.WaitingForAck.Should().HaveCount(2); await CloseAndWaitToReconnect(client); - - LastCreatedTransport.SentMessages.Should().BeEmpty(); - client.State.WaitingForAck.Should().BeEmpty(); - - callbackResults.Should().HaveCount(2); - callbackResults.All(x => x == false).Should().BeTrue(); + client.State.WaitingForAck.Should().HaveCount(2); + LastCreatedTransport.SentMessages.Should().HaveCount(2); } [Fact] - [Trait("spec", "RTN15f")] + [Trait("spec", "RTN15a")] public async Task AckMessagesAreResentWhenConnectionIsDroppedAndResumed() { var client = await SetupConnectedClient(); diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFallbackSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFallbackSpecs.cs index df9fe2261..ad312a2ef 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFallbackSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionFallbackSpecs.cs @@ -120,8 +120,7 @@ public async Task WithFallbackHost_ShouldMakeRestRequestsOnSameHost() client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Connected) { ConnectionDetails = new ConnectionDetails { ConnectionKey = "connectionKey" }, - ConnectionId = "1", - ConnectionSerial = 100 + ConnectionId = "1" }); await client.WaitForState(ConnectionState.Connected); @@ -185,8 +184,7 @@ HttpResponseMessage GetResponse(HttpRequestMessage request) client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Connected) { ConnectionDetails = new ConnectionDetails { ConnectionKey = "connectionKey" }, - ConnectionId = "1", - ConnectionSerial = 100 + ConnectionId = "1" }); await client.WaitForState(ConnectionState.Connected); diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionRecoverySpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionRecoverySpecs.cs index 92da1671a..25d400647 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionRecoverySpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionRecoverySpecs.cs @@ -1,7 +1,11 @@ +using System.Collections.Generic; +using System.IO; +using System.Linq; using System.Threading.Tasks; using FluentAssertions; using IO.Ably.Realtime; -using IO.Ably.Transport; +using IO.Ably.Realtime.Workflow; +using IO.Ably.Tests.Infrastructure; using IO.Ably.Types; using Xunit; using Xunit.Abstractions; @@ -11,67 +15,108 @@ namespace IO.Ably.Tests.Realtime.ConnectionSpecs public class ConnectionRecoverySpecs : AblyRealtimeSpecs { [Fact] - [Trait("spec", "RTN16c")] - public async Task WhenConnectionIsClosed_ConnectionIdAndKeyShouldBeReset() + [Trait("spec", "RTN16g")] + [Trait("spec", "RTN16g1")] + public async Task CreateRecoveryKey_ShouldReturnSerializedConnectionKeyAndMsgSerialAndChannelSerials() { - var client = await GetConnectedClient(); + const string expectedRecoveryKey = "{\"connectionKey\":\"connectionKey\",\"msgSerial\":0,\"channelSerials\":{}}"; - client.Close(); + var client = GetClientWithFakeTransport(); + var connectedProtocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Connected) + { + ConnectionDetails = new ConnectionDetails { ConnectionKey = "connectionKey" }, + ConnectionId = "1" + }; + client.FakeProtocolMessageReceived(connectedProtocolMessage); + await client.WaitForState(ConnectionState.Connected); + + client.Connection.CreateRecoveryKey().Should().Be(expectedRecoveryKey); + } + + [Fact] + [Trait("spec", "RTN16g2")] + public async Task CreateRecoveryKey_ShouldReturnNullRecoveryKeyForNullConnectionKeyOrWhenStateIsClosed() + { + var client = GetClientWithFakeTransport(); + client.Connection.CreateRecoveryKey().Should().BeNullOrEmpty(); // connectionKey is empty - client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Closed)); + client.FakeProtocolMessageReceived(ConnectedProtocolMessage); + await client.WaitForState(ConnectionState.Connected); + client.Connection.CreateRecoveryKey().Should().NotBeNullOrEmpty(); + client.Close(); await client.WaitForState(ConnectionState.Closed); - client.Connection.Id.Should().BeNullOrEmpty(); - client.Connection.Key.Should().BeNullOrEmpty(); + client.Connection.CreateRecoveryKey().Should().BeNullOrEmpty(); } [Fact] - [Trait("spec", "RTN16b")] - public async Task RecoveryKey_ShouldBeConnectionKeyPlusConnectionSerialPlusMsgSerial() + [Trait("spec", "RTN16m")] + [System.Obsolete] + public async Task DeprecatedRecoveryKeyProperty_ShouldBehaveSameAsCreateRecoveryKey() { - var client = await GetConnectedClient(); - client.Connection.RecoveryKey.Should().Be($"{client.Connection.Key}:{client.Connection.Serial}:{client.Connection.MessageSerial}"); + const string expectedRecoveryKey = "{\"connectionKey\":\"connectionKey\",\"msgSerial\":0,\"channelSerials\":{}}"; + + var client = GetClientWithFakeTransport(); + var connectedProtocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Connected) + { + ConnectionDetails = new ConnectionDetails { ConnectionKey = "connectionKey" }, + ConnectionId = "1" + }; + client.FakeProtocolMessageReceived(connectedProtocolMessage); + await client.WaitForState(ConnectionState.Connected); + + client.Connection.RecoveryKey.Should().Be(expectedRecoveryKey); } [Fact] + [Trait("spec", "RTN16i")] [Trait("spec", "RTN16f")] + [Trait("spec", "RTN16j")] + [Trait("spec", "RTN16k")] public async Task RecoveryKey_MsgSerialShouldNotBeSentToAblyButShouldBeSetOnConnection() { - // RecoveryKey should be in the format - // LettersOrNumbers:Number:Number - TransportParams.RecoveryKeyRegex.Match("a:b:c").Success.Should().BeFalse(); - TransportParams.RecoveryKeyRegex.Match("a:b:3").Success.Should().BeFalse(); - TransportParams.RecoveryKeyRegex.Match("a:2:c").Success.Should().BeFalse(); - TransportParams.RecoveryKeyRegex.Match("$1:2:3").Success.Should().BeFalse(); - TransportParams.RecoveryKeyRegex.Match("$a:2:3").Success.Should().BeFalse(); - TransportParams.RecoveryKeyRegex.Match("a:@2:3").Success.Should().BeFalse(); - TransportParams.RecoveryKeyRegex.Match("a:2:3!").Success.Should().BeFalse(); - - // these should be valid - TransportParams.RecoveryKeyRegex.Match("1:2:3").Success.Should().BeTrue(); - TransportParams.RecoveryKeyRegex.Match("a:2:3").Success.Should().BeTrue(); - - const string recoveryKey = "abcxyz:100:99"; - var match = TransportParams.RecoveryKeyRegex.Match(recoveryKey); - match.Success.Should().BeTrue(); - match.Groups[1].Value.Should().Be("abcxyz"); - match.Groups[2].Value.Should().Be("100"); - match.Groups[3].Value.Should().Be("99"); - - var parts = recoveryKey.Split(':'); - - var client = GetRealtimeClient(options => { options.Recover = recoveryKey; }); + var recoveryKey = + "{\"connectionKey\":\"uniqueKey\",\"msgSerial\":45,\"channelSerials\":{\"channel1\":\"1\",\"channel2\":\"2\",\"channel3\":\"3\"}}"; + var client = GetClientWithFakeTransport(options => { options.Recover = recoveryKey; }); var transportParams = await client.ConnectionManager.CreateTransportParameters("https://realtime.ably.io"); var paramsDict = transportParams.GetParams(); paramsDict.ContainsKey("recover").Should().BeTrue(); - paramsDict.ContainsKey("connection_serial").Should().BeTrue(); - paramsDict.ContainsKey("msg_serial").Should().BeFalse(); + paramsDict["recover"].Should().Be("uniqueKey"); + + client.FakeProtocolMessageReceived(ConnectedProtocolMessage); + await client.WaitForState(ConnectionState.Connected); + + client.Connection.MessageSerial.Should().Be(45); + client.Channels.Count().Should().Be(3); + var channelCounter = 1; + foreach (var realtimeChannel in client.Channels.OrderBy(channel => channel.Name)) + { + realtimeChannel.Name.Should().Be($"channel{channelCounter}"); + realtimeChannel.Properties.ChannelSerial.Should().Be($"{channelCounter}"); + channelCounter++; + } + + // Recover should be set to null once used + client.Options.Recover.Should().BeNull(); + + client.Connection.InnerState.MessageSerial = 0; + client.Channels.ReleaseAll(); + + client.ExecuteCommand(SetDisconnectedStateCommand.Create(null, true)); + await client.WaitForState(ConnectionState.Disconnected); + await client.WaitForState(ConnectionState.Connecting); + + transportParams = await client.ConnectionManager.CreateTransportParameters("https://realtime.ably.io"); + paramsDict = transportParams.GetParams(); + paramsDict.ContainsKey("recover").Should().BeFalse(); // recover param should be empty for next attempt - paramsDict["recover"].Should().Be(parts[0]); - paramsDict["connection_serial"].Should().Be(parts[1]); + client.FakeProtocolMessageReceived(ConnectedProtocolMessage); + await client.WaitForState(ConnectionState.Connected); - client.Connection.MessageSerial.Should().Be(99); + // Recover options shouldn't be used for next retry + client.Connection.MessageSerial.Should().Be(0); + client.Channels.Count().Should().Be(0); } public ConnectionRecoverySpecs(ITestOutputHelper output) diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionSerialSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionSerialSpecs.cs deleted file mode 100644 index e3ccd7898..000000000 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSpecs/ConnectionSerialSpecs.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System.Threading.Tasks; -using FluentAssertions; -using IO.Ably.Realtime; -using IO.Ably.Types; -using Xunit; -using Xunit.Abstractions; - -namespace IO.Ably.Tests.Realtime -{ - [Trait("spec", "RTN10")] - public class ConnectionSerialSpecs : AblyRealtimeSpecs - { - [Fact] - [Trait("spec", "RTN10a")] - public async Task OnceConnected_ConnectionSerialShouldBeMinusOne() - { - var client = GetClientWithFakeTransport(); - client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Connected)); - await client.WaitForState(ConnectionState.Connected); - - client.Connection.Serial.Should().Be(-1); - } - - [Fact] - [Trait("spec", "RTN10c")] - public async Task WhenRestoringConnection_UsesLastKnownConnectionSerial() - { - // Arrange - var client = GetClientWithFakeTransport(); - const long targetSerial = 1234567; - client.State.Connection.Serial = targetSerial; - - // Act - var transportParams = await client.ConnectionManager.CreateTransportParameters("https://realtime.ably.io"); - - transportParams.ConnectionSerial.Should().Be(targetSerial); - } - - [Fact] - [Trait("spec", "RTN10b")] - public async Task WhenProtocolMessageWithSerialReceived_SerialShouldUpdate() - { - // Arrange - var client = GetClientWithFakeTransport(); - client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Connected)); - - await client.WaitForState(ConnectionState.Connected); - client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Message) - { - ConnectionSerial = 123456 - }); - - await client.ProcessCommands(); - - // Act - client.Connection.Serial.Should().Be(123456); - } - - [Fact] - [Trait("spec", "RTN10b")] - public void WhenProtocolMessageWithOUTSerialReceived_SerialShouldNotUpdate() - { - // Arrange - var client = GetClientWithFakeTransport(); - client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Connected)); - var initialSerial = client.Connection.Serial; - - client.FakeProtocolMessageReceived(new ProtocolMessage(ProtocolMessage.MessageAction.Message)); - - // Act - client.Connection.Serial.Should().Be(initialSerial); - } - - public ConnectionSerialSpecs(ITestOutputHelper output) - : base(output) - { - } - } -} diff --git a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs index ae976710f..f940e9221 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Threading; using System.Threading.Tasks; using IO.Ably.Realtime; @@ -82,7 +81,7 @@ public async Task WhenAttachingToAChannelWithNoMembers_PresenceShouldBeConsidere await channel.WaitForAttachedState(); channel.State.Should().Be(ChannelState.Attached); - channel.Presence.SyncComplete.Should().BeTrue(); + channel.Presence.IsSyncComplete.Should().BeTrue(); } [Theory] @@ -95,33 +94,31 @@ public async Task WhenAttachingToAChannelWithMembers_PresenceShouldBeInProgress( var client2 = await GetRealtimeClient(protocol); var channel = GetChannel(client, testChannel); - List tasks = new List(); - for (int count = 1; count < 10; count++) + for (var count = 1; count < 10; count++) { - tasks.Add(channel.Presence.EnterClientAsync($"client-{count}", null)); + await channel.Presence.EnterClientAsync($"client-{count}", null); } - await Task.WhenAll(tasks.ToArray()); - var channel2 = GetChannel(client2, testChannel); - int inSync = 0; - int syncComplete = 0; + var awaiter = new TaskCompletionAwaiter(); + bool syncInProgress = false, syncComplete = false; channel2.InternalStateChanged += (_, args) => { if (args.Current == ChannelState.Attached) { - Logger.Debug("Test: Setting inSync to - " + channel2.Presence.Map.IsSyncInProgress); - Interlocked.Add(ref inSync, channel2.Presence.Map.IsSyncInProgress ? 1 : 0); - Interlocked.Add(ref syncComplete, channel2.Presence.InternalSyncComplete ? 1 : 0); + Logger.Debug("Test: Setting inSync to - " + channel2.Presence.MembersMap.SyncInProgress); + syncInProgress = channel2.Presence.IsSyncInProgress; + syncComplete = channel2.Presence.IsSyncComplete; + awaiter.SetCompleted(); } }; - await channel2.AttachAsync(); - await Task.Delay(1000); - inSync.Should().Be(1); - syncComplete.Should().Be(0); + await awaiter.Task; + + syncInProgress.Should().Be(true); + syncComplete.Should().Be(false); } /* @@ -312,10 +309,10 @@ public async Task PresenceMapBehaviour_ShouldConformToSpec(Protocol protocol) } } - [Theory(Skip = "Intermittently fails")] + [Theory(Skip = "Fails for last assertion, rest channel not retrieving updated presence")] [InlineData(Protocol.Json, 30)] // Wait for 30 seconds [InlineData(Protocol.Json, 60)] // Wait for 1 minute - [Trait("spec", "RTP17e")] + [Trait("spec", "RTP17f")] public async Task Presence_ShouldReenterPresenceAfterAConnectionLoss(Protocol protocol, int waitInSeconds) { var channelName = "RTP17e".AddRandomSuffix(); @@ -341,47 +338,53 @@ public async Task Presence_ShouldReenterPresenceAfterAConnectionLoss(Protocol pr IRealtimeClient rt, IRestClient rest) { - var rtChannel = rt.Channels.Get(channelName); - - var rChannel = rest.Channels.Get(channelName); + var realtimeChan = rt.Channels.Get(channelName); - await rtChannel.Presence.EnterAsync(); - await rtChannel.WaitForAttachedState(); - _ = await rtChannel.Presence.WaitSync(); + var restChan = rest.Channels.Get(channelName); - return (rtChannel, rChannel); - } + await realtimeChan.Presence.EnterAsync(); + await realtimeChan.WaitForAttachedState(); + _ = await realtimeChan.Presence.WaitSync(); - async Task HasRestPresence(IRestChannel rChannel) - { - var result = await rChannel.Presence.GetAsync(); - return result.Items.Exists(message => - message.ClientId.EqualsTo("martin")); + return (realtimeChan, restChan); } - Task Sleep(int seconds) => Task.Delay(seconds * 1000); - - async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) + async Task WaitForRestPresence(IRestChannel restChan, bool shouldBePresent = true) { int count = 0; while (true) { - bool hasPresence = await HasRestPresence(rChannel); - - if (count > 30) + var result = await restChan.Presence.GetAsync(); + var hasPresence = result.Items.Exists(message => + message.ClientId.EqualsTo("martin")); + if (shouldBePresent && hasPresence) { - throw new AssertionFailedException("After 1 minute of trying we still have presence. Not good."); + break; } - if (hasPresence == false) + if (!shouldBePresent && !hasPresence) { break; } + if (count > 30) + { + return false; + } + await Sleep(2); count++; } + + return true; + } + + Task Sleep(int seconds) => Task.Delay(seconds * 1000); + + async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) + { + return await WaitForRestPresence(rChannel, false); } // arrange @@ -392,7 +395,7 @@ async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) try { // act - (await HasRestPresence(restChannel)).Should().BeTrue(); + (await WaitForRestPresence(restChannel)).Should().BeTrue(); // Kill the transport but don't tell the library testTransport.Close(); @@ -407,13 +410,10 @@ async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) await realtimeClient.WaitForState(ConnectionState.Disconnected); await realtimeClient.WaitForState(ConnectionState.Connected); await realtimeChannel.WaitForAttachedState(); - _ = await realtimeChannel.Presence.WaitSync(); - - // Wait for a second because the Rest call returns [] if done straight away - await Sleep(1); + var messages = await realtimeChannel.Presence.GetAsync(); + messages.Count().Should().Be(1); - // assert - (await HasRestPresence(restChannel)).Should().BeTrue(); + (await WaitForRestPresence(restChannel)).Should().BeTrue(); } finally { @@ -422,6 +422,93 @@ async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) } } + [Theory] + [ProtocolData] + [Trait("spec", "RTP17f")] + public async Task OnAttach_ShouldEnterMembersFromInternalMap(Protocol protocol) + { + var channelName = "RTP17c2".AddRandomSuffix(); + var setupClient = await GetRealtimeClient(protocol); + var setupChannel = setupClient.Channels.Get(channelName); + + // enter 3 client to the channel + for (int i = 0; i < 3; i++) + { + await setupChannel.Presence.EnterClientAsync($"member_{i}", null); + } + + var client = await GetRealtimeClient(protocol, (options, settings) => { options.ClientId = "local"; }); + await client.WaitForState(); + var channel = client.Channels.Get(channelName); + var presence = channel.Presence; + + var p = await presence.GetAsync(); + p.Should().HaveCount(3); + + await presence.EnterAsync(); + + await Task.Delay(250); + presence.MembersMap.Members.Should().HaveCount(4); + presence.InternalMembersMap.Members.Should().HaveCount(1); + + List leaveMessages = new List(); + PresenceMessage updateMessage = null; + PresenceMessage enterMessage = null; + await WaitForMultiple(2, partialDone => + { + presence.Subscribe(PresenceAction.Leave, message => + { + leaveMessages.Add(message); + }); + presence.Subscribe(PresenceAction.Update, message => + { + updateMessage = message; + partialDone(); // 1 call + }); + presence.Subscribe(PresenceAction.Enter, message => + { + enterMessage = message; // not expected to hit + }); + client.GetTestTransport().AfterDataReceived = message => + { + if (message.Action == ProtocolMessage.MessageAction.Attached) + { + bool hasPresence = message.HasFlag(ProtocolMessage.Flag.HasPresence); + hasPresence.Should().BeFalse(); + bool resumed = message.HasFlag(ProtocolMessage.Flag.Resumed); + resumed.Should().BeTrue(); + client.GetTestTransport().AfterDataReceived = _ => { }; + partialDone(); // 1 call + } + }; + // inject duplicate attached message with resume flag ( no RTL12 message loss event) + var protocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Attached) + { + Channel = channelName, + Flags = 0, + }; + protocolMessage.SetFlag(ProtocolMessage.Flag.Resumed); + protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed).Should().BeTrue(); + client.GetTestTransport().FakeReceivedMessage(protocolMessage); + }); + + leaveMessages.Should().HaveCount(4); + foreach (var msg in leaveMessages) + { + msg.ClientId.Should().BeOneOf("member_0", "member_1", "member_2", "local"); + } + + updateMessage.Should().NotBeNull(); + updateMessage.ClientId.Should().Be("local"); + enterMessage.Should().BeNull(); + + presence.Unsubscribe(); + var remainingMembers = await presence.GetAsync(); + + remainingMembers.Should().HaveCount(1); + remainingMembers.First().ClientId.Should().Be("local"); + } + [Theory] [ProtocolData] [Trait("spec", "RTP17")] @@ -451,7 +538,6 @@ public async Task Presence_ShouldHaveInternalMapForCurrentConnectionId(Protocol channelB.State.Should().Be(ChannelState.Attaching); await channelB.WaitForAttachedState(); channelB.State.Should().Be(ChannelState.Attached); - // ENTER PresenceMessage msgA = null, msgB = null; await WaitForMultiple(2, partialDone => @@ -459,12 +545,14 @@ await WaitForMultiple(2, partialDone => channelA.Presence.Subscribe(msg => { msgA = msg; + channelA.Presence.Unsubscribe(); partialDone(); }); channelB.Presence.Subscribe(msg => { msgB = msg; + channelB.Presence.Unsubscribe(); partialDone(); }); @@ -474,15 +562,15 @@ await WaitForMultiple(2, partialDone => msgA.Should().NotBeNull(); msgA.Action.Should().Be(PresenceAction.Enter); msgA.ConnectionId.Should().Be(clientA.Connection.Id); - channelA.Presence.Map.Members.Should().HaveCount(1); - channelA.Presence.InternalMap.Members.Should().HaveCount(1); + channelA.Presence.MembersMap.Members.Should().HaveCount(1); + channelA.Presence.InternalMembersMap.Members.Should().HaveCount(1); channelA.Presence.Unsubscribe(); msgB.Should().NotBeNull(); msgB.Action.Should().Be(PresenceAction.Enter); msgB.ConnectionId.Should().NotBe(clientB.Connection.Id); - channelB.Presence.Map.Members.Should().HaveCount(1); - channelB.Presence.InternalMap.Members.Should().HaveCount(0); + channelB.Presence.MembersMap.Members.Should().HaveCount(1); + channelB.Presence.InternalMembersMap.Members.Should().HaveCount(0); channelB.Presence.Unsubscribe(); msgA = null; @@ -509,14 +597,14 @@ await WaitForMultiple(2, partialDone => msgA.Should().NotBeNull(); msgA.Action.Should().Be(PresenceAction.Enter); msgA.ConnectionId.Should().NotBe(clientA.Connection.Id); - channelA.Presence.Map.Members.Should().HaveCount(2); - channelA.Presence.InternalMap.Members.Should().HaveCount(1); + channelA.Presence.MembersMap.Members.Should().HaveCount(2); + channelA.Presence.InternalMembersMap.Members.Should().HaveCount(1); msgB.Should().NotBeNull(); msgB.Action.Should().Be(PresenceAction.Enter); msgB.ConnectionId.Should().Be(clientB.Connection.Id); - channelB.Presence.Map.Members.Should().HaveCount(2); - channelB.Presence.InternalMap.Members.Should().HaveCount(1); + channelB.Presence.MembersMap.Members.Should().HaveCount(2); + channelB.Presence.InternalMembersMap.Members.Should().HaveCount(1); // UPDATE msgA = null; @@ -544,28 +632,28 @@ await WaitForMultiple(2, partialDone => msgA.Action.Should().Be(PresenceAction.Update); msgA.ConnectionId.Should().NotBe(clientA.Connection.Id); msgA.Data.ToString().Should().Be("chB-update"); - channelA.Presence.Map.Members.Should().HaveCount(2); - channelA.Presence.InternalMap.Members.Should().HaveCount(1); + channelA.Presence.MembersMap.Members.Should().HaveCount(2); + channelA.Presence.InternalMembersMap.Members.Should().HaveCount(1); msgB.Should().NotBeNull(); msgB.Action.Should().Be(PresenceAction.Update); msgB.ConnectionId.Should().Be(clientB.Connection.Id); msgB.Data.ToString().Should().Be("chB-update"); - channelB.Presence.Map.Members.Should().HaveCount(2); - channelB.Presence.InternalMap.Members.Should().HaveCount(1); + channelB.Presence.MembersMap.Members.Should().HaveCount(2); + channelB.Presence.InternalMembersMap.Members.Should().HaveCount(1); // LEAVE with synthesized message msgA = null; msgB = null; var synthesizedMsg = new PresenceMessage(PresenceAction.Leave, clientB.ClientId) { ConnectionId = null }; synthesizedMsg.IsSynthesized().Should().BeTrue(); - channelB.Presence.OnPresence(new[] { synthesizedMsg }, null); + channelB.Presence.OnPresence(new[] { synthesizedMsg }); msgB.Should().BeNull(); - channelB.Presence.Map.Members.Should().HaveCount(2); + channelB.Presence.MembersMap.Members.Should().HaveCount(2); // message was synthesized so should not have been removed (RTP17b) - channelB.Presence.InternalMap.Members.Should().HaveCount(1); + channelB.Presence.InternalMembersMap.Members.Should().HaveCount(1); // LEAVE msgA = null; @@ -593,15 +681,15 @@ await WaitForMultiple(2, partialDone => msgA.Action.Should().Be(PresenceAction.Leave); msgA.ConnectionId.Should().NotBe(clientA.Connection.Id); msgA.Data.ToString().Should().Be("chB-leave"); - channelA.Presence.Map.Members.Should().HaveCount(1); - channelA.Presence.InternalMap.Members.Should().HaveCount(1); + channelA.Presence.MembersMap.Members.Should().HaveCount(1); + channelA.Presence.InternalMembersMap.Members.Should().HaveCount(1); msgB.Should().NotBeNull(); msgB.Action.Should().Be(PresenceAction.Leave); msgB.ConnectionId.Should().Be(clientB.Connection.Id); msgB.Data.ToString().Should().Be("chB-leave"); - channelB.Presence.Map.Members.Should().HaveCount(1); - channelB.Presence.InternalMap.Members.Should().HaveCount(0); + channelB.Presence.MembersMap.Members.Should().HaveCount(1); + channelB.Presence.InternalMembersMap.Members.Should().HaveCount(0); // clean up clientA.Close(); @@ -630,8 +718,8 @@ public async Task Presence_ShouldPublishAllMembersForTheCurrentConnection(Protoc var members = await channel.Presence.GetAsync(); members.Should().HaveCount(1); - channel.Presence.Map.Members.Should().HaveCount(1); - channel.Presence.InternalMap.Members.Should().HaveCount(1); + channel.Presence.MembersMap.Members.Should().HaveCount(1); + channel.Presence.InternalMembersMap.Members.Should().HaveCount(1); } [Theory] @@ -663,13 +751,13 @@ public async Task PresenceMap_WhenNotSyncingAndLeaveActionArrivesMemberKeyShould // sync should not be in progress and initial an sync should have completed channel.Presence.IsSyncInProgress.Should().BeFalse("sync should have completed"); - channel.Presence.SyncComplete.Should().BeTrue(); + channel.Presence.IsSyncComplete.Should().BeTrue(); // pull a random member key from the presence map var memberNumber = new Random().Next(0, 19); var memberId = $"member_{memberNumber}"; var expectedMemberKey = $"{memberId}:{setupClient.Connection.Id}"; - var actualMemberKey = channel.Presence.Map.Members[expectedMemberKey].MemberKey; + var actualMemberKey = channel.Presence.MembersMap.Members[expectedMemberKey].MemberKey; actualMemberKey.Should().Be(expectedMemberKey); @@ -687,8 +775,8 @@ await WaitFor(done => // then assert that the member has left leftClientId.Should().Be(memberId); - channel.Presence.Map.Members.Should().HaveCount(19); - channel.Presence.Map.Members.ContainsKey(actualMemberKey).Should().BeFalse(); + channel.Presence.MembersMap.Members.Should().HaveCount(19); + channel.Presence.MembersMap.Members.ContainsKey(actualMemberKey).Should().BeFalse(); } [Theory] @@ -959,7 +1047,7 @@ public async Task await client.ProcessCommands(); - channel.Presence.Map.Members.Should().HaveCount(1); + channel.Presence.MembersMap.Members.Should().HaveCount(1); var localMessage = new PresenceMessage { @@ -972,9 +1060,9 @@ public async Task }; // inject a member directly into the local PresenceMap - channel.Presence.Map.Members[localMessage.MemberKey] = localMessage; - channel.Presence.Map.Members.Should().HaveCount(2); - channel.Presence.Map.Members.ContainsKey(localMessage.MemberKey).Should().BeTrue(); + channel.Presence.MembersMap.Members[localMessage.MemberKey] = localMessage; + channel.Presence.MembersMap.Members.Should().HaveCount(2); + channel.Presence.MembersMap.Members.ContainsKey(localMessage.MemberKey).Should().BeTrue(); var members = (await channel.Presence.GetAsync()).ToArray(); members.Should().HaveCount(2); @@ -1060,9 +1148,9 @@ all members in the PresenceMap should be removed as there are no members present }; // inject a members directly into the local PresenceMap - channel.Presence.Map.Members[localMessage1.MemberKey] = localMessage1; - channel.Presence.Map.Members[localMessage2.MemberKey] = localMessage2; - channel.Presence.Map.Members.Should().HaveCount(2); + channel.Presence.MembersMap.Members[localMessage1.MemberKey] = localMessage1; + channel.Presence.MembersMap.Members[localMessage2.MemberKey] = localMessage2; + channel.Presence.MembersMap.Members.Should().HaveCount(2); bool hasPresence = true; int leaveCount = 0; @@ -1103,7 +1191,7 @@ await WaitForMultiple(4, partialDone => members.Should().HaveCount(0, "should be no members"); } - [Theory] + [Theory(Skip = "Need to update the test, provided input doesn't really fail")] [ProtocolData] public async Task WithInvalidPresenceMessages_EmmitErrorNoChannel(Protocol protocol) { @@ -1143,7 +1231,7 @@ static PresenceMessage[] TestPresence1() bool hasError = false; channel.Error += (sender, args) => hasError = true; - channel.Presence.OnPresence(TestPresence1(), "xyz"); + channel.Presence.OnPresence(TestPresence1()); hasError.Should().BeTrue(); channel.State.Should().Be(ChannelState.Attached); @@ -1354,8 +1442,8 @@ public async Task WhenChannelBecomesFailedOrDetached_ShouldClearPresenceMapAndSh await Task.Delay(10); - channel.Presence.Map.Members.Should().HaveCount(1); - channel.Presence.InternalMap.Members.Should().HaveCount(1); + channel.Presence.MembersMap.Members.Should().HaveCount(1); + channel.Presence.InternalMembersMap.Members.Should().HaveCount(1); bool didReceiveMessage = false; channel.Subscribe(msg => { didReceiveMessage = true; }); @@ -1363,8 +1451,8 @@ public async Task WhenChannelBecomesFailedOrDetached_ShouldClearPresenceMapAndSh channel.Once((ChannelEvent)channelState, change => { - channel.Presence.Map.Members.Should().HaveCount(0); - channel.Presence.InternalMap.Members.Should().HaveCount(0); + channel.Presence.MembersMap.Members.Should().HaveCount(0); + channel.Presence.InternalMembersMap.Members.Should().HaveCount(0); }); if (channelState == ChannelState.Detached) @@ -1383,102 +1471,6 @@ public async Task WhenChannelBecomesFailedOrDetached_ShouldClearPresenceMapAndSh client.Close(); } - [Theory] - [ProtocolData] - [Trait("spec", "RTP17c2")] - public async Task WhenChannelBecomesAttached_AndSyncInitiatedAsPartOfAttach_AndResumeIsFalseAndSyncNotExpected_ShouldReEnterMembersInInternalMap(Protocol protocol) - { - /* - * If the resumed flag is false and a SYNC is not expected... - */ - - var channelName = "RTP17c2".AddRandomSuffix(); - var setupClient = await GetRealtimeClient(protocol); - var setupChannel = setupClient.Channels.Get(channelName); - - // enter 3 client to the channel - for (int i = 0; i < 3; i++) - { - await setupChannel.Presence.EnterClientAsync($"member_{i}", null); - } - - var client = await GetRealtimeClient(protocol, (options, settings) => { options.ClientId = "local"; }); - await client.WaitForState(); - var channel = client.Channels.Get(channelName); - var presence = channel.Presence; - - var p = await presence.GetAsync(); - p.Should().HaveCount(3); - - await presence.EnterAsync(); - - await Task.Delay(250); - presence.Map.Members.Should().HaveCount(4); - presence.InternalMap.Members.Should().HaveCount(1); - - List leaveMessages = new List(); - PresenceMessage updateMessage = null; - PresenceMessage enterMessage = null; - await WaitForMultiple(2, partialDone => - { - presence.Subscribe(PresenceAction.Leave, message => - { - leaveMessages.Add(message); - }); - - presence.Subscribe(PresenceAction.Update, message => - { - updateMessage = message; - partialDone(); // 1 call - }); - - presence.Subscribe(PresenceAction.Enter, message => - { - enterMessage = message; // not expected to hit - }); - - client.GetTestTransport().AfterDataReceived = message => - { - if (message.Action == ProtocolMessage.MessageAction.Attached) - { - bool hasPresence = message.HasFlag(ProtocolMessage.Flag.HasPresence); - hasPresence.Should().BeFalse(); - - bool resumed = message.HasFlag(ProtocolMessage.Flag.Resumed); - resumed.Should().BeFalse(); - - client.GetTestTransport().AfterDataReceived = _ => { }; - partialDone(); // 1 call - } - }; - - // inject attached message - var protocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Attached) - { - Channel = channelName, - Flags = 0, // no presence, no resume - }; - - client.GetTestTransport().FakeReceivedMessage(protocolMessage); - }); - - leaveMessages.Should().HaveCount(4); - foreach (var msg in leaveMessages) - { - msg.ClientId.Should().BeOneOf("member_0", "member_1", "member_2", "local"); - } - - updateMessage.Should().NotBeNull(); - updateMessage.ClientId.Should().Be("local"); - enterMessage.Should().BeNull(); - - presence.Unsubscribe(); - var remainingMembers = await presence.GetAsync(); - - remainingMembers.Should().HaveCount(1); - remainingMembers.First().ClientId.Should().Be("local"); - } - [Theory] [ProtocolData] [Trait("spec", "RTP5b")] @@ -1522,22 +1514,22 @@ await WaitForMultiple(2, partialDone => presence2.Subscribe(PresenceAction.Enter, msg => { - presence2.Map.Members.Should().HaveCount(presence2.SyncComplete ? 2 : 1); + presence2.MembersMap.Members.Should().HaveCount(presence2.IsSyncComplete ? 2 : 1); presence2.Unsubscribe(); partialDone(); }); presence2.PendingPresenceQueue.Should().HaveCount(1); - presence2.SyncComplete.Should().BeFalse(); - presence2.Map.Members.Should().HaveCount(0); + presence2.IsSyncComplete.Should().BeFalse(); + presence2.MembersMap.Members.Should().HaveCount(0); taskCountWaiter.Tick(); }); var transport = client2.GetTestTransport(); - await new ConditionalAwaiter(() => presence2.SyncComplete); + await new ConditionalAwaiter(() => presence2.IsSyncComplete); transport.ProtocolMessagesReceived.Any(m => m.Action == ProtocolMessage.MessageAction.Sync). Should().BeTrue("Should receive sync message"); - presence2.Map.Members.Should().HaveCount(2); + presence2.MembersMap.Members.Should().HaveCount(2); } [Theory] @@ -1665,78 +1657,117 @@ public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsIn [Theory] [ProtocolData] [Trait("spec", "RTP16b")] - [Trait("spec", "RTP19a")] - public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_ShouldFailAckQueueMessages_WhenSendFails(Protocol protocol) + public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsInitializedOrAttaching_MessageAreNotPublished(Protocol protocol) { - var transportFactory = new TestTransportFactory + var client = await GetRealtimeClient(protocol, (options, settings) => + { + options.ClientId = "RTP16b"; + options.QueueMessages = false; + }); + + await client.WaitForState(ConnectionState.Connected); + + var channel = GetRandomChannel(client, "RTP16a"); + channel.State.Should().Be(ChannelState.Initialized); + await EnterPresenceAndCheckForError(); + + client.BlockActionFromSending(ProtocolMessage.MessageAction.Attach); + channel.Attach(); + await channel.WaitForState(ChannelState.Attaching); + await EnterPresenceAndCheckForError(); + + // QueueCommand will not retry instantly + client.Workflow.QueueCommand(SetDisconnectedStateCommand.Create(null)); + await client.WaitForState(ConnectionState.Disconnected); + await EnterPresenceAndCheckForError(); + + // clean up + client.Close(); + + async Task EnterPresenceAndCheckForError() { - BeforeMessageSent = message => + var enterPresenceAwaiter = new TaskCompletionAwaiter(); + ErrorInfo err = null; + bool? success = null; + channel.Presence.Enter("dummy data", (b, info) => { - if (message.Action == ProtocolMessage.MessageAction.Presence) - { - throw new Exception("RTP16b : error while sending message"); - } + success = b; + err = info; + enterPresenceAwaiter.SetCompleted(); + }); + await client.ProcessCommands(); + + // No messages sent because queueMessages false + channel.Presence.PendingPresenceQueue.Should().HaveCount(0); + client.State.PendingMessages.Should().HaveCount(0); + + if (client.Connection.State != ConnectionState.Disconnected) + { + var presenceMessagesSent = client.GetTestTransport().ProtocolMessagesSent + .FindAll(msg => msg.Action == ProtocolMessage.MessageAction.Presence); + presenceMessagesSent.Should().HaveCount(0); } - }; + await enterPresenceAwaiter.Task; + + success.Should().HaveValue(); + success.Value.Should().BeFalse(); + err.Should().NotBeNull(); + err.Message.Should().Be("Unable enqueue message because Options.QueueMessages is set to False."); + } + } + + [Theory] + [ProtocolData] + [Trait("spec", "RTN7d")] + public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_ShouldFailAckQueueMessages_WhenSendFails(Protocol protocol) + { var client = await GetRealtimeClient(protocol, (options, settings) => { - options.ClientId = "RTP16b"; + options.ClientId = "RTN7d"; options.QueueMessages = false; - options.TransportFactory = transportFactory; }); await client.WaitForState(ConnectionState.Connected); - var channel = GetRandomChannel(client, "RTP16a"); + var channel = GetRandomChannel(client, "RTN7d"); channel.Attach(); await channel.WaitForAttachedState(); - var tsc = new TaskCompletionAwaiter(); + client.BlockActionFromSending(ProtocolMessage.MessageAction.Presence); + + var enterPresenceAwaiter = new TaskCompletionAwaiter(); ErrorInfo err = null; bool? success = null; - channel.Presence.Enter(client.Connection.State.ToString(), (b, info) => + channel.Presence.Enter("dummy data", (b, info) => { success = b; err = info; - tsc.SetCompleted(); + enterPresenceAwaiter.SetCompleted(); }); + await client.ProcessCommands(); - await WaitFor(done => - { - // Ack Queue has one presence message - if (channel.RealtimeClient.State.WaitingForAck.Count == 1) - { - done(); - } - }); - - // No pending message queue, since QueueMessages is false - channel.RealtimeClient.State.PendingMessages.Should().HaveCount(0); - - Presence.QueuedPresenceMessage[] presenceMessages = channel.Presence.PendingPresenceQueue.ToArray(); + // All messages sent + channel.Presence.PendingPresenceQueue.Should().HaveCount(0); + client.State.PendingMessages.Should().HaveCount(0); - presenceMessages.Should().HaveCount(0); + // no ack received, so ack queue has one presence message + await new ConditionalAwaiter(() => channel.RealtimeClient.State.WaitingForAck.Count == 1); - await tsc.Task; + // Disconnect using QueueCommand will not retry instantly + client.Workflow.QueueCommand(SetDisconnectedStateCommand.Create(null)); + await client.WaitForState(ConnectionState.Disconnected); - // No pending message queue, since QueueMessages=false - channel.RealtimeClient.State.PendingMessages.Should().HaveCount(0); + await enterPresenceAwaiter.Task; - await WaitFor(done => - { - // Ack cleared after flushing the queue for transport disconnection, because QueueMessages=false - if (channel.RealtimeClient.State.WaitingForAck.Count == 0) - { - done(); - } - }); + // Fail all ack/nack, because QueueMessages=false + channel.RealtimeClient.State.WaitingForAck.Should().HaveCount(0); success.Should().HaveValue(); success.Value.Should().BeFalse(); err.Should().NotBeNull(); err.Message.Should().Be("Clearing message AckQueue(created at connected state) because Options.QueueMessages is false"); - err.Cause.InnerException.Message.Should().Be("RTP16b : error while sending message"); + err.Code.Should().Be(ErrorCodes.Disconnected); // cleared because of disconnection // clean up client.Close(); @@ -1980,7 +2011,7 @@ await WaitFor(30000, async done => { if (change.Current == ConnectionState.Connected) { - await Task.Delay(500); + await channel.WaitForAttachedState(); p1 = await channel.Presence.GetAsync(); done(); } diff --git a/src/IO.Ably.Tests.Shared/Realtime/ProtocolMessageTests.cs b/src/IO.Ably.Tests.Shared/Realtime/ProtocolMessageTests.cs index 1c68f50e6..d78469ad5 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ProtocolMessageTests.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ProtocolMessageTests.cs @@ -86,7 +86,7 @@ public void ShouldHaveCorrectProperties_FlagsShouldContainBitFlags() pm.HasFlag(ProtocolMessage.Flag.Subscribe).Should().BeFalse(); pm.HasFlag(ProtocolMessage.Flag.PresenceSubscribe).Should().BeTrue(); - // TR4a,TR4b,TR4c,TR4d,TR4e (show it is removed),TR4f,TR4g,TR4h,TR4i,TR4j,TR4k,TR4l,TR4m + // TR4a,TR4b,TR4c,TR4d,TR4e (show it is removed),TR4f (removed),TR4g,TR4h,TR4i,TR4j,TR4k,TR4l,TR4m var propertyNamesAndTypes = new[] { ("Action", typeof(ProtocolMessage.MessageAction)), @@ -95,7 +95,6 @@ public void ShouldHaveCorrectProperties_FlagsShouldContainBitFlags() ("Channel", typeof(string)), ("ChannelSerial", typeof(string)), ("ConnectionId", typeof(string)), - ("ConnectionSerial", typeof(long?)), ("ConnectionDetails", typeof(ConnectionDetails)), ("Count", typeof(int?)), ("Error", typeof(ErrorInfo)), @@ -108,8 +107,8 @@ public void ShouldHaveCorrectProperties_FlagsShouldContainBitFlags() }; var props = pm.GetType().GetProperties(); - props.Length.Should().Be(16); - propertyNamesAndTypes.Length.Should().Be(16); + props.Length.Should().Be(15); + propertyNamesAndTypes.Length.Should().Be(15); foreach (var propertyInfo in props) { diff --git a/src/IO.Ably.Tests.Shared/Realtime/RealtimeWorkflowSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/RealtimeWorkflowSpecs.cs index 88e859eb8..4d07db7e8 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/RealtimeWorkflowSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/RealtimeWorkflowSpecs.cs @@ -26,7 +26,6 @@ public void ConnectedState_UpdatesConnectionInformation() var connectedProtocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Connected) { ConnectionId = "1", - ConnectionSerial = 100, ConnectionDetails = new ConnectionDetails { ClientId = "client1", @@ -39,7 +38,6 @@ public void ConnectedState_UpdatesConnectionInformation() // Assert var connection = client.Connection; connection.Id.Should().Be("1"); - connection.Serial.Should().Be(100); connection.Key.Should().Be("validKey"); client.Auth.ClientId.Should().Be("client1"); } diff --git a/src/IO.Ably.Tests.Shared/Realtime/RecoveryKeyContextSpec.cs b/src/IO.Ably.Tests.Shared/Realtime/RecoveryKeyContextSpec.cs new file mode 100644 index 000000000..c97bd75c4 --- /dev/null +++ b/src/IO.Ably.Tests.Shared/Realtime/RecoveryKeyContextSpec.cs @@ -0,0 +1,65 @@ +using System.Collections.Generic; +using IO.Ably.Shared.Realtime; +using Xunit; + +namespace IO.Ably.Tests.Shared.Realtime +{ + public class RecoveryKeyContextSpec + { + [Fact] + [Trait("spec", "RTN16i")] + [Trait("spec", "RTN16f")] + [Trait("spec", "RTN16j")] + public void ShouldEncodeRecoveryKeyContextObject() + { + const string expectedRecoveryKey = + "{\"connectionKey\":\"uniqueKey\",\"msgSerial\":1,\"channelSerials\":{\"channel1\":\"1\",\"channel2\":\"2\",\"channel3\":\"3\"}}"; + var recoveryKey = new RecoveryKeyContext() + { + ChannelSerials = new Dictionary() + { + { "channel1", "1" }, + { "channel2", "2" }, + { "channel3", "3" }, + }, + ConnectionKey = "uniqueKey", + MsgSerial = 1, + }; + + var encodedRecoveryKey = recoveryKey.Encode(); + Assert.Equal(expectedRecoveryKey, encodedRecoveryKey); + } + + [Fact] + [Trait("spec", "RTN16i")] + [Trait("spec", "RTN16f")] + [Trait("spec", "RTN16j")] + public void ShouldDecodeRecoveryKeyToRecoveryKeyContextObject() + { + const string recoveryKey = + "{\"connectionKey\":\"key2\",\"msgSerial\":5,\"channelSerials\":{\"channel1\":\"98\",\"channel2\":\"32\",\"channel3\":\"09\"}}"; + var recoveryKeyContext = RecoveryKeyContext.Decode(recoveryKey); + Assert.Equal("key2", recoveryKeyContext.ConnectionKey); + Assert.Equal(5, recoveryKeyContext.MsgSerial); + var expectedChannelSerials = new Dictionary() + { + { "channel1", "98" }, + { "channel2", "32" }, + { "channel3", "09" }, + }; + Assert.Equal(expectedChannelSerials, recoveryKeyContext.ChannelSerials); + } + + [Fact] + [Trait("spec", "RTN16i")] + [Trait("spec", "RTN16f")] + [Trait("spec", "RTN16j")] + public void ShouldReturnNullRecoveryContextWhileDecodingFaultyRecoveryKey() + { + const string recoveryKey = + "{\"connectionKey\":\"key2\",\"msgSerial\":\"incorrectStringSerial\",\"channelSerials\":{\"channel1\":\"98\",\"channel2\":\"32\",\"channel3\":\"09\"}}"; + var recoveryKeyContext = RecoveryKeyContext.Decode(recoveryKey); + Assert.Null(recoveryKeyContext); + } + } +} diff --git a/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs b/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs index 0e2cd1053..db9a4c3d5 100644 --- a/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs +++ b/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs @@ -55,11 +55,11 @@ internal static async Task WaitSync(this Presence presence, TimeSpan? wait void OnPresenceOnSyncCompleted(object sender, EventArgs e) { - presence.SyncCompleted -= OnPresenceOnSyncCompleted; + presence.SyncCompletedEventHandler -= OnPresenceOnSyncCompleted; taskCompletionSource.SetResult(true); } - presence.SyncCompleted += OnPresenceOnSyncCompleted; + presence.SyncCompletedEventHandler += OnPresenceOnSyncCompleted; var timeout = waitSpan ?? TimeSpan.FromSeconds(2); var waitTask = Task.Delay(timeout); diff --git a/src/IO.Ably.Tests.Shared/TestExtensions.cs b/src/IO.Ably.Tests.Shared/TestExtensions.cs index 3b279ddee..d18fc8630 100644 --- a/src/IO.Ably.Tests.Shared/TestExtensions.cs +++ b/src/IO.Ably.Tests.Shared/TestExtensions.cs @@ -61,13 +61,6 @@ internal static void BlockActionFromReceiving(this IRealtimeClient client, Proto transport.BlockReceiveActions.Add(action); } - internal static void SimulateLostConnectionAndState(this AblyRealtime client) - { - client.State.Connection.Id = string.Empty; - client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; - client.GetTestTransport().Close(false); - } - internal static void BeforeProtocolMessageProcessed(this AblyRealtime client, Action action) { var t = client.GetTestTransport();