Skip to content

Commit

Permalink
Implement progressive call invocations (#70)
Browse files Browse the repository at this point in the history
* Implement progressive call invocations

* Add test for progressive call invocations

* Add progressive call invocations to callee and caller features

* Enable progressive call invocation and call canceling in dealer
  • Loading branch information
muzzammilshahid authored Oct 21, 2024
1 parent fd8f83f commit de559a3
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 12 deletions.
5 changes: 4 additions & 1 deletion acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ const (

var RouterRoles = map[string]any{ //nolint:gochecknoglobals
"dealer": map[string]any{
"features": map[string]any{},
"features": map[string]any{
FeatureProgressiveCallInvocations: true,
FeatureCallCancelling: true,
},
},
"broker": map[string]any{
"features": map[string]any{},
Expand Down
41 changes: 33 additions & 8 deletions dealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ const (
OptionProgress = "progress"
)

const (
FeatureProgressiveCallInvocations = "progressive_call_invocations"
FeatureProgressiveCallResults = "progressive_call_results"
FeatureCallCancelling = "call_canceling"
)

type PendingInvocation struct {
RequestID int64
CallerID int64
Expand All @@ -27,11 +33,17 @@ type Registration struct {
InvocationPolicy string
}

type CallMap struct {
CallerID int64
CallID int64
}

type Dealer struct {
sessions map[int64]*SessionDetails
registrationsByProcedure map[string]*Registration
registrationsBySession map[int64]map[int64]*Registration
pendingCalls map[int64]*PendingInvocation
invocationIDbyCall map[CallMap]int64

idGen *SessionScopeIDGenerator
sync.Mutex
Expand All @@ -43,6 +55,7 @@ func NewDealer() *Dealer {
registrationsByProcedure: make(map[string]*Registration),
registrationsBySession: make(map[int64]map[int64]*Registration),
pendingCalls: make(map[int64]*PendingInvocation),
invocationIDbyCall: make(map[CallMap]int64),
idGen: &SessionScopeIDGenerator{},
}
}
Expand Down Expand Up @@ -113,21 +126,33 @@ func (d *Dealer) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
break
}
receiveProgress, _ := call.Options()[OptionReceiveProgress].(bool)

invocationID := d.idGen.NextID()
d.pendingCalls[invocationID] = &PendingInvocation{
RequestID: call.RequestID(),
CallerID: sessionID,
CalleeID: callee,
ReceiveProgress: receiveProgress,
progress, _ := call.Options()[OptionProgress].(bool)

invocationID, ok := d.invocationIDbyCall[CallMap{CallerID: sessionID, CallID: call.RequestID()}]
if !ok || !progress {
invocationID = d.idGen.NextID()
d.pendingCalls[invocationID] = &PendingInvocation{
RequestID: call.RequestID(),
CallerID: sessionID,
CalleeID: callee,
ReceiveProgress: receiveProgress,
Progress: progress,
}
d.invocationIDbyCall[CallMap{CallerID: sessionID, CallID: call.RequestID()}] = invocationID
}

var invocation *messages.Invocation
if call.PayloadIsBinary() && d.sessions[callee].StaticSerializer() {
invocation = messages.NewInvocationBinary(invocationID, regs.ID, nil, call.Payload(),
call.PayloadSerializer())
} else {
details := map[string]any{OptionReceiveProgress: receiveProgress}
details := map[string]any{}
if receiveProgress {
details[OptionReceiveProgress] = receiveProgress
}
if progress {
details[OptionProgress] = progress
}
invocation = messages.NewInvocation(invocationID, regs.ID, details, call.Args(), call.KwArgs())
}

Expand Down
44 changes: 44 additions & 0 deletions dealer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,47 @@ func TestProgressiveCallResults(t *testing.T) {
progress, _ := result.Details()[wampproto.OptionReceiveProgress].(bool)
require.False(t, progress)
}

func TestProgressiveCallInvocations(t *testing.T) {
dealer := wampproto.NewDealer()

callee := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
caller := wampproto.NewSessionDetails(2, "realm", "authid", "anonymous", false)

err := dealer.AddSession(callee)
require.NoError(t, err)
err = dealer.AddSession(caller)
require.NoError(t, err)

register := messages.NewRegister(3, nil, "foo.bar")
_, err = dealer.ReceiveMessage(callee.ID(), register)
require.NoError(t, err)

call := messages.NewCall(4, map[string]any{wampproto.OptionProgress: true}, "foo.bar", []any{}, nil)
messageWithRecipient, err := dealer.ReceiveMessage(callee.ID(), call)
require.NoError(t, err)
require.Equal(t, callee.ID(), messageWithRecipient.Recipient)

invMessage := messageWithRecipient.Message.(*messages.Invocation)
require.True(t, invMessage.Details()[wampproto.OptionProgress].(bool))

invRequestID := invMessage.RequestID()
for i := 0; i < 10; i++ {
call = messages.NewCall(4, map[string]any{wampproto.OptionProgress: true}, "foo.bar", []any{}, nil)
messageWithRecipient, err = dealer.ReceiveMessage(callee.ID(), call)
require.NoError(t, err)

invMessage = messageWithRecipient.Message.(*messages.Invocation)
require.True(t, invMessage.Details()[wampproto.OptionProgress].(bool))
require.Equal(t, invRequestID, invMessage.RequestID())
}

finalCall := messages.NewCall(4, map[string]any{}, "foo.bar", []any{}, nil)
messageWithRecipient, err = dealer.ReceiveMessage(callee.ID(), finalCall)
require.NoError(t, err)
require.Equal(t, callee.ID(), messageWithRecipient.Recipient)

invocation := messageWithRecipient.Message.(*messages.Invocation)
inProgress, _ := invocation.Details()[wampproto.OptionProgress].(bool)
require.False(t, inProgress)
}
9 changes: 6 additions & 3 deletions joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (

var ClientRoles = map[string]any{ //nolint:gochecknoglobals
"caller": map[string]any{
"features": map[string]any{},
"features": map[string]any{
FeatureProgressiveCallInvocations: true,
},
},
"callee": map[string]any{
"features": map[string]any{
"progressive_call_results": true,
"call_canceling": true,
FeatureProgressiveCallInvocations: true,
FeatureProgressiveCallResults: true,
FeatureCallCancelling: true,
},
},
"publisher": map[string]any{
Expand Down

0 comments on commit de559a3

Please sign in to comment.