Skip to content

Commit

Permalink
Merge pull request #56 from muzzammilshahid/broker-dealer-tests
Browse files Browse the repository at this point in the history
Add more tests for broker, dealer and session
  • Loading branch information
muzzammilshahid authored Jul 10, 2024
2 parents d40cf39 + d3c66ef commit 2aa1545
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 23 deletions.
101 changes: 101 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,106 @@ func TestBrokerPublish(t *testing.T) {
require.Equal(t, publication.Ack.Recipient, details.ID())
require.Equal(t, publication.Ack.Message.Type(), messages.MessageTypePublished)
require.Nil(t, publication.Event)
require.Len(t, publication.Recipients, 0)
})

t.Run("WithSubscriber", func(t *testing.T) {
subDetails := wampproto.NewSessionDetails(2, "realm", "authid", "anonymous", false)
err = broker.AddSession(subDetails)
require.NoError(t, err)

subscribe := messages.NewSubscribe(2, nil, "foo.bar")
msgWithRecipient, err := broker.ReceiveMessage(subDetails.ID(), subscribe)
require.NoError(t, err)
require.Equal(t, msgWithRecipient.Recipient, subDetails.ID())
require.Equal(t, msgWithRecipient.Message.Type(), messages.MessageTypeSubscribed)

publish := messages.NewPublish(3, options, "foo.bar", args, kwArgs)
publication, err := broker.ReceivePublish(details.ID(), publish)
require.NoError(t, err)
require.NotNil(t, publication)

require.Equal(t, publication.Ack.Recipient, details.ID())
require.Equal(t, publication.Ack.Message.Type(), messages.MessageTypePublished)
require.NotNil(t, publication.Event)
require.Len(t, publication.Recipients, 1)
})

t.Run("WithoutAcknowledge", func(t *testing.T) {
publish := messages.NewPublish(4, map[string]any{}, "foo.bar", args, kwArgs)
publication, err := broker.ReceivePublish(details.ID(), publish)
require.NoError(t, err)
require.NotNil(t, publication)
require.Nil(t, publication.Ack)
})

t.Run("InvalidSessionID", func(t *testing.T) {
publish := messages.NewPublish(1, options, "foo.bar", args, kwArgs)
_, err = broker.ReceivePublish(5, publish)
require.EqualError(t, err, "broker: cannot publish, session 5 doesn't exist")
})
}

