diff --git a/test/realtime/auth.test.js b/test/realtime/auth.test.js index 3cdd43dcb..81899d0ca 100644 --- a/test/realtime/auth.test.js +++ b/test/realtime/auth.test.js @@ -1,6 +1,12 @@ 'use strict'; -define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { +define(['ably', 'shared_helper', 'async', 'chai', 'interception_proxy_client'], function ( + Ably, + Helper, + async, + chai, + interceptionProxyClient, +) { var currentTime; var exampleTokenDetails; var exports = {}; @@ -1167,42 +1173,51 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * @spec RTN22 */ it('mocked_reauth', function (done) { - var helper = this.test.helper, - rest = helper.AblyRest(), - authCallback = function (tokenParams, callback) { - // Request a token (should happen twice) - Helper.whenPromiseSettles(rest.auth.requestToken(tokenParams, null), function (err, tokenDetails) { - if (err) { - helper.closeAndFinish(done, realtime, err); - return; - } - callback(null, tokenDetails); - }); - }, - realtime = helper.AblyRealtime({ authCallback: authCallback, transports: [helper.bestTransport] }); + interceptionProxyClient.intercept(done, (done, interceptionContext) => { + var helper = this.test.helper, + rest = helper.AblyRest(), + authCallback = function (tokenParams, callback) { + // Request a token (should happen twice) + Helper.whenPromiseSettles(rest.auth.requestToken(tokenParams, null), function (err, tokenDetails) { + if (err) { + helper.closeAndFinish(done, realtime, err); + return; + } + callback(null, tokenDetails); + }); + }, + realtime = helper.AblyRealtime({ authCallback: authCallback, transports: [helper.bestTransport] }); - realtime.connection.once('connected', function () { - helper.recordPrivateApi('read.connectionManager.activeProtocol.transport'); - var transport = realtime.connection.connectionManager.activeProtocol.transport, - originalSend = transport.send; - helper.recordPrivateApi('replace.transport.send'); - /* Spy on transport.send to detect the outgoing AUTH */ - transport.send = function (message) { - if (message.action === 17) { - try { - expect(message.auth.accessToken, 'Check AUTH message structure is as expected').to.be.ok; - helper.closeAndFinish(done, realtime); - } catch (err) { - helper.closeAndFinish(done, realtime, err); + realtime.connection.once('connected', function () { + /* Spy on client messages to detect the outgoing AUTH */ + interceptionContext.transformClientMessage = ({ deserialized: message }) => { + if (message.action === 17) { + // TODO return value? the original code didn’t call originalSend. We should either: + // - make sure that we always return something (i.e. force it on to callers) + // - make sure that if nothing is returned then the interception proxy client makes this very obvious + // - make sure to clean up outstanding messages when the `intercept`-created `done` is called + // + // I think this is what’s causing this in the logs: + // Interception proxy client: got result of transforming message d814955d-8a15-4c2f-b873-1bd0c3448635 undefined + // and what's hence causing Realtime to send + // message: 'Invalid websocket message (decode failure). (See https://help.ably.io/error/40000 for help.)', + // + // TODO + // should we have a separate "spy" interception proxy API that doesn’t require a return value? + try { + expect(message.auth.accessToken, 'Check AUTH message structure is as expected').to.be.ok; + helper.closeAndFinish(done, realtime); + } catch (err) { + helper.closeAndFinish(done, realtime, err); + } + return null; + } else { + return message; } - } else { - helper.recordPrivateApi('call.transport.send'); - originalSend.call(this, message); - } - }; - /* Inject a fake AUTH from realtime */ - helper.recordPrivateApi('call.transport.onProtocolMessage'); - transport.onProtocolMessage({ action: 17 }); + }; + /* Inject a fake AUTH from realtime */ + interceptionContext.injectMessage(interceptionContext.latestConnectionID, { action: 17 }, false); + }); }); }); diff --git a/test/realtime/connection.test.js b/test/realtime/connection.test.js index 5a6b7952c..dce9d6c7d 100644 --- a/test/realtime/connection.test.js +++ b/test/realtime/connection.test.js @@ -1,6 +1,12 @@ 'use strict'; -define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { +define(['ably', 'shared_helper', 'async', 'chai', 'interception_proxy_client'], function ( + Ably, + Helper, + async, + chai, + interceptionProxyClient, +) { var expect = chai.expect; var createPM = Ably.protocolMessageFromDeserialized; @@ -185,119 +191,123 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * @spec RTN19a2 */ it('connectionQueuing', function (done) { - var helper = this.test.helper, - realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), - channel = realtime.channels.get('connectionQueuing'), - connectionManager = realtime.connection.connectionManager; - - realtime.connection.once('connected', function () { - helper.recordPrivateApi('read.connectionManager.activeProtocol.transport'); - var transport = connectionManager.activeProtocol.transport; - Helper.whenPromiseSettles(channel.attach(), function (err) { - if (err) { - helper.closeAndFinish(done, realtime, err); - return; - } + interceptionProxyClient.intercept(done, (done, interceptionContext) => { + var helper = this.test.helper, + realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), + channel = realtime.channels.get('connectionQueuing'), + connectionManager = realtime.connection.connectionManager; - let transportSendCallback; + realtime.connection.once('connected', function () { + Helper.whenPromiseSettles(channel.attach(), function (err) { + if (err) { + helper.closeAndFinish(done, realtime, err); + return; + } - helper.recordPrivateApi('replace.transport.send'); - /* Sabotage sending the message */ - transport.send = function (msg) { - if (msg.action == 15) { - expect(msg.msgSerial).to.equal(0, 'Expect msgSerial to be 0'); + let transportSendCallback; - if (!transportSendCallback) { - done(new Error('transport.send override called before transportSendCallback populated')); - } + /* Sabotage sending the message */ + interceptionContext.transformClientMessage = (msg) => { + if (msg.deserialized.action == 15) { + expect(msg.deserialized.msgSerial).to.equal(0, 'Expect msgSerial to be 0'); - transportSendCallback(null); - } - }; + if (!transportSendCallback) { + done(new Error('transport.send override called before transportSendCallback populated')); + } - let publishCallback; + transportSendCallback(null); + } + }; - async.series( - [ - function (cb) { - transportSendCallback = cb; + let publishCallback; - /* Sabotaged publish */ - Helper.whenPromiseSettles(channel.publish('first', null), function (err) { - if (!publishCallback) { - done(new Error('publish completed before publishCallback populated')); - } - publishCallback(err); - }); - }, + async.series( + [ + function (cb) { + transportSendCallback = cb; - // We wait for transport.send to recieve the message that we just - // published before we proceed to disconnecting the transport, to - // make sure that the message got marked as `sendAttempted`. + /* Sabotaged publish */ + Helper.whenPromiseSettles(channel.publish('first', null), function (err) { + if (!publishCallback) { + done(new Error('publish completed before publishCallback populated')); + } + publishCallback(err); + }); + }, - function (cb) { - async.parallel( - [ - function (cb) { - publishCallback = function (err) { - try { - expect(!err, 'Check publish happened (eventually) without err').to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }; - }, - function (cb) { - /* After the disconnect, on reconnect, spy on transport.send again */ - helper.recordPrivateApi('listen.connectionManager.transport.pending'); - connectionManager.once('transport.pending', function (transport) { - var oldSend = transport.send; + // We wait for transport.send to recieve the message that we just + // published before we proceed to disconnecting the transport, to + // make sure that the message got marked as `sendAttempted`. - helper.recordPrivateApi('replace.transport.send'); - transport.send = function (msg, msgCb) { - if (msg.action === 15) { - if (msg.messages[0].name === 'first') { - try { - expect(msg.msgSerial).to.equal(0, 'Expect msgSerial of original message to still be 0'); - expect(msg.messages.length).to.equal( - 1, - 'Expect second message to not have been merged with the attempted message', - ); - } catch (err) { - cb(err); - return; - } - } else if (msg.messages[0].name === 'second') { - try { - expect(msg.msgSerial).to.equal(1, 'Expect msgSerial of new message to be 1'); - } catch (err) { - cb(err); - return; - } - cb(); - } + function (cb) { + async.parallel( + [ + function (cb) { + publishCallback = function (err) { + try { + expect(!err, 'Check publish happened (eventually) without err').to.be.ok; + } catch (err) { + cb(err); + return; } - helper.recordPrivateApi('call.transport.send'); - oldSend.call(transport, msg, msgCb); + cb(); }; - channel.publish('second', null); - }); + }, + function (cb) { + /* After the disconnect, on reconnect, spy on transport.send again */ + helper.recordPrivateApi('listen.connectionManager.transport.pending'); + connectionManager.once('transport.pending', function (transport) { + // TODO does the identity of this transport matter, and can we replace the `transport.pending` check with something external too (e.g. detecting a new connection)? perhaps let's have an EventEmitter interface on the interception context that says when there's a new connection or something + interceptionContext.transformClientMessage = function (msg) { + if (msg.deserialized.action === 15) { + if (msg.deserialized.messages[0].name === 'first') { + try { + expect(msg.deserialized.msgSerial).to.equal( + 0, + 'Expect msgSerial of original message to still be 0', + ); + expect(msg.deserialized.messages.length).to.equal( + 1, + 'Expect second message to not have been merged with the attempted message', + ); + } catch (err) { + cb(err); + return msg.deserialized; + } + } else if (msg.deserialized.messages[0].name === 'second') { + try { + expect(msg.deserialized.msgSerial).to.equal( + 1, + 'Expect msgSerial of new message to be 1', + ); + } catch (err) { + cb(err); + return msg.deserialized; + } + cb(); + } + } + + // preserve the message + return msg.deserialized; + }; + channel.publish('second', null); + }); - /* Disconnect the transport (will automatically reconnect and resume) () */ - helper.recordPrivateApi('call.connectionManager.disconnectAllTransports'); - connectionManager.disconnectAllTransports(); - }, - ], - cb, - ); + /* Disconnect the transport (will automatically reconnect and resume) () */ + helper.recordPrivateApi('call.connectionManager.disconnectAllTransports'); + connectionManager.disconnectAllTransports(); + }, + ], + cb, + ); + }, + ], + function (err) { + helper.closeAndFinish(done, realtime, err); }, - ], - function (err) { - helper.closeAndFinish(done, realtime, err); - }, - ); + ); + }); }); }); });