Skip to content

Commit

Permalink
Merge pull request #1913 from ably/auto-re-enter-diff-connid
Browse files Browse the repository at this point in the history
Support RTP17g1: presence auto re-enter with a different connId
  • Loading branch information
SimonWoolf authored Nov 7, 2024
2 parents 99b7a02 + 2d5323d commit 18a2559
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 120 deletions.
1 change: 0 additions & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2284,7 +2284,6 @@ export declare interface Channels<T> {
* This experimental method allows you to create custom realtime data feeds by selectively subscribing
* to receive only part of the data from the channel.
* See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information.
*
* @param name - The channel name.
* @param deriveOptions - A {@link DeriveOptions} object.
* @param channelOptions - A {@link ChannelOptions} object.
Expand Down
26 changes: 15 additions & 11 deletions src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,8 @@ class RealtimePresence extends EventEmitter {
}

_ensureMyMembersPresent(): void {
const myMembers = this._myMembers,
reenterCb = (err?: ErrorInfo | null) => {
if (err) {
const msg = 'Presence auto-re-enter failed: ' + err.toString();
const wrappedErr = new ErrorInfo(msg, 91004, 400);
Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg);
const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr);
this.channel.emit('update', change);
}
};
const myMembers = this._myMembers;
const connId = this.channel.connectionManager.connectionId;

for (const memberKey in myMembers.map) {
const entry = myMembers.map[memberKey];
Expand All @@ -446,7 +438,19 @@ class RealtimePresence extends EventEmitter {
);
// RTP17g: Send ENTER containing the member id, clientId and data
// attributes.
Utils.whenPromiseSettles(this._enterOrUpdateClient(entry.id, entry.clientId, entry.data, 'enter'), reenterCb);
// RTP17g1: suppress id if the connId has changed
const id = entry.connectionId === connId ? entry.id : undefined;
this._enterOrUpdateClient(id, entry.clientId, entry.data, 'enter').catch((err) => {
const wrappedErr = new ErrorInfo('Presence auto re-enter failed', 91004, 400, err);
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimePresence._ensureMyMembersPresent()',
'Presence auto re-enter failed; reason = ' + Utils.inspectError(err),
);
const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr);
this.channel.emit('update', change);
});
}
}

