Skip to content

Commit

Permalink
implement progressive call results
Browse files Browse the repository at this point in the history
  • Loading branch information
muzzammilshahid committed Mar 21, 2024
1 parent 23673d3 commit 2fd5a49
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 36 deletions.
73 changes: 51 additions & 22 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,23 +322,32 @@ private void onPreSessionMessage(IMessage message) throws Exception {
private void onMessage(IMessage message) throws Exception {
if (message instanceof Result) {
Result msg = (Result) message;

CallRequest request = getOrDefault(mCallRequests, msg.request, null);
if (request == null) {
throw new ProtocolError(String.format(
"RESULT received for non-pending request ID %s", msg.request));
}

mCallRequests.remove(msg.request);
if (request.resultTypeRef != null) {
// FIXME: check args length > 1 and == 0, and kwargs != null
// we cannot currently POJO automap these cases!
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeRef));
} else if (request.resultTypeClass != null) {
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeClass));
if (msg.options.containsKey("progress") && (Boolean) msg.options.get("progress")) {
if (request.options.progressHandler == null) {
throw new ProtocolError("Caller not accepting progressive call result");
}

request.options.progressHandler.onProgress(new CallResult(msg.args, msg.kwargs));
} else {
request.onReply.complete(new CallResult(msg.args, msg.kwargs));
mCallRequests.remove(msg.request);
if (request.resultTypeRef != null) {
// FIXME: check args length > 1 and == 0, and kwargs != null
// we cannot currently POJO automap these cases!
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeRef));
} else if (request.resultTypeClass != null) {
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeClass));
} else {
request.onReply.complete(new CallResult(msg.args, msg.kwargs));
}
}
} else if (message instanceof Subscribed) {
Subscribed msg = (Subscribed) message;
Expand Down Expand Up @@ -452,10 +461,21 @@ private void onMessage(IMessage message) throws Exception {
long callerSessionID = getOrDefault(msg.details, "caller", -1L);
String callerAuthID = getOrDefault(msg.details, "caller_authid", null);
String callerAuthRole = getOrDefault(msg.details, "caller_authrole", null);

InvocationDetails details = new InvocationDetails(
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this);

Boolean progress = getOrDefault(msg.details, "receive_progress", false);
InvocationDetails details;
if (progress) {
details = new InvocationDetails(
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this,
(args, kwargs) -> {
HashMap<String, Object> options = new HashMap<>();
options.put("progress", true);
send(new Yield(msg.request, args, kwargs, options));
});
} else {
details = new InvocationDetails(
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this, null);
}
runAsync(() -> {
Object result;
if (registration.endpoint instanceof Supplier) {
Expand Down Expand Up @@ -494,22 +514,22 @@ private void onMessage(IMessage message) throws Exception {
}

} else {
send(new Yield(msg.request, invocRes.results, invocRes.kwresults));
send(new Yield(msg.request, invocRes.results, invocRes.kwresults, null));
}
}, mExecutor);
} else if (result instanceof InvocationResult) {
InvocationResult res = (InvocationResult) result;
send(new Yield(msg.request, res.results, res.kwresults));
send(new Yield(msg.request, res.results, res.kwresults, null));
} else if (result instanceof List) {
send(new Yield(msg.request, (List) result, null));
send(new Yield(msg.request, (List) result, null, null));
} else if (result instanceof Map) {
send(new Yield(msg.request, null, (Map) result));
send(new Yield(msg.request, null, (Map) result, null));
} else if (result instanceof Void) {
send(new Yield(msg.request, null, null));
send(new Yield(msg.request, null, null, null));
} else {
List<Object> item = new ArrayList<>();
item.add(result);
send(new Yield(msg.request, item, null));
send(new Yield(msg.request, item, null, null));
}
}, mExecutor).whenCompleteAsync((aVoid, throwable) -> {
// FIXME: implement better errors
Expand Down Expand Up @@ -1082,9 +1102,10 @@ private <T> CompletableFuture<T> reallyCall(
resultTypeReference, resultTypeClass));

if (options == null) {
send(new Call(requestID, procedure, args, kwargs, 0));
send(new Call(requestID, procedure, args, kwargs, 0, false));
} else {
send(new Call(requestID, procedure, args, kwargs, options.timeout));
boolean receiveProgress = options.progressHandler != null;
send(new Call(requestID, procedure, args, kwargs, options.timeout, receiveProgress));
}
return future;
}
Expand Down Expand Up @@ -1286,7 +1307,15 @@ private CompletableFuture<SessionDetails> reallyJoin(
roles.put("publisher", new HashMap<>());
roles.put("subscriber", new HashMap<>());
roles.put("caller", new HashMap<>());
roles.put("callee", new HashMap<>());

Map<String, Object> calleeFeatures = new HashMap<>();
calleeFeatures.put("progressive_call_results", true);
calleeFeatures.put("call_canceling", true);

Map<String, Object> callee = new HashMap<>();
callee.put("features", calleeFeatures);
roles.put("callee", callee);

if (mAuthenticators == null) {
send(new Hello(realm, roles));
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.crossbar.autobahn.wamp.interfaces;

import java.util.List;
import java.util.Map;

public interface Progress {
void sendProgress(List<Object> args, Map<String, Object> kwargs);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.crossbar.autobahn.wamp.interfaces;

import io.crossbar.autobahn.wamp.types.CallResult;

public interface ProgressHandler {
void onProgress(CallResult result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class Call implements IMessage {
public final List<Object> args;
public final Map<String, Object> kwargs;
public final int timeout;
public final boolean receiveProgress;

public Call(long request, String procedure, List<Object> args, Map<String, Object> kwargs, int timeout) {
public Call(long request, String procedure, List<Object> args, Map<String, Object> kwargs,
int timeout, boolean receiveProgress) {
this.request = request;
this.procedure = procedure;
this.args = args;
Expand All @@ -45,6 +47,7 @@ public Call(long request, String procedure, List<Object> args, Map<String, Objec
} else {
this.timeout = timeout;
}
this.receiveProgress = receiveProgress;
}

public static Call parse(List<Object> wmsg) {
Expand All @@ -69,7 +72,9 @@ public static Call parse(List<Object> wmsg) {

int timeout = getOrDefault(options, "timeout", TIMEOUT_DEFAULT);

return new Call(request, procedure, args, kwargs, timeout);
boolean receiveProgress = getOrDefault(options, "receive_progress", false);

return new Call(request, procedure, args, kwargs, timeout, receiveProgress);
}

@Override
Expand All @@ -78,6 +83,7 @@ public List<Object> marshal() {
marshaled.add(MESSAGE_TYPE);
marshaled.add(request);
Map<String, Object> options = new HashMap<>();
options.put("receive_progress", receiveProgress);
if (timeout > TIMEOUT_DEFAULT) {
options.put("timeout", timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ public class Result implements IMessage {
public final long request;
public final List<Object> args;
public final Map<String, Object> kwargs;
public final Map<String, Object> options;

public Result(long request, List<Object> args, Map<String, Object> kwargs) {
public Result(long request, List<Object> args, Map<String, Object> kwargs, Map<String, Object> options) {
this.request = request;
this.args = args;
this.kwargs = kwargs;
this.options = options;
}

public static Result parse(List<Object> wmsg) {
MessageUtil.validateMessage(wmsg, MESSAGE_TYPE, "RESULT", 3, 5);

long request = MessageUtil.parseLong(wmsg.get(1));
Map<String, Object> options = (Map<String, Object>) wmsg.get(2);
List<Object> args = null;
if (wmsg.size() > 3) {
if (wmsg.get(3) instanceof byte[]) {
Expand All @@ -48,15 +51,19 @@ public static Result parse(List<Object> wmsg) {
if (wmsg.size() > 4) {
kwargs = (Map<String, Object>) wmsg.get(4);
}
return new Result(request, args, kwargs);
return new Result(request, args, kwargs, options);
}

@Override
public List<Object> marshal() {
List<Object> marshaled = new ArrayList<>();
marshaled.add(MESSAGE_TYPE);
marshaled.add(request);
marshaled.add(Collections.emptyMap());
if (options == null) {
marshaled.add(Collections.emptyMap());
} else {
marshaled.add(options);
}
if (kwargs != null) {
if (args == null) {
// Empty args.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class Yield implements IMessage {
public final long request;
public final List<Object> args;
public final Map<String, Object> kwargs;
public final Map<String, Object> options;

public Yield(long request, List<Object> args, Map<String, Object> kwargs) {
public Yield(long request, List<Object> args, Map<String, Object> kwargs, Map<String, Object> options) {
this.request = request;
this.args = args;
this.kwargs = kwargs;
this.options = options;
}

public static Yield parse(List<Object> wmsg) {
Expand All @@ -50,16 +52,19 @@ public static Yield parse(List<Object> wmsg) {
if (wmsg.size() > 4) {
kwargs = (Map<String, Object>) wmsg.get(4);
}
return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs);
return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs, options);
}

@Override
public List<Object> marshal() {
List<Object> marshaled = new ArrayList<>();
marshaled.add(MESSAGE_TYPE);
marshaled.add(request);
// Empty options.
marshaled.add(Collections.emptyMap());
if (options == null) {
marshaled.add(Collections.emptyMap());
} else {
marshaled.add(options);
}
if (kwargs != null) {
if (args == null) {
// Empty args.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,22 @@

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.interfaces.ProgressHandler;

public class CallOptions {
public final int timeout;
public int timeout;
public ProgressHandler progressHandler;

public CallOptions(int timeout) {
this.timeout = timeout;
}

public CallOptions(ProgressHandler progressHandler) {
this.progressHandler = progressHandler;
}

public CallOptions(int timeout, ProgressHandler progressHandler) {
this.timeout = timeout;
this.progressHandler = progressHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.Session;
import io.crossbar.autobahn.wamp.interfaces.Progress;

public class InvocationDetails {

Expand All @@ -33,18 +34,18 @@ public class InvocationDetails {
// The WAMP session on which this event is delivered.
public final Session session;

// FIXME
// we need a progress() callback here to allow
// the user to produce progressive results.
// callback produce progressive results.
public final Progress progress;

// XXXX - Tentative, the constructor parameter order may change.
public InvocationDetails(Registration registration, String procedure, long callerSessionID,
String callerAuthID, String callerAuthRole, Session session) {
String callerAuthID, String callerAuthRole, Session session, Progress progress) {
this.registration = registration;
this.procedure = procedure;
this.callerSessionID = callerSessionID;
this.callerAuthID = callerAuthID;
this.callerAuthRole = callerAuthRole;
this.session = session;
this.progress = progress;
}
}

0 comments on commit 2fd5a49

Please sign in to comment.