Skip to content

Commit

Permalink
ECO-4787: Convert a couple of tests to use interception proxy
Browse files Browse the repository at this point in the history
I’ve changed these tests to use the interception proxy instead of
private API for intercepting and injecting Realtime protocol messages,
as an example of how to use the proxy and its test suite client class.
  • Loading branch information
lawrence-forooghian committed Sep 24, 2024
1 parent 63ecb49 commit 7d2977b
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 135 deletions.
85 changes: 50 additions & 35 deletions test/realtime/auth.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
'use strict';

define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) {
define(['ably', 'shared_helper', 'async', 'chai', 'interception_proxy_client'], function (
Ably,
Helper,
async,
chai,
interceptionProxyClient,
) {
var currentTime;
var exampleTokenDetails;
var exports = {};
Expand Down Expand Up @@ -1167,42 +1173,51 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
* @spec RTN22
*/
it('mocked_reauth', function (done) {
var helper = this.test.helper,
rest = helper.AblyRest(),
authCallback = function (tokenParams, callback) {
// Request a token (should happen twice)
Helper.whenPromiseSettles(rest.auth.requestToken(tokenParams, null), function (err, tokenDetails) {
if (err) {
helper.closeAndFinish(done, realtime, err);
return;
}
callback(null, tokenDetails);
});
},
realtime = helper.AblyRealtime({ authCallback: authCallback, transports: [helper.bestTransport] });
interceptionProxyClient.intercept(done, (done, interceptionContext) => {
var helper = this.test.helper,
rest = helper.AblyRest(),
authCallback = function (tokenParams, callback) {
// Request a token (should happen twice)
Helper.whenPromiseSettles(rest.auth.requestToken(tokenParams, null), function (err, tokenDetails) {
if (err) {
helper.closeAndFinish(done, realtime, err);
return;
}
callback(null, tokenDetails);
});
},
realtime = helper.AblyRealtime({ authCallback: authCallback, transports: [helper.bestTransport] });

realtime.connection.once('connected', function () {
helper.recordPrivateApi('read.connectionManager.activeProtocol.transport');
var transport = realtime.connection.connectionManager.activeProtocol.transport,
originalSend = transport.send;
helper.recordPrivateApi('replace.transport.send');
/* Spy on transport.send to detect the outgoing AUTH */
transport.send = function (message) {
if (message.action === 17) {
try {
expect(message.auth.accessToken, 'Check AUTH message structure is as expected').to.be.ok;
helper.closeAndFinish(done, realtime);
} catch (err) {
helper.closeAndFinish(done, realtime, err);
realtime.connection.once('connected', function () {
/* Spy on client messages to detect the outgoing AUTH */
interceptionContext.transformClientMessage = ({ deserialized: message }) => {
if (message.action === 17) {
// TODO return value? the original code didn’t call originalSend. We should either:
// - make sure that we always return something (i.e. force it on to callers)
// - make sure that if nothing is returned then the interception proxy client makes this very obvious
// - make sure to clean up outstanding messages when the `intercept`-created `done` is called
//
// I think this is what’s causing this in the logs:
// Interception proxy client: got result of transforming message d814955d-8a15-4c2f-b873-1bd0c3448635 undefined
// and what's hence causing Realtime to send
// message: 'Invalid websocket message (decode failure). (See https://help.ably.io/error/40000 for help.)',
//
// TODO
// should we have a separate "spy" interception proxy API that doesn’t require a return value?
try {
expect(message.auth.accessToken, 'Check AUTH message structure is as expected').to.be.ok;
helper.closeAndFinish(done, realtime);
} catch (err) {
helper.closeAndFinish(done, realtime, err);
}
return null;
} else {
return message;
}
} else {
helper.recordPrivateApi('call.transport.send');
originalSend.call(this, message);
}
};
/* Inject a fake AUTH from realtime */
helper.recordPrivateApi('call.transport.onProtocolMessage');
transport.onProtocolMessage({ action: 17 });
};
/* Inject a fake AUTH from realtime */
interceptionContext.injectMessage(interceptionContext.latestConnectionID, { action: 17 }, false);
});
});
});

Expand Down
210 changes: 110 additions & 100 deletions test/realtime/connection.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
'use strict';

define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) {
define(['ably', 'shared_helper', 'async', 'chai', 'interception_proxy_client'], function (
Ably,
Helper,
async,
chai,
interceptionProxyClient,
) {
var expect = chai.expect;
var createPM = Ably.protocolMessageFromDeserialized;

Expand Down Expand Up @@ -185,119 +191,123 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
* @spec RTN19a2
*/
it('connectionQueuing', function (done) {
var helper = this.test.helper,
realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }),
channel = realtime.channels.get('connectionQueuing'),
connectionManager = realtime.connection.connectionManager;

realtime.connection.once('connected', function () {
helper.recordPrivateApi('read.connectionManager.activeProtocol.transport');
var transport = connectionManager.activeProtocol.transport;
Helper.whenPromiseSettles(channel.attach(), function (err) {
if (err) {
helper.closeAndFinish(done, realtime, err);
return;
}
interceptionProxyClient.intercept(done, (done, interceptionContext) => {
var helper = this.test.helper,
realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }),
channel = realtime.channels.get('connectionQueuing'),
connectionManager = realtime.connection.connectionManager;

let transportSendCallback;
realtime.connection.once('connected', function () {
Helper.whenPromiseSettles(channel.attach(), function (err) {
if (err) {
helper.closeAndFinish(done, realtime, err);
return;
}

helper.recordPrivateApi('replace.transport.send');
/* Sabotage sending the message */
transport.send = function (msg) {
if (msg.action == 15) {
expect(msg.msgSerial).to.equal(0, 'Expect msgSerial to be 0');
let transportSendCallback;

if (!transportSendCallback) {
done(new Error('transport.send override called before transportSendCallback populated'));
}
/* Sabotage sending the message */
interceptionContext.transformClientMessage = (msg) => {
if (msg.deserialized.action == 15) {
expect(msg.deserialized.msgSerial).to.equal(0, 'Expect msgSerial to be 0');

transportSendCallback(null);
}
};
if (!transportSendCallback) {
done(new Error('transport.send override called before transportSendCallback populated'));
}

let publishCallback;
transportSendCallback(null);
}
};

async.series(
[
function (cb) {
transportSendCallback = cb;
let publishCallback;

/* Sabotaged publish */
Helper.whenPromiseSettles(channel.publish('first', null), function (err) {
if (!publishCallback) {
done(new Error('publish completed before publishCallback populated'));
}
publishCallback(err);
});
},
async.series(
[
function (cb) {
transportSendCallback = cb;

// We wait for transport.send to recieve the message that we just
// published before we proceed to disconnecting the transport, to
// make sure that the message got marked as `sendAttempted`.
/* Sabotaged publish */
Helper.whenPromiseSettles(channel.publish('first', null), function (err) {
if (!publishCallback) {
done(new Error('publish completed before publishCallback populated'));
}
publishCallback(err);
});
},

function (cb) {
async.parallel(
[
function (cb) {
publishCallback = function (err) {
try {
expect(!err, 'Check publish happened (eventually) without err').to.be.ok;
} catch (err) {
cb(err);
return;
}
cb();
};
},
function (cb) {
/* After the disconnect, on reconnect, spy on transport.send again */
helper.recordPrivateApi('listen.connectionManager.transport.pending');
connectionManager.once('transport.pending', function (transport) {
var oldSend = transport.send;
// We wait for transport.send to recieve the message that we just
// published before we proceed to disconnecting the transport, to
// make sure that the message got marked as `sendAttempted`.

helper.recordPrivateApi('replace.transport.send');
transport.send = function (msg, msgCb) {
if (msg.action === 15) {
if (msg.messages[0].name === 'first') {
try {
expect(msg.msgSerial).to.equal(0, 'Expect msgSerial of original message to still be 0');
expect(msg.messages.length).to.equal(
1,
'Expect second message to not have been merged with the attempted message',
);
} catch (err) {
cb(err);
return;
}
} else if (msg.messages[0].name === 'second') {
try {
expect(msg.msgSerial).to.equal(1, 'Expect msgSerial of new message to be 1');
} catch (err) {
cb(err);
return;
}
cb();
}
function (cb) {
async.parallel(
[
function (cb) {
publishCallback = function (err) {
try {
expect(!err, 'Check publish happened (eventually) without err').to.be.ok;
} catch (err) {
cb(err);
return;
}
helper.recordPrivateApi('call.transport.send');
oldSend.call(transport, msg, msgCb);
cb();
};
channel.publish('second', null);
});
},
function (cb) {
/* After the disconnect, on reconnect, spy on transport.send again */
helper.recordPrivateApi('listen.connectionManager.transport.pending');
connectionManager.once('transport.pending', function (transport) {
// TODO does the identity of this transport matter, and can we replace the `transport.pending` check with something external too (e.g. detecting a new connection)? perhaps let's have an EventEmitter interface on the interception context that says when there's a new connection or something
interceptionContext.transformClientMessage = function (msg) {
if (msg.deserialized.action === 15) {
if (msg.deserialized.messages[0].name === 'first') {
try {
expect(msg.deserialized.msgSerial).to.equal(
0,
'Expect msgSerial of original message to still be 0',
);
expect(msg.deserialized.messages.length).to.equal(
1,
'Expect second message to not have been merged with the attempted message',
);
} catch (err) {
cb(err);
return msg.deserialized;
}
} else if (msg.deserialized.messages[0].name === 'second') {
try {
expect(msg.deserialized.msgSerial).to.equal(
1,
'Expect msgSerial of new message to be 1',
);
} catch (err) {
cb(err);
return msg.deserialized;
}
cb();
}
}

// preserve the message
return msg.deserialized;
};
channel.publish('second', null);
});

/* Disconnect the transport (will automatically reconnect and resume) () */
helper.recordPrivateApi('call.connectionManager.disconnectAllTransports');
connectionManager.disconnectAllTransports();
},
],
cb,
);
/* Disconnect the transport (will automatically reconnect and resume) () */
helper.recordPrivateApi('call.connectionManager.disconnectAllTransports');
connectionManager.disconnectAllTransports();
},
],
cb,
);
},
],
function (err) {
helper.closeAndFinish(done, realtime, err);
},
],
function (err) {
helper.closeAndFinish(done, realtime, err);
},
);
);
});
});
});
});
Expand Down

0 comments on commit 7d2977b

Please sign in to comment.