Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit an event in EventBus upon dial error #1222

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,9 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
if err != nil {
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
if w.peermanager != nil {
w.peermanager.HandleDialError(err, info.ID)
}
return err
}

Expand Down
10 changes: 4 additions & 6 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package peermanager

import (
"context"
"errors"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -277,11 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.addConnectionBackoff(pi.ID)
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
if err != nil {
c.pm.HandleDialError(err, pi.ID)
} else {
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
}
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
<-sem
}
29 changes: 29 additions & 0 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -87,6 +88,7 @@ type PeerManager struct {
TopicHealthNotifCh chan<- TopicHealthStatus
rttCache *FastestPeerSelector
RelayEnabled bool
evtDialError event.Emitter
}

// PeerSelection provides various options based on which Peer is selected from a list of peers.
Expand Down Expand Up @@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) {
go pm.connectivityLoop(ctx)
}
go pm.peerStoreLoop(ctx)

if pm.host != nil {
var err error
pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError))
if err != nil {
pm.logger.Error("failed to create dial error emitter", zap.Error(err))
}
}
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
Expand Down Expand Up @@ -719,3 +729,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
pm.serviceSlots.getPeers(proto).add(peerID)
}

func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) {
return
}
if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
if pm.host != nil {
pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
}
pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
if pm.evtDialError != nil {
emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
if emitterErr != nil {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
}
1 change: 1 addition & 0 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func TestConnectToRelayPeers(t *testing.T) {
ctx, pm, deferFn := initTest(t)
pc, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 120*time.Second, pm.logger)
require.NoError(t, err)
pc.SetHost(pm.host)
err = pc.Start(ctx)
require.NoError(t, err)
pm.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peerID)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
}
return err
}
Expand Down
8 changes: 5 additions & 3 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
Expand All @@ -38,6 +38,7 @@ type (
log *zap.Logger
*service.CommonService
subscriptions *SubscribersMap
pm *peermanager.PeerManager

maxSubscriptions int
}
Expand All @@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
wf.maxSubscriptions = params.MaxSubscribers
if params.pm != nil {
params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField)
wf.pm = params.pm
}
return wf
}
Expand Down Expand Up @@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
wf.metrics.RecordError(pushTimeoutFailure)
} else {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peerID)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
}
}
logger.Error("opening peer stream", zap.Error(err))
Expand Down
5 changes: 2 additions & 3 deletions waku/v2/protocol/legacy_store/waku_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor

stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
store.metrics.RecordError(dialFailure)
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(selectedPeer)
if store.pm != nil {
store.pm.HandleDialError(err, selectedPeer)
}
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p

stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure)
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peerID)
if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID)
}
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts

stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil {
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(params.selectedPeer)
if wakuPX.pm != nil {
wakuPX.pm.HandleDialError(err, params.selectedPeer)
}
return err
}
Expand Down
5 changes: 2 additions & 3 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,8 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe

stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(params.selectedPeer)
if s.pm != nil {
s.pm.HandleDialError(err, params.selectedPeer)
}
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions waku/v2/utils/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"github.com/multiformats/go-multiaddr"
)

type DialError struct {
Err error
PeerID peer.ID
}

// GetPeerID is used to extract the peerID from a multiaddress
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
Expand Down
Loading