Skip to content

Commit

Permalink
Basic router (#5)
Browse files Browse the repository at this point in the history
* raw router

* initial working router

* remove redundant code

* take host name & port from user

* dont convert message to string for json serializer

* remove redundant imports

* use base session receive message

* run server until websocket connection is not closed

* get subprotocol from client

* make class variables private
  • Loading branch information
Mahad-10 authored May 8, 2024
1 parent 4c33c10 commit c8b6c3e
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 17 deletions.
1 change: 1 addition & 0 deletions lib/exports.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export "src/client.dart" show Client;
export "src/router.dart" show Router;
export "src/session.dart" show Session;
export "src/types.dart" show Event, Invocation, Registration, Result, Subscription;
23 changes: 19 additions & 4 deletions lib/src/helpers.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import "package:wamp/src/wsjoiner.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/serializers.dart";

const String jsonSubProtocol = "wamp.2.json";
const String cborSubProtocol = "wamp.2.cbor";
const String msgpackSubProtocol = "wamp.2.msgpack";

String getSubProtocol(Serializer serializer) {
if (serializer is JSONSerializer) {
return WAMPSessionJoiner.jsonSubProtocol;
return jsonSubProtocol;
} else if (serializer is CBORSerializer) {
return WAMPSessionJoiner.cborSubProtocol;
return cborSubProtocol;
} else if (serializer is MsgPackSerializer) {
return WAMPSessionJoiner.msgpackSubProtocol;
return msgpackSubProtocol;
} else {
throw ArgumentError("invalid serializer");
}
Expand All @@ -26,3 +29,15 @@ String wampErrorString(Error err) {
}
return errStr;
}

Serializer getSerializer(String? subprotocol) {
if (subprotocol == null || subprotocol == jsonSubProtocol) {
return JSONSerializer();
} else if (subprotocol == cborSubProtocol) {
return CBORSerializer();
} else if (subprotocol == msgpackSubProtocol) {
return MsgPackSerializer();
} else {
throw Exception("invalid websocket subprotocol $subprotocol");
}
}
64 changes: 64 additions & 0 deletions lib/src/realm.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import "package:wamp/src/types.dart";
import "package:wampproto/broker.dart";
import "package:wampproto/dealer.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/types.dart";

class Realm {
final Dealer _dealer = Dealer();
final Broker _broker = Broker();

final Map<int, IBaseSession> _clients = {};

void attachClient(IBaseSession base) {
_clients[base.id()] = base;
_dealer.addSession(base.id());
_broker.addSession(base.id());
}

void detachClient(IBaseSession base) {
_clients.remove(base.id());
_broker.removeSession(base.id());
_dealer.removeSession(base.id());
}

void stop() {
// stop will disconnect all clients.
}

Future<void> receiveMessage(int sessionID, Message msg) async {
switch (msg.messageType()) {
case Call.id || Yield.id || Register.id || UnRegister.id:
MessageWithRecipient recipient = _dealer.receiveMessage(sessionID, msg);
var client = _clients[recipient.recipient];
client?.sendMessage(recipient.message);

case Publish.id:
List<MessageWithRecipient>? recipients = _broker.receiveMessage(sessionID, msg);
if (recipients == null) {
return;
}

for (final recipient in recipients) {
var client = _clients[recipient.recipient];
client?.sendMessage(recipient.message);
}

case Subscribe.id || UnSubscribe.id:
List<MessageWithRecipient>? recipients = _broker.receiveMessage(sessionID, msg);
if (recipients == null) {
throw Exception("recipients null");
}
MessageWithRecipient recipient = recipients[0];
var client = _clients[recipient.recipient];
client?.sendMessage(recipient.message);

case Goodbye.id:
_dealer.removeSession(sessionID);
_broker.removeSession(sessionID);
var client = _clients[sessionID];
await client?.close();
_clients.remove(sessionID);
}
}
}
46 changes: 46 additions & 0 deletions lib/src/router.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import "package:wamp/src/realm.dart";
import "package:wamp/src/types.dart";
import "package:wampproto/messages.dart";

class Router {
final Map<String, Realm> _realms = {};

void addRealm(String name) {
_realms[name] = Realm();
}

void removeRealm(String name) {
_realms.remove(name);
}

bool hasRealm(String name) {
return _realms.containsKey(name);
}

void attachClient(IBaseSession baseSession) {
String realm = baseSession.realm();
if (!_realms.containsKey(realm)) {
throw Exception("cannot attach client to non-existent realm $realm");
}

_realms[realm]?.attachClient(baseSession);
}

void detachClient(IBaseSession baseSession) {
String realm = baseSession.realm();
if (!_realms.containsKey(realm)) {
throw Exception("cannot detach client from non-existent realm $realm");
}

_realms[realm]?.detachClient(baseSession);
}

Future<void> receiveMessage(IBaseSession baseSession, Message msg) async {
String realm = baseSession.realm();
if (!_realms.containsKey(realm)) {
throw Exception("cannot process message for non-existent realm $realm");
}

await _realms[realm]?.receiveMessage(baseSession.id(), msg);
}
}
59 changes: 59 additions & 0 deletions lib/src/server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import "dart:async";
import "dart:io";

import "package:wamp/exports.dart";
import "package:wamp/src/helpers.dart";
import "package:wamp/src/types.dart";
import "package:wamp/src/wsacceptor.dart";

class Server {
Server(this._router);

final Router _router;

List<String> supportedProtocols = [jsonSubProtocol, cborSubProtocol, msgpackSubProtocol];

String? protocolSelector(HttpRequest request) {
String? subprotocol = request.headers.value("Sec-WebSocket-Protocol");

if (subprotocol != null) {
List<String> subprotocols = subprotocol.split(",");

subprotocols = subprotocols.map((proto) => proto.trim()).toList();

for (final String sub in subprotocols) {
if (supportedProtocols.contains(sub)) {
return sub;
}
}
}

return null;
}

Future<void> start(String host, int port) async {
var server = await HttpServer.bind(host, port);

await for (final request in server) {
var webSocket = await WebSocketTransformer.upgrade(
request,
protocolSelector: (supportedProtocols) => protocolSelector(request),
);

WAMPSessionAcceptor acceptor = WAMPSessionAcceptor();
BaseSession baseSession = await acceptor.accept(webSocket);
_router.attachClient(baseSession);

_handleWebSocket(baseSession, webSocket);
}
}

void _handleWebSocket(BaseSession baseSession, WebSocket webSocket) {
Future.microtask(() async {
while (webSocket.closeCode == null) {
var message = await baseSession.receiveMessage();
await _router.receiveMessage(baseSession, message);
}
});
}
}
4 changes: 2 additions & 2 deletions lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ class Session {
}

Future<void> unsubscribe(Subscription sub) {
var unsubscribe = msg.UnSubscribe(_nextID, sub.subscriptionId);
var unsubscribe = msg.UnSubscribe(_nextID, sub.subscriptionID);

var completer = Completer<void>();
_unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completer, sub.subscriptionId);
_unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completer, sub.subscriptionID);
_baseSession.send(_wampSession.sendMessage(unsubscribe));

