Skip to content

Commit

Permalink
feat: added more tests, reconnect capability, peer retrieve missing m…
Browse files Browse the repository at this point in the history
…sgs, rolling bloom filter, eager push, conflict resolution (#1178)

and other fixed, cleanup
  • Loading branch information
shash256 authored Aug 2, 2024
1 parent 59f2d74 commit 4355c4a
Show file tree
Hide file tree
Showing 7 changed files with 958 additions and 339 deletions.
85 changes: 44 additions & 41 deletions examples/chat2-reliable/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"
"time"

"github.com/bits-and-blooms/bloom/v3"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand All @@ -33,38 +32,37 @@ const (
)

type Chat struct {
ctx context.Context
wg sync.WaitGroup
node *node.WakuNode
ui UI
uiReady chan struct{}
inputChan chan string
options Options
C chan *protocol.Envelope
nick string
lamportTimestamp int32
bloomFilter *bloom.BloomFilter
outgoingBuffer []*pb.Message
incomingBuffer []*pb.Message
messageHistory []*pb.Message
receivedBloomFilters map[string]*bloom.BloomFilter
mutex sync.Mutex
ctx context.Context
wg sync.WaitGroup
node *node.WakuNode
ui UI
uiReady chan struct{}
inputChan chan string
options Options
C chan *protocol.Envelope
nick string
lamportTimestamp int32
bloomFilter *RollingBloomFilter
outgoingBuffer []UnacknowledgedMessage
incomingBuffer []*pb.Message
messageHistory []*pb.Message
mutex sync.Mutex
}

func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat {
chat := &Chat{
ctx: ctx,
node: node,
options: options,
nick: options.Nickname,
uiReady: make(chan struct{}, 1),
inputChan: make(chan string, 100),
lamportTimestamp: 0,
bloomFilter: bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate),
outgoingBuffer: make([]*pb.Message, 0),
incomingBuffer: make([]*pb.Message, 0),
messageHistory: make([]*pb.Message, 0),
receivedBloomFilters: make(map[string]*bloom.BloomFilter),
ctx: ctx,
node: node,
options: options,
nick: options.Nickname,
uiReady: make(chan struct{}, 1),
inputChan: make(chan string, 100),
lamportTimestamp: 0,
bloomFilter: NewRollingBloomFilter(),
outgoingBuffer: make([]UnacknowledgedMessage, 0),
incomingBuffer: make([]*pb.Message, 0),
messageHistory: make([]*pb.Message, 0),
mutex: sync.Mutex{},
}

chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
Expand All @@ -91,7 +89,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified.
chat.C = theFilters[0].C // Picking first subscription since there is only 1 contentTopic specified.
}
} else {
for _, topic := range topics {
Expand All @@ -110,18 +108,18 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
}

connWg := sync.WaitGroup{}
connWg.Add(2)
connWg.Add(3)

chat.wg.Add(9) // Added 2 more goroutines for periodic tasks
chat.wg.Add(7) // Added 2 more goroutines for periodic tasks
go chat.parseInput()
go chat.receiveMessages()
go chat.welcomeMessage()
go chat.connectionWatcher(connNotifier)
go chat.staticNodes(&connWg)
go chat.discoverNodes(&connWg)
go chat.retrieveHistory(&connWg)
go chat.periodicBufferSweep()
go chat.periodicSyncMessage()

chat.initReliabilityProtocol() // Initialize the reliability protocol

return chat
}
Expand Down Expand Up @@ -279,7 +277,7 @@ func (c *Chat) parseInput() {
func (c *Chat) SendMessage(line string) {
c.incLamportTimestamp()

bloomBytes, err := c.bloomFilterBytes()
bloomBytes, err := c.bloomFilter.MarshalBinary()
if err != nil {
c.ui.ErrorMessage(fmt.Errorf("failed to marshal bloom filter: %w", err))
return
Expand All @@ -288,27 +286,32 @@ func (c *Chat) SendMessage(line string) {
msg := &pb.Message{
SenderId: c.node.Host().ID().String(),
MessageId: generateUniqueID(),
LamportTimestamp: c.lamportTimestamp,
LamportTimestamp: c.getLamportTimestamp(),
CausalHistory: c.getRecentMessageIDs(2),
ChannelId: c.options.ContentTopic,
BloomFilter: bloomBytes,
Content: line,
}

c.outgoingBuffer = append(c.outgoingBuffer, msg)
unackMsg := UnacknowledgedMessage{
Message: msg,
SendTime: time.Now(),
ResendAttempts: 0,
}
c.outgoingBuffer = append(c.outgoingBuffer, unackMsg)

tCtx, cancel := context.WithTimeout(c.ctx, 3*time.Second)
ctx, cancel := context.WithTimeout(c.ctx, messageAckTimeout)
defer cancel()

err = c.publish(tCtx, msg)
err = c.publish(ctx, msg)
if err != nil {
if err.Error() == "validation failed" {
err = errors.New("message rate violation")
}
c.ui.ErrorMessage(err)
} else {
c.addToMessageHistory(msg)
c.updateBloomFilter(msg.MessageId)
c.bloomFilter.Add(msg.MessageId)
}
}

Expand Down Expand Up @@ -364,7 +367,7 @@ func (c *Chat) publish(ctx context.Context, message *pb.Message) error {
lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
}

_, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...)
_, err = c.node.Lightpush().Publish(ctx, wakuMsg, lightOpt...)
} else {
_, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
}
Expand Down
Loading

0 comments on commit 4355c4a

Please sign in to comment.