Skip to content

Commit

Permalink
fix: use corrected connected peer count and add check to avoid crash (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Aug 6, 2024
1 parent f3560ce commit 5aa1131
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ jobs:
- name: "Run storev3 tests"
run: |
docker compose -f .github/docker-compose/nwaku.yml up -d
NWAKU_HOST=$(docker-compose -f .github/docker-compose/nwaku.yml port nwaku 60000)
NWAKU_HOST=$(docker compose -f .github/docker-compose/nwaku.yml port nwaku 60000)
NWAKU_PORT=$(echo $NWAKU_HOST | cut -d ":" -f 2)
sleep 5
make test-storev3 TEST_STOREV3_NODE="/ip4/127.0.0.1/tcp/${NWAKU_PORT}/p2p/16Uiu2HAmMGhfSTUzKbsjMWxc6T1X4wiTWSF1bEWSLjAukCm7KiHV"
33 changes: 15 additions & 18 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,32 +309,29 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
defer pm.topicMutex.RUnlock()
for topicStr, topicInst := range pm.subRelayTopics {

// @cammellos reported that ListPeers returned an invalid number of
// peers. This will ensure that the peers returned by this function
// match those peers that are currently connected

meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
topicPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(topicStr)
curPeerLen := topicPeers.Len()
if meshPeerLen < waku_proto.GossipSubDMin || curPeerLen < pm.OutPeersTarget {
curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len()

if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget {
pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen),
zap.Int("targetPeers", pm.OutPeersTarget))
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected)
if notConnectedPeers.Len() == 0 {
pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr))
go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
continue
}
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers.
numPeersToConnect := pm.OutPeersTarget - curPeerLen

if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()
numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen
if numPeersToConnect > 0 {
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()
}
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
}
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
}
}
}
Expand Down Expand Up @@ -374,17 +371,17 @@ func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
}
}

// getNotConnectedPers returns peers for a pubSubTopic that are not connected.
func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) {
// getPeersBasedOnconnectionStatus returns peers for a pubSubTopic that are either connected/not-connected based on status passed.
func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) {
var peerList peer.IDSlice
if pubsubTopic == "" {
peerList = pm.host.Peerstore().Peers()
} else {
peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
}
for _, peerID := range peerList {
if pm.host.Network().Connectedness(peerID) != network.Connected {
notConnectedPeers = append(notConnectedPeers, peerID)
if pm.host.Network().Connectedness(peerID) == connected {
filteredPeers = append(filteredPeers, peerID)
}
}
return
Expand Down

0 comments on commit 5aa1131

Please sign in to comment.