return completer.future;
Expand Down
79 changes: 76 additions & 3 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,98 @@
import "dart:async";
import "dart:io";
import "dart:typed_data";

import "package:wampproto/messages.dart";
import "package:wampproto/serializers.dart";
import "package:wampproto/session.dart";

class BaseSession {
abstract class IBaseSession {
int id() {
throw UnimplementedError();
}

String realm() {
throw UnimplementedError();
}

String authid() {
throw UnimplementedError();
}

String authrole() {
throw UnimplementedError();
}

void send(Uint8List data) {
throw UnimplementedError();
}

Future<Object> receive() async {
throw UnimplementedError();
}

void sendMessage(Message msg) {
throw UnimplementedError();
}

Future<Message> receiveMessage() async {
throw UnimplementedError();
}

Future<void> close() async {
throw UnimplementedError();
}
}

class BaseSession extends IBaseSession {
BaseSession(this._ws, this._wsStreamController, this.sessionDetails, this.serializer);

final WebSocket _ws;
final StreamController _wsStreamController;
SessionDetails sessionDetails;
Serializer serializer;

@override
int id() {
return sessionDetails.sessionID;
}

@override
String realm() {
return sessionDetails.realm;
}

@override
String authid() {
return sessionDetails.authid;
}

@override
String authrole() {
return sessionDetails.authrole;
}

@override
void send(Object data) {
_ws.add(data);
}

@override
void sendMessage(Message msg) {
send(serializer.serialize(msg));
}

@override
Future<Object> receive() async {
return await _wsStreamController.stream.first;
}

@override
Future<Message> receiveMessage() async {
return serializer.deserialize(await receive());
}

@override
Future<void> close() async {
await _ws.close();
}
Expand Down Expand Up @@ -74,9 +147,9 @@ class UnregisterRequest {
}

class Subscription {
Subscription(this.subscriptionId);
Subscription(this.subscriptionID);

final int subscriptionId;
final int subscriptionID;
}

class SubscribeRequest {
Expand Down
48 changes: 48 additions & 0 deletions lib/src/wsacceptor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import "dart:async";
import "dart:io";

import "package:wamp/src/helpers.dart";
import "package:wamp/src/types.dart";
import "package:wampproto/acceptor.dart";
import "package:wampproto/auth.dart";
import "package:wampproto/serializers.dart";

class WAMPSessionAcceptor {
WAMPSessionAcceptor({IServerAuthenticator? authenticator}) {
_authenticator = authenticator;
}

IServerAuthenticator? _authenticator;
late Serializer _serializer;

Future<BaseSession> accept(WebSocket ws) async {
_serializer = getSerializer(ws.protocol);
Acceptor acceptor = Acceptor(serializer: _serializer, authenticator: _authenticator);

Completer<BaseSession> completer = Completer<BaseSession>();

late StreamSubscription<dynamic> wsStreamSubscription;
final sessionStreamController = StreamController.broadcast();

wsStreamSubscription = ws.listen((message) {
MapEntry<Object, bool> received = acceptor.receive(message);
ws.add(received.key);
if (received.value) {
wsStreamSubscription.onData(sessionStreamController.add);
completer.complete(BaseSession(ws, sessionStreamController, acceptor.getSessionDetails(), _serializer));
return;
}
});

wsStreamSubscription.onDone(() {
sessionStreamController.stream.isEmpty.then(
(isEmpty) => {
if (!isEmpty) {sessionStreamController.close()},
},
);
wsStreamSubscription.cancel();
});

return completer.future;
}
}
4 changes: 0 additions & 4 deletions lib/src/wsjoiner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ class WAMPSessionJoiner {
_authenticator = authenticator;
}

static const String jsonSubProtocol = "wamp.2.json";
static const String cborSubProtocol = "wamp.2.cbor";
static const String msgpackSubProtocol = "wamp.2.msgpack";

IClientAuthenticator? _authenticator;
late Serializer _serializer;

Expand Down
Loading

0 comments on commit c8b6c3e

Please sign in to comment.