Expand Down
7 changes: 5 additions & 2 deletions test/common/modules/shared_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ define([

becomeSuspended(realtime, cb) {
const helper = this.addingHelperFunction('becomeSuspended');
helper._becomeSuspended(realtime, cb);
return helper._becomeSuspended(realtime, cb);
}

_becomeSuspended(realtime, cb) {
Expand All @@ -268,10 +268,13 @@ define([
self.recordPrivateApi('call.connectionManager.notifyState');
realtime.connection.connectionManager.notifyState({ state: 'suspended' });
});
if (cb)
if (cb) {
realtime.connection.once('suspended', function () {
cb();
});
} else {
return realtime.connection.once('suspended');
}
}

callbackOnClose(realtime, callback) {
Expand Down
215 changes: 109 additions & 106 deletions test/realtime/presence.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) {
var expect = chai.expect;
const { expect, assert } = chai;
var createPM = Ably.protocolMessageFromDeserialized;
var PresenceMessage = Ably.Realtime.PresenceMessage;

Expand Down Expand Up @@ -1627,117 +1627,120 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
* @spec RTP17g
* @specpartial RTP17i - tests simple re-entry, no RESUMED flag test
*/
it('presence_auto_reenter', function (done) {
it('presence_auto_reenter', async function () {
const helper = this.test.helper;
var channelName = 'presence_auto_reenter';
var realtime = helper.AblyRealtime();
var channel = realtime.channels.get(channelName);
const channelName = 'presence_auto_reenter';
const realtime = helper.AblyRealtime();
const channel = realtime.channels.get(channelName);

await realtime.connection.once('connected');
await channel.attach();
// presence.get will wait for a sync if needed
await channel.presence.get();

const pOnPresence = channel.presence.subscriptions.once('enter');
await channel.presence.enterClient('one', 'onedata');
await pOnPresence;

/* inject an additional member into the myMember set, then force a suspended state */
helper.recordPrivateApi('read.connectionManager.connectionId');
const connId = realtime.connection.connectionManager.connectionId;

helper.recordPrivateApi('call.presence._myMembers.put');
channel.presence._myMembers.put({
action: 'enter',
clientId: 'two',
connectionId: connId,
id: connId + ':0:0',
data: 'twodata',
});

async.series(
[
function (cb) {
realtime.connection.once('connected', function () {
cb();
});
},
function (cb) {
Helper.whenPromiseSettles(channel.attach(), cb);
},
function (cb) {
if (!channel.presence.syncComplete) {
helper.recordPrivateApi('call.presence.waitSync');
channel.presence.members.waitSync(cb);
} else {
cb();
}
},
function (cb) {
channel.presence.enterClient('one', 'onedata');
channel.presence.subscribe('enter', function () {
channel.presence.unsubscribe('enter');
cb();
});
},
function (cb) {
/* inject an additional member into the myMember set, then force a suspended state */
helper.recordPrivateApi('read.connectionManager.connectionId');
var connId = realtime.connection.connectionManager.connectionId;
helper.recordPrivateApi('call.presence._myMembers.put');
channel.presence._myMembers.put({
action: 'enter',
clientId: 'two',
connectionId: connId,
id: connId + ':0:0',
data: 'twodata',
});
helper.becomeSuspended(realtime, cb);
},
function (cb) {
await helper.becomeSuspended(realtime);

expect(channel.state).to.equal('suspended', 'sanity-check channel state');

/* Reconnect */
const pOnceAttached = channel.once('attached');
realtime.connection.connect();
await pOnceAttached;

/* Since we haven't been gone for two minutes, we don't know for sure
* that realtime will feel it necessary to do a sync - if it doesn't,
* we request one */
if (channel.presence.syncComplete) {
helper.recordPrivateApi('call.channel.sync');
channel.sync();
}
await channel.presence.get();

/* Now just wait for an enter! */
const enteredMembers = new Set();
await new Promise((resolve, reject) => {
channel.presence.subscribe('enter', (presmsg) => {
enteredMembers.add(presmsg.clientId);
if (enteredMembers.size === 2) {
try {
expect(channel.state).to.equal('suspended', 'sanity-check channel state');
expect(enteredMembers.has('one')).to.equal(true, 'Check client one entered');
expect(enteredMembers.has('two')).to.equal(true, 'Check client two entered');
channel.presence.unsubscribe('enter');
resolve();
} catch (err) {
cb(err);
reject(err);
return;
}
/* Reconnect */
realtime.connection.connect();
channel.once('attached', function () {
cb();
});
},
function (cb) {
/* Since we haven't been gone for two minutes, we don't know for sure
* that realtime will feel it necessary to do a sync - if it doesn't,
* we request one */
if (channel.presence.syncComplete) {
helper.recordPrivateApi('call.channel.sync');
channel.sync();
}
helper.recordPrivateApi('call.presence.waitSync');
channel.presence.members.waitSync(cb);
},
function (cb) {
/* Now just wait for an enter! */
let enteredMembers = new Set();
channel.presence.subscribe('enter', function (presmsg) {
enteredMembers.add(presmsg.clientId);
if (enteredMembers.size === 2) {
try {
expect(enteredMembers.has('one')).to.equal(true, 'Check client one entered');
expect(enteredMembers.has('two')).to.equal(true, 'Check client two entered');
channel.presence.unsubscribe('enter');
cb();
} catch (err) {
cb(err);
return;
}
}
});
},
function (cb) {
Helper.whenPromiseSettles(channel.presence.get(), function (err, results) {
if (err) {
cb(err);
return;
}
try {
expect(channel.presence.syncComplete, 'Check in sync').to.be.ok;
expect(results.length).to.equal(3, 'Check correct number of results');
expect(extractClientIds(results)).deep.to.equal(['one', 'one', 'two'], 'check correct members');
expect(extractMember(results, 'one').data).to.equal('onedata', 'check correct data on one');
expect(extractMember(results, 'two').data).to.equal('twodata', 'check correct data on two');
} catch (err) {
cb(err);
return;
}
cb();
});
},
],
function (err) {
helper.closeAndFinish(done, realtime, err);
},
);
}
});
});

const results = await channel.presence.get();
expect(channel.presence.syncComplete, 'Check in sync').to.be.ok;
expect(results.length).to.equal(3, 'Check correct number of results');
expect(extractClientIds(results)).deep.to.equal(['one', 'one', 'two'], 'check correct members');
expect(extractMember(results, 'one').data).to.equal('onedata', 'check correct data on one');
expect(extractMember(results, 'two').data).to.equal('twodata', 'check correct data on two');
realtime.close();
});

/**
* Test the auto-re-enter functionality with a resume failure resulting in a different
* connectionId (the re-entry should not have a message id)
*
* @spec RTP17g
* @spec RTP17g1
*/
it('presence_auto_reenter_different_connid', async function () {
const helper = this.test.helper;
const channelName = 'presence_auto_reenter_different_connid';
const realtime = helper.AblyRealtime({ transportParams: { remainPresentFor: 5000 } });
const channel = realtime.channels.get(channelName);

await realtime.connection.once('connected');
const firstConnId = realtime.connection.id;
await channel.attach();
// presence.get will wait for a sync if needed
await channel.presence.get();

const pOnPresence = channel.presence.subscriptions.once('enter');
await channel.presence.enterClient('one', 'onedata');
const member1 = await pOnPresence;

await helper.becomeSuspended(realtime);
assert.equal(channel.state, 'suspended', 'sanity-check channel state');

/* Reconnect. Since we were suspended, we will get a different connection id */
const pOnceAttached = channel.once('attached');
const pOnEnter = channel.presence.subscriptions.once('enter');
const pOnLeave = channel.presence.subscriptions.once('leave');
realtime.connection.connect();
await pOnceAttached;
const secondConnId = realtime.connection.id;
assert.notEqual(firstConnId, secondConnId, 'sanity-check connection id changed post-suspend');
const [enter, leave] = await Promise.all([pOnEnter, pOnLeave]);
assert.equal(leave.connectionId, firstConnId, 'Check the leave for the old connid');
assert.equal(enter.connectionId, secondConnId, 'Check enter for new connid');
assert.notEqual(enter.id, member1.id, 'Check the new enter did not have the msgId of the original');

realtime.close();
});

/**
Expand Down

0 comments on commit 18a2559

Please sign in to comment.