From 4355c4a9c1edbfb976a3ff3e23ad492bfe4a94e4 Mon Sep 17 00:00:00 2001 From: Akhil <111925100+shash256@users.noreply.github.com> Date: Fri, 2 Aug 2024 06:25:54 +0400 Subject: [PATCH] feat: added more tests, reconnect capability, peer retrieve missing msgs, rolling bloom filter, eager push, conflict resolution (#1178) and other fixes, cleanup --- examples/chat2-reliable/chat.go | 85 ++-- .../chat2-reliable/chat_reliability_test.go | 335 ++++++++------ examples/chat2-reliable/pb/chat2.pb.go | 148 +++++- examples/chat2-reliable/pb/chat2.proto | 9 + examples/chat2-reliable/peer_retrieval.go | 191 ++++++++ examples/chat2-reliable/reliability.go | 437 ++++++++++++------ examples/chat2-reliable/test_utils.go | 92 ++++ 7 files changed, 958 insertions(+), 339 deletions(-) create mode 100644 examples/chat2-reliable/peer_retrieval.go create mode 100644 examples/chat2-reliable/test_utils.go diff --git a/examples/chat2-reliable/chat.go b/examples/chat2-reliable/chat.go index 1850d557c..ac601340c 100644 --- a/examples/chat2-reliable/chat.go +++ b/examples/chat2-reliable/chat.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/bits-and-blooms/bloom/v3" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -33,38 +32,37 @@ const ( ) type Chat struct { - ctx context.Context - wg sync.WaitGroup - node *node.WakuNode - ui UI - uiReady chan struct{} - inputChan chan string - options Options - C chan *protocol.Envelope - nick string - lamportTimestamp int32 - bloomFilter *bloom.BloomFilter - outgoingBuffer []*pb.Message - incomingBuffer []*pb.Message - messageHistory []*pb.Message - receivedBloomFilters map[string]*bloom.BloomFilter - mutex sync.Mutex + ctx context.Context + wg sync.WaitGroup + node *node.WakuNode + ui UI + uiReady chan struct{} + inputChan chan string + options Options + C chan *protocol.Envelope + nick string + lamportTimestamp int32 + bloomFilter *RollingBloomFilter + outgoingBuffer []UnacknowledgedMessage + incomingBuffer []*pb.Message + messageHistory []*pb.Message + mutex sync.Mutex } func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat { chat := &Chat{ - ctx: ctx, - node: node, - options: options, - nick: options.Nickname, - uiReady: make(chan struct{}, 1), - inputChan: make(chan string, 100), - lamportTimestamp: 0, - bloomFilter: bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate), - outgoingBuffer: make([]*pb.Message, 0), - incomingBuffer: make([]*pb.Message, 0), - messageHistory: make([]*pb.Message, 0), - receivedBloomFilters: make(map[string]*bloom.BloomFilter), + ctx: ctx, + node: node, + options: options, + nick: options.Nickname, + uiReady: make(chan struct{}, 1), + inputChan: make(chan string, 100), + lamportTimestamp: 0, + bloomFilter: NewRollingBloomFilter(), + outgoingBuffer: make([]UnacknowledgedMessage, 0), + incomingBuffer: make([]*pb.Message, 0), + messageHistory: make([]*pb.Message, 0), + mutex: sync.Mutex{}, } chat.ui = NewUIModel(chat.uiReady, chat.inputChan) @@ -91,7 +89,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. if err != nil { chat.ui.ErrorMessage(err) } else { - chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified. + chat.C = theFilters[0].C // Picking first subscription since there is only 1 contentTopic specified. 
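Note: the reliability state added to Chat above revolves around a Lamport clock. The patch ticks the clock in incLamportTimestamp before each send and advances it in updateLamportTimestamp on receipt (both appear in reliability.go further down in this patch). A minimal, self-contained sketch of those two rules, with illustrative names that are not part of the patch:

	package main

	import "fmt"

	// lamportClock mirrors the clock rules this patch applies:
	// tick before sending, advance to max(local, remote)+1 on receive.
	type lamportClock struct{ ts int32 }

	func (c *lamportClock) onSend() int32 { c.ts++; return c.ts }

	func (c *lamportClock) onReceive(remote int32) int32 {
		if remote > c.ts {
			c.ts = remote + 1 // jump past the sender's clock
		} else {
			c.ts++ // still advance for the local receive event
		}
		return c.ts
	}

	func main() {
		var a, b lamportClock
		stamp := a.onSend()             // a stamps its message with 1
		fmt.Println(b.onReceive(stamp)) // b moves to 2: the send is ordered before the receive
	}
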
} } else { for _, topic := range topics { @@ -110,9 +108,9 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. } connWg := sync.WaitGroup{} - connWg.Add(2) + connWg.Add(3) - chat.wg.Add(9) // Added 2 more goroutines for periodic tasks + chat.wg.Add(7) // Added 2 more goroutines for periodic tasks go chat.parseInput() go chat.receiveMessages() go chat.welcomeMessage() @@ -120,8 +118,8 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. go chat.staticNodes(&connWg) go chat.discoverNodes(&connWg) go chat.retrieveHistory(&connWg) - go chat.periodicBufferSweep() - go chat.periodicSyncMessage() + + chat.initReliabilityProtocol() // Initialize the reliability protocol return chat } @@ -279,7 +277,7 @@ func (c *Chat) parseInput() { func (c *Chat) SendMessage(line string) { c.incLamportTimestamp() - bloomBytes, err := c.bloomFilterBytes() + bloomBytes, err := c.bloomFilter.MarshalBinary() if err != nil { c.ui.ErrorMessage(fmt.Errorf("failed to marshal bloom filter: %w", err)) return @@ -288,19 +286,24 @@ func (c *Chat) SendMessage(line string) { msg := &pb.Message{ SenderId: c.node.Host().ID().String(), MessageId: generateUniqueID(), - LamportTimestamp: c.lamportTimestamp, + LamportTimestamp: c.getLamportTimestamp(), CausalHistory: c.getRecentMessageIDs(2), ChannelId: c.options.ContentTopic, BloomFilter: bloomBytes, Content: line, } - c.outgoingBuffer = append(c.outgoingBuffer, msg) + unackMsg := UnacknowledgedMessage{ + Message: msg, + SendTime: time.Now(), + ResendAttempts: 0, + } + c.outgoingBuffer = append(c.outgoingBuffer, unackMsg) - tCtx, cancel := context.WithTimeout(c.ctx, 3*time.Second) + ctx, cancel := context.WithTimeout(c.ctx, messageAckTimeout) defer cancel() - err = c.publish(tCtx, msg) + err = c.publish(ctx, msg) if err != nil { if err.Error() == "validation failed" { err = errors.New("message rate violation") @@ -308,7 +311,7 @@ func (c *Chat) SendMessage(line string) { c.ui.ErrorMessage(err) } else { c.addToMessageHistory(msg) - c.updateBloomFilter(msg.MessageId) + c.bloomFilter.Add(msg.MessageId) } } @@ -364,7 +367,7 @@ func (c *Chat) publish(ctx context.Context, message *pb.Message) error { lightOpt = append(lightOpt, lightpush.WithPeer(peerID)) } - _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...) + _, err = c.node.Lightpush().Publish(ctx, wakuMsg, lightOpt...) 
} else { _, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic()) } diff --git a/examples/chat2-reliable/chat_reliability_test.go b/examples/chat2-reliable/chat_reliability_test.go index d519143de..f54c8b922 100644 --- a/examples/chat2-reliable/chat_reliability_test.go +++ b/examples/chat2-reliable/chat_reliability_test.go @@ -12,13 +12,8 @@ import ( "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol" - wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" - "google.golang.org/protobuf/proto" ) type TestEnvironment struct { @@ -34,14 +29,12 @@ func setupTestEnvironment(ctx context.Context, t *testing.T, nodeCount int) (*Te } for i := 0; i < nodeCount; i++ { - t.Logf("Setting up node %d", i) node, err := setupTestNode(ctx, t, i) if err != nil { return nil, fmt.Errorf("failed to set up node %d: %w", i, err) } env.nodes[i] = node - t.Logf("Creating chat instance for node %d", i) chat, err := setupTestChat(ctx, t, node, fmt.Sprintf("Node%d", i)) if err != nil { return nil, fmt.Errorf("failed to set up chat for node %d: %w", i, err) @@ -52,7 +45,6 @@ func setupTestEnvironment(ctx context.Context, t *testing.T, nodeCount int) (*Te t.Log("Connecting nodes in ring topology") for i := 0; i < nodeCount; i++ { nextIndex := (i + 1) % nodeCount - t.Logf("Connecting node %d to node %d", i, nextIndex) _, err := env.nodes[i].AddPeer(env.nodes[nextIndex].ListenAddresses()[0], peerstore.Static, env.chats[i].options.Relay.Topics.Value()) if err != nil { return nil, fmt.Errorf("failed to connect node %d to node %d: %w", i, nextIndex, err) @@ -66,6 +58,7 @@ func setupTestEnvironment(ctx context.Context, t *testing.T, nodeCount int) (*Te func setupTestNode(ctx context.Context, t *testing.T, index int) (*node.WakuNode, error) { opts := []node.WakuNodeOption{ node.WithWakuRelay(), + // node.WithWakuStore(), } node, err := node.New(opts...) 
if err != nil { @@ -74,6 +67,11 @@ func setupTestNode(ctx context.Context, t *testing.T, index int) (*node.WakuNode if err := node.Start(ctx); err != nil { return nil, err } + + // if node.Store() == nil { + // t.Logf("Store protocol is not enabled on node %d", index) + // } + return node, nil } @@ -102,6 +100,15 @@ func setupTestChat(ctx context.Context, t *testing.T, node *node.WakuNode, nickn return chat, nil } +func areNodesConnected(nodes []*node.WakuNode, expectedPeers int) bool { + for _, node := range nodes { + if len(node.Host().Network().Peers()) != expectedPeers { + return false + } + } + return true +} + // TestLamportTimestamps verifies that Lamport timestamps are correctly updated func TestLamportTimestamps(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) @@ -114,7 +121,7 @@ func TestLamportTimestamps(t *testing.T) { require.NoError(t, err, "Failed to set up test environment") require.Eventually(t, func() bool { - return areNodesConnected(env.nodes, nodeCount) + return areNodesConnected(env.nodes, 2) }, 30*time.Second, 1*time.Second, "Nodes failed to connect") for i, chat := range env.chats { @@ -162,10 +169,8 @@ func TestCausalOrdering(t *testing.T) { env, err := setupTestEnvironment(ctx, t, nodeCount) require.NoError(t, err, "Failed to set up test environment") - // defer tearDownEnvironment(t, env) - require.Eventually(t, func() bool { - return areNodesConnected(env.nodes, nodeCount) + return areNodesConnected(env.nodes, 2) }, 30*time.Second, 1*time.Second, "Nodes failed to connect") t.Log("Sending messages from different nodes") @@ -207,10 +212,8 @@ func TestBloomFilterDuplicateDetection(t *testing.T) { env, err := setupTestEnvironment(ctx, t, nodeCount) require.NoError(t, err, "Failed to set up test environment") - //defer tearDownEnvironment(t, env) - require.Eventually(t, func() bool { - return areNodesConnected(env.nodes, nodeCount) + return areNodesConnected(env.nodes, 1) }, 30*time.Second, 1*time.Second, "Nodes failed to connect") t.Log("Sending a message") @@ -242,44 +245,16 @@ func TestBloomFilterDuplicateDetection(t *testing.T) { Content: receivedMsg.Content, } - // Encode the duplicate message - msgBytes, err := proto.Marshal(duplicateMsg) - require.NoError(t, err, "Failed to marshal duplicate message") - - version := uint32(0) - timestamp := utils.GetUnixEpochFrom(time.Now()) - keyInfo := &payload.KeyInfo{ - Kind: payload.None, - } - - p := new(payload.Payload) - p.Data = msgBytes - p.Key = keyInfo - - payloadBytes, err := p.Encode(version) - require.NoError(t, err, "Failed to encode payload") - - wakuMsg := &wpb.WakuMessage{ - Payload: payloadBytes, - Version: proto.Uint32(version), - ContentTopic: env.chats[1].options.ContentTopic, - Timestamp: timestamp, - } - - duplicateEnvelope := protocol.NewEnvelope(wakuMsg, time.Now().UnixNano(), relay.DefaultWakuTopic) - - // Manually inject the duplicate message into the receive channel - env.chats[1].C <- duplicateEnvelope - - time.Sleep(5 * time.Second) // Wait a bit to ensure message processing + env.chats[1].processReceivedMessage(duplicateMsg) assert.Len(t, env.chats[1].messageHistory, 1, "Node 1 should still have only one message (no duplicates)") t.Log("TestBloomFilterDuplicateDetection completed successfully") } -func TestMessageRecovery(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) +// TestNetworkPartition ensures that missing messages can be recovered +func TestNetworkPartition(t *testing.T) { + ctx, cancel := 
context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() t.Log("Starting TestMessageRecovery") @@ -287,49 +262,73 @@ func TestMessageRecovery(t *testing.T) { nodeCount := 3 env, err := setupTestEnvironment(ctx, t, nodeCount) require.NoError(t, err, "Failed to set up test environment") + //defer tearDownEnvironment(t, env) - defer tearDownEnvironment(t, env) + nc := NewNetworkController(env.nodes, env.chats) require.Eventually(t, func() bool { - return areNodesConnected(env.nodes, nodeCount) - }, 30*time.Second, 1*time.Second, "Nodes failed to connect") + return areNodesConnected(env.nodes, 2) + }, 60*time.Second, 1*time.Second, "Nodes failed to connect") - t.Log("Sending initial messages") + t.Log("Stage 1: Sending initial messages") env.chats[0].SendMessage("Message 1") time.Sleep(100 * time.Millisecond) env.chats[1].SendMessage("Message 2") time.Sleep(100 * time.Millisecond) - t.Log("Simulating a missed message") - missedMsg := &pb.Message{ - SenderId: "Node2", - MessageId: "missed-message-id", - LamportTimestamp: 3, - CausalHistory: []string{env.chats[0].messageHistory[0].MessageId, env.chats[1].messageHistory[0].MessageId}, - ChannelId: env.chats[2].options.ContentTopic, - Content: "Missed Message", - } + t.Log("Waiting for message propagation") + require.Eventually(t, func() bool { + for _, chat := range env.chats { + if len(chat.messageHistory) != 2 { + return false + } + } + return true + }, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes") - // Send to Node 0 and Node 1, simulating Node 2 missing the message - env.chats[0].processReceivedMessage(missedMsg) - env.chats[1].processReceivedMessage(missedMsg) - - t.Log("Sending a new message that depends on the missed message") - newMsg := &pb.Message{ - SenderId: "Node2", - MessageId: "new-message-id", - LamportTimestamp: 4, - CausalHistory: []string{"missed-message-id"}, - ChannelId: env.chats[2].options.ContentTopic, - Content: "New Message", - } - env.chats[2].processReceivedMessage(newMsg) + // Verify that Node 2 has messages before disconnection + require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 does not have all messages") + + t.Log("Stage 2: Simulating network partition for Node 2") + nc.DisconnectNode(env.nodes[2]) + time.Sleep(1 * time.Second) // Allow time for disconnection to take effect + + t.Log("Stage 3: Sending message that Node 2 will miss") + env.chats[0].SendMessage("Missed Message") + time.Sleep(100 * time.Millisecond) + + t.Log("Stage 4: Reconnecting Node 2") + nc.ReconnectNode(env.nodes[2]) + time.Sleep(5 * time.Second) // Allow time for reconnection to take effect + + // Verify that Node 2 didn't receive the message + require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 should not have received the missed message") - t.Log("Waiting for message recovery") + t.Log("Stage 5: Sending a new message that depends on the missed message") + env.chats[1].SendMessage("New Message") + + // Verify that Node 2 received the new message + require.Eventually(t, func() bool { + msgCount := len(env.chats[2].messageHistory) + return msgCount >= 3 + }, 30*time.Second, 5*time.Second, "Node 2 should have received the new message") + + // Stage 6: Wait for message recovery + t.Log("Stage 6: Waiting for message recovery") require.Eventually(t, func() bool { - return len(env.chats[2].messageHistory) == 4 - }, 30*time.Second, 1*time.Second, "Message recovery failed") + msgCount := len(env.chats[2].messageHistory) + return msgCount == 4 + }, 30*time.Second, 5*time.Second, "Message 
recovery failed") + + // Print final message history for all nodes + for i, chat := range env.chats { + t.Logf("Node %d final message history:", i) + for j, msg := range chat.messageHistory { + t.Logf(" Message %d: %s", j+1, msg.Content) + } + } + // Verify the results for i, msg := range env.chats[2].messageHistory { t.Logf("Message %d: %s", i+1, msg.Content) } @@ -338,12 +337,10 @@ func TestMessageRecovery(t *testing.T) { assert.Equal(t, "Message 2", env.chats[2].messageHistory[1].Content, "Second message incorrect") assert.Equal(t, "Missed Message", env.chats[2].messageHistory[2].Content, "Missed message not recovered") assert.Equal(t, "New Message", env.chats[2].messageHistory[3].Content, "New message incorrect") - - t.Log("TestMessageRecovery completed successfully") } func TestConcurrentMessageSending(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() t.Log("Starting TestConcurrentMessageSending") @@ -352,11 +349,9 @@ func TestConcurrentMessageSending(t *testing.T) { env, err := setupTestEnvironment(ctx, t, nodeCount) require.NoError(t, err, "Failed to set up test environment") - //defer tearDownEnvironment(t, env) - require.Eventually(t, func() bool { - return areNodesConnected(env.nodes, nodeCount) - }, 30*time.Second, 1*time.Second, "Nodes failed to connect") + return areNodesConnected(env.nodes, 2) + }, 60*time.Second, 3*time.Second, "Nodes failed to connect") messageCount := 10 var wg sync.WaitGroup @@ -393,72 +388,140 @@ func TestConcurrentMessageSending(t *testing.T) { t.Log("TestConcurrentMessageSending completed successfully") } -// Helper functions +func TestLargeGroupScaling(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() -func tearDownEnvironment(t *testing.T, env *TestEnvironment) { - t.Log("Tearing down test environment") - for i, node := range env.nodes { - t.Logf("Stopping node %d", i) - node.Stop() - } + t.Log("Starting TestLargeGroupScaling") + + nodeCount := 20 + env, err := setupTestEnvironment(ctx, t, nodeCount) + require.NoError(t, err, "Failed to set up test environment") + + require.Eventually(t, func() bool { + return areNodesConnected(env.nodes, 2) + }, 2*time.Minute, 3*time.Second, "Nodes failed to connect") + + // Send a message from the first node + env.chats[0].SendMessage("Broadcast message to large group") + + // Allow time for propagation + time.Sleep(time.Duration(nodeCount*100) * time.Millisecond) + + // Verify all nodes received the message for i, chat := range env.chats { - t.Logf("Stopping chat %d", i) - chat.Stop() + assert.Len(t, chat.messageHistory, 1, "Node %d should have received the broadcast message", i) + assert.Equal(t, "Broadcast message to large group", chat.messageHistory[0].Content) } + + t.Log("TestLargeGroupScaling completed successfully") } -func areNodesConnected(nodes []*node.WakuNode, expectedConnections int) bool { - for _, node := range nodes { - if len(node.Host().Network().Peers()) != expectedConnections-1 { - return false - } - } - return true +func TestEagerPushMechanism(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + nodeCount := 2 + env, err := setupTestEnvironment(ctx, t, nodeCount) + require.NoError(t, err, "Failed to set up test environment") + + nc := NewNetworkController(env.nodes, env.chats) + + // Disconnect node 1 + nc.DisconnectNode(env.nodes[1]) + + // Send a 
message from node 0 + env.chats[0].SendMessage("Test eager push") + + // Wait for the message to be added to the outgoing buffer + time.Sleep(1 * time.Second) + + // Reconnect node 1 + nc.ReconnectNode(env.nodes[1]) + + // Wait for eager push to resend the message + time.Sleep(5 * time.Second) + + // Check if node 1 received the message + assert.Eventually(t, func() bool { + return len(env.chats[1].messageHistory) == 1 + }, 10*time.Second, 1*time.Second, "Node 1 should have received the message via eager push") } -// func TestNetworkPartition(t *testing.T) { -// env := setupTestEnvironment(t, 4) +func TestBloomFilterWindow(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() -// // Send initial messages -// env.chats[0].SendMessage("Message 1 from Node 0") -// env.chats[1].SendMessage("Message 2 from Node 1") -// time.Sleep(100 * time.Millisecond) + nodeCount := 2 + env, err := setupTestEnvironment(ctx, t, nodeCount) + require.NoError(t, err, "Failed to set up test environment") + + // Reduce bloom filter window for testing + for _, chat := range env.chats { + chat.bloomFilter.window = 5 * time.Second + } -// // Simulate network partition: disconnect Node 2 and Node 3 -// env.nodes[2].Stop() -// env.nodes[3].Stop() + // Send a message + env.chats[0].SendMessage("Test bloom filter window") + messageID := env.chats[0].messageHistory[0].MessageId -// // Send messages during partition -// env.chats[0].SendMessage("Message 3 from Node 0 during partition") -// env.chats[1].SendMessage("Message 4 from Node 1 during partition") -// time.Sleep(100 * time.Millisecond) + // Check if the message is in the bloom filter + assert.Eventually(t, func() bool { + return env.chats[1].bloomFilter.Test(messageID) + }, 30*time.Second, 1*time.Second, "Message should be in the bloom filter") -// // Reconnect Node 2 and Node 3 -// env.nodes[2].Start(context.Background()) -// env.nodes[3].Start(context.Background()) + // Wait for the bloom filter window to pass + time.Sleep(5 * time.Second) -// // Allow time for synchronization -// time.Sleep(500 * time.Millisecond) + // Clean the bloom filter + env.chats[1].bloomFilter.Clean() -// // Verify all nodes have all messages -// for i, chat := range env.chats { -// assert.Len(t, chat.messageHistory, 4, "Node %d should have all messages after partition", i) -// } -// } + time.Sleep(8 * time.Second) -// func TestLargeGroupScaling(t *testing.T) { -// nodeCount := 20 -// env := setupTestEnvironment(t, nodeCount) + // Check if the message is no longer in the bloom filter + assert.False(t, env.chats[1].bloomFilter.Test(messageID), "Message should no longer be in the bloom filter") -// // Send a message from the first node -// env.chats[0].SendMessage("Broadcast message to large group") + // Send another message to ensure the filter still works for new messages + env.chats[0].SendMessage("New test message") + time.Sleep(1 * time.Second) -// // Allow time for propagation -// time.Sleep(time.Duration(nodeCount*100) * time.Millisecond) + newMessageID := env.chats[0].messageHistory[1].MessageId + // Check if the new message is in the bloom filter + assert.Eventually(t, func() bool { + return env.chats[1].bloomFilter.Test(newMessageID) + }, 30*time.Second, 1*time.Second, "New message should be in the bloom filter") +} -// // Verify all nodes received the message -// for i, chat := range env.chats { -// assert.Len(t, chat.messageHistory, 1, "Node %d should have received the broadcast message", i) -// assert.Equal(t, "Broadcast 
message to large group", chat.messageHistory[0].Content) -// } -// } +func TestConflictResolution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + nodeCount := 3 + env, err := setupTestEnvironment(ctx, t, nodeCount) + require.NoError(t, err, "Failed to set up test environment") + + // Create conflicting messages with the same Lamport timestamp + conflictingMsg1 := &pb.Message{ + SenderId: "Node0", + MessageId: "msg1", + LamportTimestamp: 1, + Content: "Conflict 1", + } + conflictingMsg2 := &pb.Message{ + SenderId: "Node1", + MessageId: "msg2", + LamportTimestamp: 1, + Content: "Conflict 2", + } + + // Process the conflicting messages in different orders on different nodes + env.chats[0].processReceivedMessage(conflictingMsg1) + env.chats[0].processReceivedMessage(conflictingMsg2) + + env.chats[1].processReceivedMessage(conflictingMsg2) + env.chats[1].processReceivedMessage(conflictingMsg1) + + // Check if the messages are ordered consistently across nodes + assert.Equal(t, env.chats[0].messageHistory[0].MessageId, env.chats[1].messageHistory[0].MessageId, "Conflicting messages should be ordered consistently") + assert.Equal(t, env.chats[0].messageHistory[1].MessageId, env.chats[1].messageHistory[1].MessageId, "Conflicting messages should be ordered consistently") +} diff --git a/examples/chat2-reliable/pb/chat2.pb.go b/examples/chat2-reliable/pb/chat2.pb.go index 6230a3305..2ae649563 100644 --- a/examples/chat2-reliable/pb/chat2.pb.go +++ b/examples/chat2-reliable/pb/chat2.pb.go @@ -115,6 +115,101 @@ func (x *Message) GetContent() string { return "" } +// only for peer retrieval instead of store +type MessageRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` +} + +func (x *MessageRequest) Reset() { + *x = MessageRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_chat2_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MessageRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MessageRequest) ProtoMessage() {} + +func (x *MessageRequest) ProtoReflect() protoreflect.Message { + mi := &file_chat2_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MessageRequest.ProtoReflect.Descriptor instead. 
+func (*MessageRequest) Descriptor() ([]byte, []int) { + return file_chat2_proto_rawDescGZIP(), []int{1} +} + +func (x *MessageRequest) GetMessageId() string { + if x != nil { + return x.MessageId + } + return "" +} + +type MessageResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *MessageResponse) Reset() { + *x = MessageResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_chat2_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MessageResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MessageResponse) ProtoMessage() {} + +func (x *MessageResponse) ProtoReflect() protoreflect.Message { + mi := &file_chat2_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MessageResponse.ProtoReflect.Descriptor instead. +func (*MessageResponse) Descriptor() ([]byte, []int) { + return file_chat2_proto_rawDescGZIP(), []int{2} +} + +func (x *MessageResponse) GetMessage() *Message { + if x != nil { + return x.Message + } + return nil +} + var File_chat2_proto protoreflect.FileDescriptor var file_chat2_proto_rawDesc = []byte{ @@ -134,8 +229,14 @@ var file_chat2_proto_rawDesc = []byte{ 0x62, 0x6c, 0x6f, 0x6f, 0x6d, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x62, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x2f, 0x0a, 0x0e, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x38, 0x0a, 0x0f, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, + 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -150,16 +251,19 @@ func file_chat2_proto_rawDescGZIP() []byte { return file_chat2_proto_rawDescData } -var file_chat2_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_chat2_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_chat2_proto_goTypes = []any{ - (*Message)(nil), // 0: pb.Message + (*Message)(nil), // 0: pb.Message + (*MessageRequest)(nil), // 1: pb.MessageRequest + (*MessageResponse)(nil), // 2: pb.MessageResponse } var file_chat2_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 0, // 0: pb.MessageResponse.message:type_name -> pb.Message + 1, // [1:1] is the 
sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_chat2_proto_init() } @@ -180,6 +284,30 @@ func file_chat2_proto_init() { return nil } } + file_chat2_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*MessageRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chat2_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*MessageResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -187,7 +315,7 @@ func file_chat2_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_chat2_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/examples/chat2-reliable/pb/chat2.proto b/examples/chat2-reliable/pb/chat2.proto index c8776fc05..60db0f1e6 100644 --- a/examples/chat2-reliable/pb/chat2.proto +++ b/examples/chat2-reliable/pb/chat2.proto @@ -10,4 +10,13 @@ message Message { string channel_id = 5; bytes bloom_filter = 6; string content = 7; +} + +// only for peer retrieval instead of store +message MessageRequest { + string message_id = 1; +} + +message MessageResponse { + Message message = 1; } \ No newline at end of file diff --git a/examples/chat2-reliable/peer_retrieval.go b/examples/chat2-reliable/peer_retrieval.go new file mode 100644 index 000000000..3459cc91e --- /dev/null +++ b/examples/chat2-reliable/peer_retrieval.go @@ -0,0 +1,191 @@ +package main + +import ( + "chat2-reliable/pb" + "context" + "encoding/base64" + "fmt" + "io" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/waku-org/go-waku/waku/v2/peermanager" + wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "google.golang.org/protobuf/proto" +) + +// below functions are specifically for peer retrieval of missing msgs instead of store +func (c *Chat) doRequestMissingMessageFromPeers(messageID string) error { + peers := c.node.Host().Network().Peers() + for _, peerID := range peers { + msg, err := c.requestMessageFromPeer(peerID, messageID) + if err == nil && msg != nil { + c.processReceivedMessage(msg) + return nil + } + } + return fmt.Errorf("no peers could provide the missing message") +} + +func (c *Chat) requestMessageFromPeer(peerID peer.ID, messageID string) (*pb.Message, error) { + ctx, cancel := context.WithTimeout(c.ctx, 30*time.Second) + defer cancel() + + stream, err := c.node.Host().NewStream(ctx, peerID, protocol.ID("/chat2-reliable/message-request/1.0.0")) + if err != nil { + return nil, fmt.Errorf("failed to open stream to peer: %w", err) + } + defer stream.Close() + + // Send message request + request := &pb.MessageRequest{MessageId: messageID} + err = writeProtobufMessage(stream, request) + if err != nil { + return nil, fmt.Errorf("failed to send message request: %w", err) + } + + // Read response + response := &pb.MessageResponse{} + err = readProtobufMessage(stream, response) + if err != nil { + return nil, fmt.Errorf("failed to read 
message response: %w", err) + } + + fmt.Printf("Received message response from peer %s\n", peerID.String()) + + if response.Message == nil { + return nil, fmt.Errorf("peer did not have the requested message") + } + return response.Message, nil +} + +// Helper functions for protobuf message reading/writing +func writeProtobufMessage(stream network.Stream, msg proto.Message) error { + data, err := proto.Marshal(msg) + if err != nil { + return err + } + + _, err = stream.Write(data) + if err != nil { + return err + } + + // Add a delay before closing the stream + time.Sleep(1 * time.Second) + + err = stream.Close() + if err != nil { + return err + } + return nil +} + +func readProtobufMessage(stream network.Stream, msg proto.Message) error { + err := stream.SetReadDeadline(time.Now().Add(30 * time.Second)) + if err != nil { + return fmt.Errorf("failed to set read deadline: %w", err) + } + + var data []byte + buffer := make([]byte, 1024) + for { + n, err := stream.Read(buffer) + if err != nil { + if err == io.EOF { + break + } + return err + } + data = append(data, buffer[:n]...) + } + + err = proto.Unmarshal(data, msg) + if err != nil { + return err + } + return nil +} + +func (c *Chat) handleMessageRequest(stream network.Stream) { + defer stream.Close() + + request := &pb.MessageRequest{} + err := readProtobufMessage(stream, request) + if err != nil { + c.ui.ErrorMessage(fmt.Errorf("failed to read message request: %w", err)) + return + } + + c.mutex.Lock() + var foundMessage *pb.Message + for _, msg := range c.messageHistory { + if msg.MessageId == request.MessageId { + foundMessage = msg + break + } + } + c.mutex.Unlock() + + response := &pb.MessageResponse{Message: foundMessage} + err = writeProtobufMessage(stream, response) + if err != nil { + c.ui.ErrorMessage(fmt.Errorf("failed to send message response: %w", err)) + return + } +} + +func (c *Chat) setupMessageRequestHandler() { + c.node.Host().SetStreamHandler(protocol.ID("/chat2-reliable/message-request/1.0.0"), c.handleMessageRequest) +} + +func (c *Chat) _doRequestMissingMessageFromStore(messageID string) error { + ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) + defer cancel() + + hash, err := base64.URLEncoding.DecodeString(messageID) + if err != nil { + return fmt.Errorf("failed to parse message hash: %w", err) + } + + x := store.MessageHashCriteria{ + MessageHashes: []wpb.MessageHash{wpb.ToMessageHash(hash)}, + } + + peers, err := c.node.PeerManager().SelectPeers(peermanager.PeerSelectionCriteria{ + SelectionType: peermanager.Automatic, + Proto: store.StoreQueryID_v300, + PubsubTopics: []string{relay.DefaultWakuTopic}, + Ctx: ctx, + }) + if err != nil { + return fmt.Errorf("failed to find a store node: %w", err) + } + response, err := c.node.Store().Request(ctx, x, + store.WithAutomaticRequestID(), + store.WithPeer(peers[0]), + //store.WithAutomaticPeerSelection(), + store.WithPaging(true, 100), // Use paging to handle potentially large result sets + ) + + if err != nil { + return fmt.Errorf("failed to retrieve missing message: %w", err) + } + + for _, msg := range response.Messages() { + decodedMsg, err := decodeMessage(c.options.ContentTopic, msg.Message) + if err != nil { + continue + } + if decodedMsg.MessageId == messageID { + c.processReceivedMessage(decodedMsg) + return nil + } + } + + return fmt.Errorf("missing message not found: %s", messageID) +} diff --git a/examples/chat2-reliable/reliability.go b/examples/chat2-reliable/reliability.go index b28dd561e..f02017385 100644 --- 
a/examples/chat2-reliable/reliability.go +++ b/examples/chat2-reliable/reliability.go @@ -3,29 +3,138 @@ package main import ( "chat2-reliable/pb" "context" - "encoding/base64" "fmt" + "sort" + "sync" "time" "github.com/bits-and-blooms/bloom/v3" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/protocol" - wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/store" ) const ( - bloomFilterSize = 10000 - bloomFilterFPRate = 0.01 - bufferSweepInterval = 5 * time.Second - syncMessageInterval = 30 * time.Second - messageAckTimeout = 10 * time.Second + bloomFilterSize = 10000 + bloomFilterFPRate = 0.01 + bloomFilterWindow = 1 * time.Hour + bloomFilterCleanInterval = 30 * time.Minute + bufferSweepInterval = 5 * time.Second + syncMessageInterval = 60 * time.Second + messageAckTimeout = 10 * time.Second + maxRetries = 3 + retryBaseDelay = 1 * time.Second + maxRetryDelay = 10 * time.Second + ackTimeout = 5 * time.Second + maxResendAttempts = 5 + resendBaseDelay = 1 * time.Second + maxResendDelay = 30 * time.Second ) +func (c *Chat) initReliabilityProtocol() { + c.wg.Add(5) + go c.periodicBufferSweep() + go c.periodicSyncMessage() + go c.setupMessageRequestHandler() + go c.startBloomFilterCleaner() + go c.startEagerPushMechanism() +} + +func (c *Chat) startEagerPushMechanism() { + defer c.wg.Done() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.checkUnacknowledgedMessages() + } + } +} + +type UnacknowledgedMessage struct { + Message *pb.Message + SendTime time.Time + ResendAttempts int +} + +type TimestampedMessageID struct { + ID string + Timestamp time.Time +} + +type RollingBloomFilter struct { + filter *bloom.BloomFilter + window time.Duration + messages []TimestampedMessageID + mutex sync.Mutex +} + +func NewRollingBloomFilter() *RollingBloomFilter { + return &RollingBloomFilter{ + filter: bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate), + window: bloomFilterWindow, + messages: make([]TimestampedMessageID, 0), + } +} + +func (rbf *RollingBloomFilter) Add(messageID string) { + rbf.mutex.Lock() + defer rbf.mutex.Unlock() + + rbf.filter.Add([]byte(messageID)) + rbf.messages = append(rbf.messages, TimestampedMessageID{ + ID: messageID, + Timestamp: time.Now(), + }) +} + +func (rbf *RollingBloomFilter) Test(messageID string) bool { + rbf.mutex.Lock() + defer rbf.mutex.Unlock() + + return rbf.filter.Test([]byte(messageID)) +} + +func (rbf *RollingBloomFilter) Clean() { + rbf.mutex.Lock() + defer rbf.mutex.Unlock() + + cutoff := time.Now().Add(-rbf.window) + newMessages := make([]TimestampedMessageID, 0) + newFilter := bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate) + + for _, msg := range rbf.messages { + if msg.Timestamp.After(cutoff) { + newMessages = append(newMessages, msg) + newFilter.Add([]byte(msg.ID)) + } + } + + rbf.messages = newMessages + rbf.filter = newFilter +} + +func (c *Chat) startBloomFilterCleaner() { + defer c.wg.Done() + + ticker := time.NewTicker(bloomFilterCleanInterval) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.bloomFilter.Clean() + } + } +} + func (c *Chat) processReceivedMessage(msg *pb.Message) { // Check if the message is already in the bloom filter - if c.bloomFilter.Test([]byte(msg.MessageId)) { + if c.bloomFilter.Test(msg.MessageId) { // Review ACK status 
of messages in the unacknowledged outgoing buffer c.reviewAckStatus(msg) return @@ -35,36 +144,58 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) { c.updateLamportTimestamp(msg.LamportTimestamp) // Update bloom filter - c.updateBloomFilter(msg.MessageId) + c.bloomFilter.Add(msg.MessageId) // Check causal dependencies - if c.checkCausalDependencies(msg) { + missingDeps := c.checkCausalDependencies(msg) + if len(missingDeps) == 0 { if msg.Content != "" { // Process the message c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content) + // Add to message history + c.addToMessageHistory(msg) } - // Add to message history - c.addToMessageHistory(msg) - - // Update received bloom filter for the sender - if msg.BloomFilter != nil { - receivedFilter := &bloom.BloomFilter{} - err := receivedFilter.UnmarshalBinary(msg.BloomFilter) - if err == nil { - c.updateReceivedBloomFilter(msg.SenderId, receivedFilter) - } - } + // Process any messages in the buffer that now have their dependencies met + c.processBufferedMessages() } else { // Request missing dependencies - for _, depID := range msg.CausalHistory { - if !c.bloomFilter.Test([]byte(depID)) { - c.requestMissingMessage(depID) - } + for _, depID := range missingDeps { + c.requestMissingMessage(depID) } // Add to incoming buffer - c.addIncomingBuffer(msg) + c.addToIncomingBuffer(msg) + } +} + +func (c *Chat) processBufferedMessages() { + c.mutex.Lock() + defer c.mutex.Unlock() + + processed := make(map[string]bool) + remainingBuffer := make([]*pb.Message, 0, len(c.incomingBuffer)) + + for _, msg := range c.incomingBuffer { + if processed[msg.MessageId] { + continue + } + + missingDeps := c.checkCausalDependencies(msg) + if len(missingDeps) == 0 { + // Release the lock while processing the message + c.mutex.Unlock() + if msg.Content != "" { + c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content) + } + + c.addToMessageHistory(msg) + c.mutex.Lock() + } else { + remainingBuffer = append(remainingBuffer, msg) + } } + + c.incomingBuffer = remainingBuffer } func (c *Chat) reviewAckStatus(msg *pb.Message) { @@ -74,7 +205,7 @@ func (c *Chat) reviewAckStatus(msg *pb.Message) { // Review causal history for _, msgID := range msg.CausalHistory { for i, outMsg := range c.outgoingBuffer { - if outMsg.MessageId == msgID { + if outMsg.Message.MessageId == msgID { // acknowledged and remove from outgoing buffer c.outgoingBuffer = append(c.outgoingBuffer[:i], c.outgoingBuffer[i+1:]...) break @@ -84,13 +215,14 @@ func (c *Chat) reviewAckStatus(msg *pb.Message) { // Review bloom filter if msg.BloomFilter != nil { - receivedFilter := &bloom.BloomFilter{} + receivedFilter := bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate) err := receivedFilter.UnmarshalBinary(msg.BloomFilter) if err == nil { - for i, outMsg := range c.outgoingBuffer { - if receivedFilter.Test([]byte(outMsg.MessageId)) { + for i := 0; i < len(c.outgoingBuffer); i++ { + if receivedFilter.Test([]byte(c.outgoingBuffer[i].Message.MessageId)) { // possibly acknowledged and remove it from the outgoing buffer c.outgoingBuffer = append(c.outgoingBuffer[:i], c.outgoingBuffer[i+1:]...) 
+ i-- } } } } @@ -98,85 +230,77 @@ } func (c *Chat) requestMissingMessage(messageID string) { - // Implement logic to request a missing message from Store nodes or other participants - //go func() { - ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) - defer cancel() - - hash, err := base64.URLEncoding.DecodeString(messageID) - if err != nil { - c.ui.ErrorMessage(fmt.Errorf("failed to parse message hash: %w", err)) - return - } - - x := store.MessageHashCriteria{ - MessageHashes: []wpb.MessageHash{wpb.ToMessageHash(hash)}, - } + for retry := 0; retry < maxRetries; retry++ { + err := c.doRequestMissingMessageFromPeers(messageID) + if err == nil { + return + } - peers, err := c.node.PeerManager().SelectPeers(peermanager.PeerSelectionCriteria{ - SelectionType: peermanager.Automatic, - Proto: store.StoreQueryID_v300, - PubsubTopics: []string{relay.DefaultWakuTopic}, - Ctx: ctx, - }) - if err != nil { - c.ui.ErrorMessage(fmt.Errorf("failed to find a store node: %w", err)) - return - } - response, err := c.node.Store().Request(ctx, x, - store.WithAutomaticRequestID(), - store.WithPeer(peers[0]), - //store.WithAutomaticPeerSelection(), - store.WithPaging(true, 100), // Use paging to handle potentially large result sets - ) + c.ui.ErrorMessage(fmt.Errorf("failed to retrieve missing message (attempt %d): %w", retry+1, err)) - if err != nil { - c.ui.ErrorMessage(fmt.Errorf("failed to retrieve missing message: %w", err)) - return - } - - // Filter the response to find the specific message - for _, msg := range response.Messages() { - decodedMsg, err := decodeMessage(c.options.ContentTopic, msg.Message) - if err != nil { - continue - } - if decodedMsg.MessageId == messageID { - c.C <- protocol.NewEnvelope(msg.Message, msg.Message.GetTimestamp(), relay.DefaultWakuTopic) - return + // Exponential backoff + delay := retryBaseDelay * time.Duration(1<<uint(retry)) + if delay > maxRetryDelay { + delay = maxRetryDelay } + time.Sleep(delay) } - c.ui.ErrorMessage(fmt.Errorf("missing message not found: %s", messageID)) - //}() + c.ui.ErrorMessage(fmt.Errorf("failed to retrieve missing message after %d attempts: %s", maxRetries, messageID)) } -func (c *Chat) checkCausalDependencies(msg *pb.Message) bool { - c.mutex.Lock() - defer c.mutex.Unlock() +func (c *Chat) checkCausalDependencies(msg *pb.Message) []string { + var missingDeps []string for _, depID := range msg.CausalHistory { - if !c.bloomFilter.Test([]byte(depID)) { - return false + if !c.bloomFilter.Test(depID) { + missingDeps = append(missingDeps, depID) } } - return true -} - -func (c *Chat) updateBloomFilter(messageID string) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.bloomFilter.Add([]byte(messageID)) + return missingDeps } func (c *Chat) addToMessageHistory(msg *pb.Message) { c.mutex.Lock() defer c.mutex.Unlock() + c.messageHistory = append(c.messageHistory, msg) + c.messageHistory = c.resolveConflicts(c.messageHistory) + if len(c.messageHistory) > maxMessageHistory { - c.messageHistory = c.messageHistory[1:] + c.messageHistory = c.messageHistory[len(c.messageHistory)-maxMessageHistory:] } } +func (c *Chat) resolveConflicts(messages []*pb.Message) []*pb.Message { + // Group messages by Lamport timestamp + groupedMessages := make(map[int32][]*pb.Message) + for _, msg := range messages { + groupedMessages[msg.LamportTimestamp] = append(groupedMessages[msg.LamportTimestamp], msg) + } + + // Sort timestamps + var timestamps []int32 + for ts := range groupedMessages { + timestamps = append(timestamps, ts) + } + 
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + // Resolve conflicts and create a new ordered list + var resolvedMessages []*pb.Message + for _, ts := range timestamps { + msgs := groupedMessages[ts] + if len(msgs) == 1 { + resolvedMessages = append(resolvedMessages, msgs[0]) + } else { + // Sort conflicting messages by MessageId + sort.Slice(msgs, func(i, j int) bool { return msgs[i].MessageId < msgs[j].MessageId }) + resolvedMessages = append(resolvedMessages, msgs...) + } + } + + return resolvedMessages +} + func (c *Chat) periodicBufferSweep() { defer c.wg.Done() @@ -188,58 +312,64 @@ func (c *Chat) periodicBufferSweep() { case <-c.ctx.Done(): return case <-ticker.C: - c.sweepBuffers() - } - } -} + // Process incoming buffer + c.processBufferedMessages() -func (c *Chat) sweepBuffers() { - // Process incoming buffer - newIncomingBuffer := make([]*pb.Message, 0) - for _, msg := range c.incomingBuffer { - if c.checkCausalDependencies(msg) { - c.processReceivedMessage(msg) - } else { - newIncomingBuffer = append(newIncomingBuffer, msg) - } - } - c.setIncomingBuffer(newIncomingBuffer) - - // Resend unacknowledged messages from outgoing buffer - for _, msg := range c.outgoingBuffer { - if !c.isMessageAcknowledged(msg) { - c.resendMessage(msg) + // Resend unacknowledged messages from outgoing buffer + c.checkUnacknowledgedMessages() } } } -func (c *Chat) isMessageAcknowledged(msg *pb.Message) bool { +func (c *Chat) checkUnacknowledgedMessages() { c.mutex.Lock() defer c.mutex.Unlock() - ackCount := 0 - totalPeers := len(c.receivedBloomFilters) - for _, filter := range c.receivedBloomFilters { - if filter.Test([]byte(msg.MessageId)) { - ackCount++ + now := time.Now() + for i := 0; i < len(c.outgoingBuffer); i++ { + unackMsg := c.outgoingBuffer[i] + if now.Sub(unackMsg.SendTime) > ackTimeout { + if unackMsg.ResendAttempts < maxResendAttempts { + c.resendMessage(unackMsg.Message) + c.outgoingBuffer[i].ResendAttempts++ + c.outgoingBuffer[i].SendTime = now + } else { + // Remove the message from the buffer after max attempts + c.outgoingBuffer = append(c.outgoingBuffer[:i], c.outgoingBuffer[i+1:]...) 
+ i-- // Adjust index after removal + c.ui.ErrorMessage(fmt.Errorf("message %s dropped: failed to be acknowledged after %d attempts", unackMsg.Message.MessageId, maxResendAttempts)) + } + } + } - - // Consider a message acknowledged if at least 2/3 of peers have it in their bloom filter - return ackCount >= (2 * totalPeers / 3) } func (c *Chat) resendMessage(msg *pb.Message) { - // Implement logic to resend the message - //go func() { - ctx, cancel := context.WithTimeout(c.ctx, messageAckTimeout) - defer cancel() + go func() { + delay := resendBaseDelay * time.Duration(1<<uint(c.getResendAttempts(msg.MessageId))) + if delay > maxResendDelay { + delay = maxResendDelay + } + time.Sleep(delay) - err := c.publish(ctx, msg) - if err != nil { - c.ui.ErrorMessage(fmt.Errorf("failed to resend message: %w", err)) + ctx, cancel := context.WithTimeout(c.ctx, ackTimeout) + defer cancel() + + err := c.publish(ctx, msg) + if err != nil { + c.ui.ErrorMessage(fmt.Errorf("failed to resend message: %w", err)) + } + }() +} + +func (c *Chat) getResendAttempts(messageId string) int { + c.mutex.Lock() + defer c.mutex.Unlock() + for _, unackMsg := range c.outgoingBuffer { + if unackMsg.Message.MessageId == messageId { + return unackMsg.ResendAttempts + } } - //}() + return 0 } func (c *Chat) periodicSyncMessage() { @@ -259,7 +389,7 @@ } func (c *Chat) sendSyncMessage() { - bloomBytes, err := c.bloomFilterBytes() + bloomBytes, err := c.bloomFilter.MarshalBinary() if err != nil { c.ui.ErrorMessage(fmt.Errorf("failed to marshal bloom filter: %w", err)) return @@ -268,42 +398,29 @@ syncMsg := &pb.Message{ SenderId: c.node.Host().ID().String(), MessageId: generateUniqueID(), - LamportTimestamp: c.lamportTimestamp, - CausalHistory: c.getRecentMessageIDs(2), + LamportTimestamp: c.getLamportTimestamp(), + CausalHistory: c.getRecentMessageIDs(10), ChannelId: c.options.ContentTopic, BloomFilter: bloomBytes, Content: "", // Empty content for sync messages } - err = c.publish(c.ctx, syncMsg) + ctx, cancel := context.WithTimeout(c.ctx, messageAckTimeout) + defer cancel() + + err = c.publish(ctx, syncMsg) if err != nil { c.ui.ErrorMessage(fmt.Errorf("failed to send sync message: %w", err)) } } -func (c *Chat) addIncomingBuffer(msg *pb.Message) { +func (c *Chat) addToIncomingBuffer(msg *pb.Message) { c.mutex.Lock() defer c.mutex.Unlock() c.incomingBuffer = append(c.incomingBuffer, msg) } -func (c *Chat) setIncomingBuffer(newBuffer []*pb.Message) { - c.incomingBuffer = newBuffer -} - -func (c *Chat) bloomFilterBytes() ([]byte, error) { - c.mutex.Lock() - defer c.mutex.Unlock() - return c.bloomFilter.MarshalBinary() -} - -func (c *Chat) updateReceivedBloomFilter(senderId string, receivedFilter *bloom.BloomFilter) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.receivedBloomFilters[senderId] = receivedFilter -} - func (c *Chat) incLamportTimestamp() { c.mutex.Lock() defer c.mutex.Unlock() @@ -314,7 +431,9 @@ func (c *Chat) updateLamportTimestamp(msgTs int32) { c.mutex.Lock() defer c.mutex.Unlock() if msgTs > c.lamportTimestamp { - c.lamportTimestamp = msgTs + c.lamportTimestamp = msgTs + 1 + } else { + c.lamportTimestamp++ } } @@ -323,3 +442,17 @@ func (c *Chat) getLamportTimestamp() int32 { c.mutex.Lock() defer c.mutex.Unlock() return c.lamportTimestamp } + +// MarshalBinary implements the encoding.BinaryMarshaler interface for RollingBloomFilter +func (rbf *RollingBloomFilter) MarshalBinary() ([]byte, error) { + rbf.mutex.Lock() + defer rbf.mutex.Unlock() + return rbf.filter.MarshalBinary() +} + +// UnmarshalBinary implements 
the encoding.BinaryUnmarshaler interface for RollingBloomFilter +func (rbf *RollingBloomFilter) UnmarshalBinary(data []byte) error { + rbf.mutex.Lock() + defer rbf.mutex.Unlock() + return rbf.filter.UnmarshalBinary(data) +} diff --git a/examples/chat2-reliable/test_utils.go b/examples/chat2-reliable/test_utils.go new file mode 100644 index 000000000..990d8f86e --- /dev/null +++ b/examples/chat2-reliable/test_utils.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "fmt" + "sync" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/node" +) + +type NetworkController struct { + nodes []*node.WakuNode + chats []*Chat + connected map[peer.ID]map[peer.ID]bool + mu sync.Mutex +} + +func NewNetworkController(nodes []*node.WakuNode, chats []*Chat) *NetworkController { + nc := &NetworkController{ + nodes: nodes, + chats: chats, + connected: make(map[peer.ID]map[peer.ID]bool), + } + + for _, n := range nodes { + nc.connected[n.Host().ID()] = make(map[peer.ID]bool) + for _, other := range nodes { + if n != other { + nc.connected[n.Host().ID()][other.Host().ID()] = true + } + } + } + + return nc +} + +func (nc *NetworkController) DisconnectNode(node *node.WakuNode) { + nc.mu.Lock() + defer nc.mu.Unlock() + + nodeID := node.Host().ID() + for _, other := range nc.nodes { + otherID := other.Host().ID() + if nodeID != otherID { + nc.disconnectPeers(node.Host(), other.Host()) + nc.connected[nodeID][otherID] = false + nc.connected[otherID][nodeID] = false + } + } +} + +func (nc *NetworkController) ReconnectNode(node *node.WakuNode) { + nc.mu.Lock() + defer nc.mu.Unlock() + + nodeID := node.Host().ID() + for _, other := range nc.nodes { + otherID := other.Host().ID() + if nodeID != otherID && !nc.connected[nodeID][otherID] { + nc.connectPeers(node.Host(), other.Host()) + nc.connected[nodeID][otherID] = true + nc.connected[otherID][nodeID] = true + fmt.Printf("Reconnected node %s to node %s\n", nodeID.String(), otherID.String()) + } + } +} + +func (nc *NetworkController) disconnectPeers(h1, h2 host.Host) { + h1.Network().ClosePeer(h2.ID()) + h2.Network().ClosePeer(h1.ID()) +} + +func (nc *NetworkController) connectPeers(h1, h2 host.Host) { + ctx := context.Background() + h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) +} + +func (nc *NetworkController) IsConnected(n1, n2 *node.WakuNode) bool { + nc.mu.Lock() + defer nc.mu.Unlock() + return nc.connected[n1.Host().ID()][n2.Host().ID()] +} + +// func (c *Chat) checkPeerConnections() { +// peers := c.node.Host().Network().Peers() +// fmt.Printf("Node %s: Connected to %d peers\n", c.node.Host().ID().String(), len(peers)) +// for _, peer := range peers { +// fmt.Printf(" - %s\n", peer.String()) +// } +// }
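A note on the conflict-resolution rule introduced in reliability.go above: resolveConflicts groups the history by Lamport timestamp and breaks ties lexicographically by MessageId, which is exactly what TestConflictResolution asserts across two nodes. A self-contained sketch of that ordering rule (the msg type here is a stand-in for pb.Message and is not part of the patch):

	package main

	import (
		"fmt"
		"sort"
	)

	// msg stands in for the two pb.Message fields that drive ordering.
	type msg struct {
		ID string // MessageId
		TS int32  // LamportTimestamp
	}

	// order applies the same rule as resolveConflicts: sort by Lamport
	// timestamp, then break ties lexicographically by MessageId.
	func order(history []msg) {
		sort.Slice(history, func(i, j int) bool {
			if history[i].TS != history[j].TS {
				return history[i].TS < history[j].TS
			}
			return history[i].ID < history[j].ID
		})
	}

	func main() {
		nodeA := []msg{{"msg2", 1}, {"msg1", 1}} // arrival order on one node
		nodeB := []msg{{"msg1", 1}, {"msg2", 1}} // reversed arrival on another
		order(nodeA)
		order(nodeB)
		fmt.Println(nodeA, nodeB) // both print [{msg1 1} {msg2 1}]: histories converge
	}

Because the tie-break depends only on message contents, every node converges on the same history regardless of delivery order, with no coordination round needed.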