Skip to content

Commit

Permalink
fix: keep channels open
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Aug 6, 2024
1 parent 240051b commit e239db9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 32 deletions.
29 changes: 19 additions & 10 deletions waku/v2/api/publish/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,37 +103,43 @@ func (m *MessageQueue) Start(ctx context.Context) {
m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}

case <-ctx.Done():
if m.usePriorityQueue {
close(m.throttledPrioritySendQueue)
close(m.envelopeAvailableOnPriorityQueueSignal)
} else {
close(m.toSendChan)
}
return
}
}
}

// Push an envelope into the message queue. The priority is optional, and will be ignored
// if the message queue does not use a priority queue
func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) {
func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) {
if m.usePriorityQueue {
msgPriority := NormalPriority
if len(priority) != 0 {
msgPriority = priority[0]
}

m.throttledPrioritySendQueue <- &envelopePriority{
pEnvelope := &envelopePriority{
envelope: envelope,
priority: msgPriority,
}

select {
case m.throttledPrioritySendQueue <- pEnvelope:
// Do nothing
case <-ctx.Done():
return
}
} else {
m.toSendChan <- envelope
select {
case m.toSendChan <- envelope:
// Do nothing
case <-ctx.Done():
return
}
}
}

// Pop will return a channel on which a message can be retrieved from the message queue
func (m *MessageQueue) Pop() <-chan *protocol.Envelope {
func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
ch := make(chan *protocol.Envelope)

go func() {
Expand All @@ -147,6 +153,9 @@ func (m *MessageQueue) Pop() <-chan *protocol.Envelope {
if ok {
ch <- envelope
}

case <-ctx.Done():

}

close(ch)
Expand Down
44 changes: 22 additions & 22 deletions waku/v2/api/publish/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@ func TestFifoQueue(t *testing.T) {
queue := NewMessageQueue(10, false)
go queue.Start(ctx)

queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A"))
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B"))
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C"))
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A"))
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B"))
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C"))

envelope, ok := <-queue.Pop()
envelope, ok := <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "A", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "B", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "C", envelope.PubsubTopic())

cancel()

_, ok = <-queue.Pop()
_, ok = <-queue.Pop(ctx)
require.False(t, ok)
}

Expand All @@ -45,47 +45,47 @@ func TestPriorityQueue(t *testing.T) {
queue := NewMessageQueue(10, true)
go queue.Start(ctx)

queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority)
queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority)

time.Sleep(2 * time.Second)

envelope, ok := <-queue.Pop()
envelope, ok := <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "C", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "E", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "D", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "G", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "A", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "B", envelope.PubsubTopic())

envelope, ok = <-queue.Pop()
envelope, ok = <-queue.Pop(ctx)
require.True(t, ok)
require.Equal(t, "F", envelope.PubsubTopic())

cancel()

_, ok = <-queue.Pop()
_, ok = <-queue.Pop(ctx)
require.False(t, ok)

}

0 comments on commit e239db9

Please sign in to comment.