Skip to content

Commit

Permalink
get subprotocol from client
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahad-10 committed May 8, 2024
1 parent 92759ba commit 636ba01
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 23 deletions.
23 changes: 19 additions & 4 deletions lib/src/helpers.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import "package:wamp/src/wsjoiner.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/serializers.dart";

const String jsonSubProtocol = "wamp.2.json";
const String cborSubProtocol = "wamp.2.cbor";
const String msgpackSubProtocol = "wamp.2.msgpack";

String getSubProtocol(Serializer serializer) {
if (serializer is JSONSerializer) {
return WAMPSessionJoiner.jsonSubProtocol;
return jsonSubProtocol;
} else if (serializer is CBORSerializer) {
return WAMPSessionJoiner.cborSubProtocol;
return cborSubProtocol;
} else if (serializer is MsgPackSerializer) {
return WAMPSessionJoiner.msgpackSubProtocol;
return msgpackSubProtocol;
} else {
throw ArgumentError("invalid serializer");
}
Expand All @@ -26,3 +29,15 @@ String wampErrorString(Error err) {
}
return errStr;
}

Serializer getSerializer(String? subprotocol) {
if (subprotocol == null || subprotocol == jsonSubProtocol) {
return JSONSerializer();
} else if (subprotocol == cborSubProtocol) {
return CBORSerializer();
} else if (subprotocol == msgpackSubProtocol) {
return MsgPackSerializer();
} else {
throw Exception("invalid websocket subprotocol $subprotocol");
}
}
31 changes: 28 additions & 3 deletions lib/src/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import "dart:async";
import "dart:io";

import "package:wamp/exports.dart";
import "package:wamp/src/helpers.dart";
import "package:wamp/src/types.dart";
import "package:wamp/src/wsacceptor.dart";

Expand All @@ -10,13 +11,37 @@ class Server {

Router router;

List<String> supportedProtocols = [jsonSubProtocol, cborSubProtocol, msgpackSubProtocol];

String? protocolSelector(HttpRequest request) {
String? subprotocol = request.headers.value("Sec-WebSocket-Protocol");

if (subprotocol != null) {
List<String> subprotocols = subprotocol.split(",");

subprotocols = subprotocols.map((proto) => proto.trim()).toList();

for (final String sub in subprotocols) {
if (supportedProtocols.contains(sub)) {
return sub;
}
}
}

return null;
}

Future<void> start(String host, int port) async {
var server = await HttpServer.bind(host, port);

await for (final request in server) {
var webSocket = await WebSocketTransformer.upgrade(request);
WAMPSessionAcceptor a = WAMPSessionAcceptor();
BaseSession baseSession = await a.accept(webSocket);
var webSocket = await WebSocketTransformer.upgrade(
request,
protocolSelector: (supportedProtocols) => protocolSelector(request),
);

WAMPSessionAcceptor acceptor = WAMPSessionAcceptor();
BaseSession baseSession = await acceptor.accept(webSocket);
router.attachClient(baseSession);

_handleWebSocket(baseSession, webSocket);
Expand Down
14 changes: 6 additions & 8 deletions lib/src/wsacceptor.dart
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
import "dart:async";
import "dart:io";

import "package:wamp/src/helpers.dart";
import "package:wamp/src/types.dart";
import "package:wampproto/acceptor.dart";
import "package:wampproto/auth.dart";
import "package:wampproto/serializers.dart";

class WAMPSessionAcceptor {
WAMPSessionAcceptor({IServerAuthenticator? authenticator, Serializer? serializer}) {
_serializer = serializer ?? JSONSerializer();
WAMPSessionAcceptor({IServerAuthenticator? authenticator}) {
_authenticator = authenticator;
}

IServerAuthenticator? _authenticator;
late Serializer _serializer;
static const String jsonSubProtocol = "wamp.2.json";
static const String cborSubProtocol = "wamp.2.cbor";
static const String msgpackSubProtocol = "wamp.2.msgpack";

Future<BaseSession> accept(WebSocket ws) async {
Acceptor a = Acceptor(serializer: _serializer, authenticator: _authenticator);
_serializer = getSerializer(ws.protocol);
Acceptor acceptor = Acceptor(serializer: _serializer, authenticator: _authenticator);

Completer<BaseSession> completer = Completer<BaseSession>();

late StreamSubscription<dynamic> wsStreamSubscription;
final sessionStreamController = StreamController.broadcast();

wsStreamSubscription = ws.listen((message) {
MapEntry<Object, bool> received = a.receive(message);
MapEntry<Object, bool> received = acceptor.receive(message);
ws.add(received.key);
if (received.value) {
wsStreamSubscription.onData(sessionStreamController.add);
completer.complete(BaseSession(ws, sessionStreamController, a.getSessionDetails(), _serializer));
completer.complete(BaseSession(ws, sessionStreamController, acceptor.getSessionDetails(), _serializer));
return;
}
});
Expand Down
4 changes: 0 additions & 4 deletions lib/src/wsjoiner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ class WAMPSessionJoiner {
_authenticator = authenticator;
}

static const String jsonSubProtocol = "wamp.2.json";
static const String cborSubProtocol = "wamp.2.cbor";
static const String msgpackSubProtocol = "wamp.2.msgpack";

IClientAuthenticator? _authenticator;
late Serializer _serializer;

Expand Down
7 changes: 3 additions & 4 deletions test/wamp_session_joiner_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import "dart:typed_data";

import "package:test/test.dart";
import "package:wamp/src/helpers.dart";
import "package:wamp/src/wsjoiner.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/serializers.dart";

Expand All @@ -21,17 +20,17 @@ class TestSerializer implements Serializer {
void main() {
test("jsonSubProtocol", () {
var result = getSubProtocol(JSONSerializer());
expect(result, equals(WAMPSessionJoiner.jsonSubProtocol));
expect(result, equals(jsonSubProtocol));
});

test("cborSubProtocol", () {
var result = getSubProtocol(CBORSerializer());
expect(result, equals(WAMPSessionJoiner.cborSubProtocol));
expect(result, equals(cborSubProtocol));
});

test("msgpackSubProtocol", () {
var result = getSubProtocol(MsgPackSerializer());
expect(result, equals(WAMPSessionJoiner.msgpackSubProtocol));
expect(result, equals(msgpackSubProtocol));
});

test("invalidSerializer", () {
Expand Down

0 comments on commit 636ba01

Please sign in to comment.