Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic router #5

Merged
merged 10 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/exports.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export "src/client.dart" show Client;
export "src/router.dart" show Router;
export "src/session.dart" show Session;
export "src/types.dart" show Event, Invocation, Registration, Result, Subscription;
23 changes: 19 additions & 4 deletions lib/src/helpers.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import "package:wamp/src/wsjoiner.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/serializers.dart";

const String jsonSubProtocol = "wamp.2.json";
const String cborSubProtocol = "wamp.2.cbor";
const String msgpackSubProtocol = "wamp.2.msgpack";

String getSubProtocol(Serializer serializer) {
if (serializer is JSONSerializer) {
return WAMPSessionJoiner.jsonSubProtocol;
return jsonSubProtocol;
} else if (serializer is CBORSerializer) {
return WAMPSessionJoiner.cborSubProtocol;
return cborSubProtocol;
} else if (serializer is MsgPackSerializer) {
return WAMPSessionJoiner.msgpackSubProtocol;
return msgpackSubProtocol;
} else {
throw ArgumentError("invalid serializer");
}
Expand All @@ -26,3 +29,15 @@ String wampErrorString(Error err) {
}
return errStr;
}

Serializer getSerializer(String? subprotocol) {
if (subprotocol == null || subprotocol == jsonSubProtocol) {
return JSONSerializer();
} else if (subprotocol == cborSubProtocol) {
return CBORSerializer();
} else if (subprotocol == msgpackSubProtocol) {
return MsgPackSerializer();
} else {
throw Exception("invalid websocket subprotocol $subprotocol");
}
}
64 changes: 64 additions & 0 deletions lib/src/realm.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import "package:wamp/src/types.dart";
import "package:wampproto/broker.dart";
import "package:wampproto/dealer.dart";
import "package:wampproto/messages.dart";
import "package:wampproto/types.dart";

class Realm {
final Dealer _dealer = Dealer();
final Broker _broker = Broker();

final Map<int, IBaseSession> _clients = {};

void attachClient(IBaseSession base) {
_clients[base.id()] = base;
_dealer.addSession(base.id());
_broker.addSession(base.id());
}

void detachClient(IBaseSession base) {
_clients.remove(base.id());
_broker.removeSession(base.id());
_dealer.removeSession(base.id());
}

void stop() {
// stop will disconnect all clients.
}

Future<void> receiveMessage(int sessionID, Message msg) async {
switch (msg.messageType()) {
case Call.id || Yield.id || Register.id || UnRegister.id:
MessageWithRecipient recipient = _dealer.receiveMessage(sessionID, msg);
var client = _clients[recipient.recipient];
client?.sendMessage(recipient.message);

case Publish.id:
List<MessageWithRecipient>? recipients = _broker.receiveMessage(sessionID, msg);
if (recipients == null) {
return;
}

for (final recipient in recipients) {
var client = _clients[recipient.recipient];
client?.sendMessage(recipient.message);
}

case Subscribe.id || UnSubscribe.id:
List<MessageWithRecipient>? recipients = _broker.receiveMessage(sessionID, msg);
if (recipients == null) {
throw Exception("recipients null");
}
MessageWithRecipient recipient = recipients[0];
var client = _clients[recipient.recipient];
client?.sendMessage(recipient.message);

case Goodbye.id:
_dealer.removeSession(sessionID);
_broker.removeSession(sessionID);
var client = _clients[sessionID];
await client?.close();
_clients.remove(sessionID);
}
}
}
46 changes: 46 additions & 0 deletions lib/src/router.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import "package:wamp/src/realm.dart";
import "package:wamp/src/types.dart";
import "package:wampproto/messages.dart";

class Router {
final Map<String, Realm> _realms = {};

void addRealm(String name) {
_realms[name] = Realm();
}

void removeRealm(String name) {
_realms.remove(name);
}

bool hasRealm(String name) {
return _realms.containsKey(name);
}

void attachClient(IBaseSession baseSession) {
String realm = baseSession.realm();
if (!_realms.containsKey(realm)) {
throw Exception("cannot attach client to non-existent realm $realm");
}

_realms[realm]?.attachClient(baseSession);
}

void detachClient(IBaseSession baseSession) {
String realm = baseSession.realm();
if (!_realms.containsKey(realm)) {
throw Exception("cannot detach client from non-existent realm $realm");
}

_realms[realm]?.detachClient(baseSession);
}

Future<void> receiveMessage(IBaseSession baseSession, Message msg) async {
String realm = baseSession.realm();
if (!_realms.containsKey(realm)) {
throw Exception("cannot process message for non-existent realm $realm");
}

await _realms[realm]?.receiveMessage(baseSession.id(), msg);
}
}
59 changes: 59 additions & 0 deletions lib/src/server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import "dart:async";
import "dart:io";

import "package:wamp/exports.dart";
import "package:wamp/src/helpers.dart";
import "package:wamp/src/types.dart";
import "package:wamp/src/wsacceptor.dart";