func TestBrokerSubscribeUnsubscribe(t *testing.T) {
broker := wampproto.NewBroker()

subDetails := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
err := broker.AddSession(subDetails)
require.NoError(t, err)

var subscriptionID int64
t.Run("Subscribe", func(t *testing.T) {
subscribe := messages.NewSubscribe(1, nil, "foo.bar")
msgWithRecipient, err := broker.ReceiveMessage(subDetails.ID(), subscribe)
require.NoError(t, err)
require.Equal(t, msgWithRecipient.Recipient, subDetails.ID())
require.Equal(t, msgWithRecipient.Message.Type(), messages.MessageTypeSubscribed)

subscriptionID = msgWithRecipient.Message.(*messages.Subscribed).SubscriptionID()
})

t.Run("PublishAndReceiveEvent", func(t *testing.T) {
pubDetails := wampproto.NewSessionDetails(2, "realm", "authid", "anonymous", false)
err = broker.AddSession(pubDetails)
require.NoError(t, err)

publish := messages.NewPublish(2, map[string]any{wampproto.OptAcknowledge: true}, "foo.bar", []any{1, 2}, nil)
publication, err := broker.ReceivePublish(pubDetails.ID(), publish)
require.NoError(t, err)
require.NotNil(t, publication)

require.Equal(t, publication.Ack.Recipient, pubDetails.ID())
require.Equal(t, publication.Ack.Message.Type(), messages.MessageTypePublished)
require.NotNil(t, publication.Event)
require.Len(t, publication.Recipients, 1)
})

t.Run("Unsubscribe", func(t *testing.T) {
unsubscribe := messages.NewUnsubscribe(3, subscriptionID)

msgWithRecipient, err := broker.ReceiveMessage(subDetails.ID(), unsubscribe)
require.NoError(t, err)
require.Equal(t, msgWithRecipient.Recipient, subDetails.ID())
require.Equal(t, msgWithRecipient.Message.Type(), messages.MessageTypeUnsubscribed)
})

t.Run("SubscribeInvalidSessionID", func(t *testing.T) {
subscribe := messages.NewSubscribe(4, nil, "foo.bar")
_, err = broker.ReceiveMessage(5, subscribe)
require.EqualError(t, err, "broker: cannot subscribe, session 5 doesn't exist")
})

t.Run("UnsubscribeInvalidSessionID", func(t *testing.T) {
unsubscribe := messages.NewUnsubscribe(5, subscriptionID)

_, err = broker.ReceiveMessage(5, unsubscribe)
require.EqualError(t, err, "broker: cannot unsubscribe, session 5 doesn't exist")
})

t.Run("UnsubscribeInvalidSubscriptionID", func(t *testing.T) {
unsubscribe := messages.NewUnsubscribe(3, 5)

_, err = broker.ReceiveMessage(subDetails.ID(), unsubscribe)
require.EqualError(t, err, "broker: cannot unsubscribe non-existent subscription 5")
})
}
97 changes: 82 additions & 15 deletions dealer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,93 @@ func TestDealerAddRemoveSession(t *testing.T) {
func TestDealerRegisterUnregister(t *testing.T) {
dealer := wampproto.NewDealer()

t.Run("Register", func(t *testing.T) {
details := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
err := dealer.AddSession(details)
require.NoError(t, err)
callee := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
err := dealer.AddSession(callee)
require.NoError(t, err)

var registerationID int64

t.Run("Register", func(t *testing.T) {
register := messages.NewRegister(1, nil, "foo.bar")
msg, err := dealer.ReceiveMessage(details.ID(), register)
msg, err := dealer.ReceiveMessage(callee.ID(), register)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, msg.Recipient, details.ID())
require.Equal(t, msg.Message.Type(), messages.MessageTypeRegistered)
require.Equal(t, msg.Recipient, callee.ID())
require.Equal(t, messages.MessageTypeRegistered, msg.Message.Type())

register = messages.NewRegister(2, nil, "foo.bar")
msg, err = dealer.ReceiveMessage(details.ID(), register)
hasProcedure := dealer.HasProcedure("foo.bar")
require.True(t, hasProcedure)
registerationID = msg.Message.(*messages.Registered).RegistrationID()

t.Run("DuplicateProcedure", func(t *testing.T) {
register = messages.NewRegister(2, nil, "foo.bar")
msg, err = dealer.ReceiveMessage(callee.ID(), register)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, msg.Recipient, callee.ID())
require.Equal(t, messages.MessageTypeError, msg.Message.Type())
errMsg := msg.Message.(*messages.Error)
require.NotNil(t, errMsg)
require.Equal(t, errMsg.URI(), "wamp.error.procedure_already_exists")
})

t.Run("InvalidSessionID", func(t *testing.T) {
invalidRegister := messages.NewRegister(2, nil, "foo.bar")
_, err = dealer.ReceiveMessage(5, invalidRegister)
require.EqualError(t, err, "cannot register procedure for non-existent session 5")
})
})

t.Run("Call", func(t *testing.T) {
caller := wampproto.NewSessionDetails(2, "realm", "authid", "anonymous", false)
err := dealer.AddSession(caller)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, msg.Recipient, details.ID())
require.Equal(t, msg.Message.Type(), messages.MessageTypeError)
errMsg := msg.Message.(*messages.Error)
require.NotNil(t, errMsg)
require.Equal(t, errMsg.URI(), "wamp.error.procedure_already_exists")

call := messages.NewCall(3, map[string]any{}, "foo.bar", []any{"abc"}, nil)
invWithRecipient, err := dealer.ReceiveMessage(caller.ID(), call)
require.NoError(t, err)
require.NotNil(t, invWithRecipient)
require.Equal(t, callee.ID(), invWithRecipient.Recipient)
require.Equal(t, messages.MessageTypeInvocation, invWithRecipient.Message.Type())

// receive yield for invocation
invocation := invWithRecipient.Message.(*messages.Invocation)
yield := messages.NewYield(invocation.RequestID(), map[string]any{}, []any{"abc"}, nil)
yieldWithRecipient, err := dealer.ReceiveMessage(caller.ID(), yield)
require.NoError(t, err)
require.NotNil(t, yieldWithRecipient)
require.Equal(t, caller.ID(), yieldWithRecipient.Recipient)
require.Equal(t, messages.MessageTypeResult, yieldWithRecipient.Message.Type())

t.Run("NonExistingProcedure", func(t *testing.T) {
invalidCallMessage := messages.NewCall(3, map[string]any{}, "invalid", []any{"abc"}, nil)
errWithRecipient, err := dealer.ReceiveMessage(caller.ID(), invalidCallMessage)
require.NoError(t, err)
require.NotNil(t, errWithRecipient)
require.Equal(t, caller.ID(), errWithRecipient.Recipient)
require.Equal(t, errWithRecipient.Message.Type(), messages.MessageTypeError)
})

t.Run("InvalidYield", func(t *testing.T) {
_, err = dealer.ReceiveMessage(5, yield)
require.EqualError(t, err, "yield: not pending calls for session 5")
})
})

t.Run("Unregister", func(t *testing.T) {
unregister := messages.NewUnregister(callee.ID(), registerationID)
unregWithRecipient, err := dealer.ReceiveMessage(callee.ID(), unregister)
require.NoError(t, err)
require.NotNil(t, unregWithRecipient)
require.Equal(t, callee.ID(), unregWithRecipient.Recipient)
require.Equal(t, messages.MessageTypeUnregistered, unregWithRecipient.Message.Type())

hasProcedure := dealer.HasProcedure("foo.bar")
require.False(t, hasProcedure)

t.Run("InvalidRegistration", func(t *testing.T) {
_, err = dealer.ReceiveMessage(callee.ID(), unregister)
require.EqualError(t, err, "unregister: session 1 has no registration 1")
})
})
}
38 changes: 30 additions & 8 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,40 @@ func callProc(t *testing.T, caller, callee *wampproto.Session, uri string) {
require.Equal(t, rslt, result)
}

