Skip to content

Commit

Permalink
fix: do not swallow ctx error on push
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Aug 6, 2024
1 parent e239db9 commit b61d1df
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
13 changes: 8 additions & 5 deletions waku/v2/api/publish/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *MessageQueue) Start(ctx context.Context) {

// Push an envelope into the message queue. The priority is optional, and will be ignored
// if the message queue does not use a priority queue
func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) {
func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error {
if m.usePriorityQueue {
msgPriority := NormalPriority
if len(priority) != 0 {
Expand All @@ -126,23 +126,27 @@ func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, pr
case m.throttledPrioritySendQueue <- pEnvelope:
// Do nothing
case <-ctx.Done():
return
return ctx.Err()
}
} else {
select {
case m.toSendChan <- envelope:
// Do nothing
case <-ctx.Done():
return
return ctx.Err()
}
}

return nil
}

// Pop will return a channel on which a message can be retrieved from the message queue
func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
ch := make(chan *protocol.Envelope)

go func() {
defer close(ch)

select {
case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
if ok {
Expand All @@ -155,10 +159,9 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
}

case <-ctx.Done():

return
}

close(ch)
}()

return ch
Expand Down
38 changes: 28 additions & 10 deletions waku/v2/api/publish/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ func TestFifoQueue(t *testing.T) {
queue := NewMessageQueue(10, false)
go queue.Start(ctx)

queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A"))
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B"))
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C"))
err := queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A"))
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B"))
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C"))
require.NoError(t, err)

envelope, ok := <-queue.Pop(ctx)
require.True(t, ok)
Expand All @@ -45,13 +50,26 @@ func TestPriorityQueue(t *testing.T) {
queue := NewMessageQueue(10, true)
go queue.Start(ctx)

queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority)
err := queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority)
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority)
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority)
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority)
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority)
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority)
require.NoError(t, err)

err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority)
require.NoError(t, err)

time.Sleep(2 * time.Second)

Expand Down

0 comments on commit b61d1df

Please sign in to comment.