class Server {
Server(this._router);

final Router _router;

List<String> supportedProtocols = [jsonSubProtocol, cborSubProtocol, msgpackSubProtocol];

String? protocolSelector(HttpRequest request) {
String? subprotocol = request.headers.value("Sec-WebSocket-Protocol");

if (subprotocol != null) {
List<String> subprotocols = subprotocol.split(",");

subprotocols = subprotocols.map((proto) => proto.trim()).toList();

for (final String sub in subprotocols) {
if (supportedProtocols.contains(sub)) {
return sub;
}
}
}

return null;
}

Future<void> start(String host, int port) async {
var server = await HttpServer.bind(host, port);

await for (final request in server) {
var webSocket = await WebSocketTransformer.upgrade(
request,
protocolSelector: (supportedProtocols) => protocolSelector(request),
);

WAMPSessionAcceptor acceptor = WAMPSessionAcceptor();
BaseSession baseSession = await acceptor.accept(webSocket);
_router.attachClient(baseSession);

_handleWebSocket(baseSession, webSocket);
}
}

void _handleWebSocket(BaseSession baseSession, WebSocket webSocket) {
Future.microtask(() async {
while (webSocket.closeCode == null) {
var message = await baseSession.receiveMessage();
await _router.receiveMessage(baseSession, message);
}
});
}
}
4 changes: 2 additions & 2 deletions lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ class Session {
}

Future<void> unsubscribe(Subscription sub) {
var unsubscribe = msg.UnSubscribe(_nextID, sub.subscriptionId);
var unsubscribe = msg.UnSubscribe(_nextID, sub.subscriptionID);

var completer = Completer<void>();
_unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completer, sub.subscriptionId);
_unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completer, sub.subscriptionID);
_baseSession.send(_wampSession.sendMessage(unsubscribe));

return completer.future;
Expand Down
79 changes: 76 additions & 3 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,98 @@
import "dart:async";
import "dart:io";
import "dart:typed_data";

import "package:wampproto/messages.dart";
import "package:wampproto/serializers.dart";
import "package:wampproto/session.dart";

class BaseSession {
abstract class IBaseSession {
int id() {
throw UnimplementedError();
}

String realm() {
throw UnimplementedError();
}

String authid() {
throw UnimplementedError();
}

String authrole() {
throw UnimplementedError();
}

void send(Uint8List data) {
throw UnimplementedError();
}

Future<Object> receive() async {
throw UnimplementedError();
}

void sendMessage(Message msg) {
throw UnimplementedError();
}

Future<Message> receiveMessage() async {
throw UnimplementedError();
}

Future<void> close() async {
throw UnimplementedError();
}
}

class BaseSession extends IBaseSession {
BaseSession(this._ws, this._wsStreamController, this.sessionDetails, this.serializer);

final WebSocket _ws;
final StreamController _wsStreamController;
SessionDetails sessionDetails;
Serializer serializer;

@override
int id() {
return sessionDetails.sessionID;
}

@override
String realm() {
return sessionDetails.realm;
}

@override
String authid() {
return sessionDetails.authid;
}

@override
String authrole() {
return sessionDetails.authrole;
}

@override
void send(Object data) {
_ws.add(data);
}

@override
void sendMessage(Message msg) {
send(serializer.serialize(msg));
}

@override
Future<Object> receive() async {
return await _wsStreamController.stream.first;
}

@override
Future<Message> receiveMessage() async {
return serializer.deserialize(await receive());
}

@override
Future<void> close() async {
await _ws.close();
}
Expand Down Expand Up @@ -74,9 +147,9 @@ class UnregisterRequest {
}

class Subscription {
Subscription(this.subscriptionId);
Subscription(this.subscriptionID);

final int subscriptionId;
final int subscriptionID;
}

class SubscribeRequest {
Expand Down
48 changes: 48 additions & 0 deletions lib/src/wsacceptor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import "dart:async";
import "dart:io";

import "package:wamp/src/helpers.dart";
import "package:wamp/src/types.dart";
import "package:wampproto/acceptor.dart";
import "package:wampproto/auth.dart";
import "package:wampproto/serializers.dart";

class WAMPSessionAcceptor {
WAMPSessionAcceptor({IServerAuthenticator? authenticator}) {
_authenticator = authenticator;
}

IServerAuthenticator? _authenticator;
late Serializer _serializer;

Future<BaseSession> accept(WebSocket ws) async {
_serializer = getSerializer(ws.protocol);
Acceptor acceptor = Acceptor(serializer: _serializer, authenticator: _authenticator);

Completer<BaseSession> completer = Completer<BaseSession>();

late StreamSubscription<dynamic> wsStreamSubscription;
final sessionStreamController = StreamController.broadcast();

wsStreamSubscription = ws.listen((message) {
MapEntry<Object, bool> received = acceptor.receive(message);
ws.add(received.key);
if (received.value) {
wsStreamSubscription.onData(sessionStreamController.add);
completer.complete(BaseSession(ws, sessionStreamController, acceptor.getSessionDetails(), _serializer));
return;
}
});

wsStreamSubscription.onDone(() {
sessionStreamController.stream.isEmpty.then(
(isEmpty) => {
if (!isEmpty) {sessionStreamController.close()},
},
);
wsStreamSubscription.cancel();
});

return completer.future;
}
}
4 changes: 0 additions & 4 deletions lib/src/wsjoiner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ class WAMPSessionJoiner {
_authenticator = authenticator;
}

static const String jsonSubProtocol = "wamp.2.json";
static const String cborSubProtocol = "wamp.2.cbor";
static const String msgpackSubProtocol = "wamp.2.msgpack";

IClientAuthenticator? _authenticator;
late Serializer _serializer;

Expand Down
Loading