func registerAndCall(t *testing.T, procedure string, serializer serializers.Serializer) {
func unregisterProcedure(t *testing.T, callee *wampproto.Session) {
unregister := messages.NewUnregister(3, 1)
_, err := callee.SendMessage(unregister)
require.NoError(t, err)

unregistered := messages.NewUnregistered(unregister.RequestID())
_, err = callee.ReceiveMessage(unregistered)
require.NoError(t, err)
}

func registerCallAndUnregister(t *testing.T, procedure string, serializer serializers.Serializer) {
caller := wampproto.NewSession(serializer)
callee := wampproto.NewSession(serializer)

registerProc(t, callee, procedure)
callProc(t, caller, callee, procedure)
unregisterProcedure(t, callee)
}

func TestSessionCall(t *testing.T) {
procedure := "foo.bar"
t.Run("JSON", func(t *testing.T) {
serializer := &serializers.JSONSerializer{}
registerAndCall(t, procedure, serializer)
registerCallAndUnregister(t, procedure, serializer)
})

t.Run("CBOR", func(t *testing.T) {
serializer := &serializers.CBORSerializer{}
registerAndCall(t, procedure, serializer)
registerCallAndUnregister(t, procedure, serializer)
})

t.Run("MSGPACK", func(t *testing.T) {
serializer := &serializers.MsgPackSerializer{}
registerAndCall(t, procedure, serializer)
registerCallAndUnregister(t, procedure, serializer)
})
}

Expand All @@ -94,28 +105,39 @@ func publishTopic(t *testing.T, publisher, subscriber *wampproto.Session, uri st
require.NoError(t, err)
}

func subscribeAndPublish(t *testing.T, topic string, serializer serializers.Serializer) {
func unsubscribeTopic(t *testing.T, subscriber *wampproto.Session) {
unsubscribe := messages.NewUnsubscribe(3, 1)
_, err := subscriber.SendMessage(unsubscribe)
require.NoError(t, err)

unsubscribed := messages.NewUnsubscribed(unsubscribe.RequestID())
_, err = subscriber.ReceiveMessage(unsubscribed)
require.NoError(t, err)
}

func subscribePublishAndUnsubscribe(t *testing.T, topic string, serializer serializers.Serializer) {
publisher := wampproto.NewSession(serializer)
subscriber := wampproto.NewSession(serializer)

subscribeTopic(t, subscriber, topic)
publishTopic(t, publisher, subscriber, topic)
unsubscribeTopic(t, subscriber)
}

func TestSessionPublish(t *testing.T) {
topic := "foo.bar"
t.Run("JSON", func(t *testing.T) {
serializer := &serializers.JSONSerializer{}
subscribeAndPublish(t, topic, serializer)
subscribePublishAndUnsubscribe(t, topic, serializer)
})

t.Run("CBOR", func(t *testing.T) {
serializer := &serializers.CBORSerializer{}
subscribeAndPublish(t, topic, serializer)
subscribePublishAndUnsubscribe(t, topic, serializer)
})

t.Run("MSGPACK", func(t *testing.T) {
serializer := &serializers.MsgPackSerializer{}
subscribeAndPublish(t, topic, serializer)
subscribePublishAndUnsubscribe(t, topic, serializer)
})
}

0 comments on commit 2aa1545

Please sign in to comment.