diff --git a/go.mod b/go.mod index 1293ebc6d91..08ff12c29cf 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 + github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 668d506261e..c9775cfb248 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 h1:UN5y6imIQBXnuq/bPAYJgT6XMZRgQgUO5Mn9VFi3c5A= -github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded h1:jeqIa3CnjDiOyqxMif3al39o59F3SkpIzsaz2va2a3I= +github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/telemetry/client_test.go b/telemetry/client_test.go index dbff6db27ed..df88a42dcbc 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -25,6 +25,7 @@ import ( "github.com/status-im/status-go/protocol/tt" v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/wakuv2" + "github.com/waku-org/go-waku/waku/v2/api/publish" ) var ( @@ -170,7 +171,7 @@ func TestClient_ProcessSentEnvelope(t *testing.T) { Version: proto.Uint32(0), Timestamp: proto.Int64(time.Now().Unix()), }, 0, ""), - PublishMethod: wakuv2.LightPush, + PublishMethod: publish.LightPush, } // Send the telemetry request diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go index a7b16a5716e..47f7c540899 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -30,8 +30,8 @@ type MessageSentCheck struct { messageIDs map[string]map[common.Hash]uint32 messageIDsMu sync.RWMutex storePeerID peer.ID - MessageStoredChan chan common.Hash - MessageExpiredChan chan common.Hash + messageStoredChan chan common.Hash + messageExpiredChan chan common.Hash ctx context.Context store *store.WakuStore timesource timesource.Timesource @@ -43,12 +43,12 @@ type MessageSentCheck struct { } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters -func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, logger *zap.Logger) *MessageSentCheck { +func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { return &MessageSentCheck{ messageIDs: make(map[string]map[common.Hash]uint32), messageIDsMu: sync.RWMutex{}, - MessageStoredChan: make(chan common.Hash, 1000), - MessageExpiredChan: make(chan common.Hash, 1000), + messageStoredChan: msgStoredChan, + messageExpiredChan: msgExpiredChan, ctx: ctx, store: store, timesource: timesource, @@ -232,12 +232,12 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c if found { ackHashes = append(ackHashes, hash) - m.MessageStoredChan <- hash + m.messageStoredChan <- hash } if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid { missedHashes = append(missedHashes, hash) - m.MessageExpiredChan <- hash + m.messageExpiredChan <- hash } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go new file mode 100644 index 00000000000..a6bff6c668e --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go @@ -0,0 +1,124 @@ +package publish + +import ( + "context" + "errors" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "go.uber.org/zap" + "golang.org/x/time/rate" +) + +const DefaultPeersToPublishForLightpush = 2 +const DefaultPublishingLimiterRate = rate.Limit(2) +const DefaultPublishingLimitBurst = 4 + +type PublishMethod int + +const ( + LightPush PublishMethod = iota + Relay +) + +func (pm PublishMethod) String() string { + switch pm { + case LightPush: + return "LightPush" + case Relay: + return "Relay" + default: + return "Unknown" + } +} + +type MessageSender struct { + ctx context.Context + publishMethod PublishMethod + lightPush *lightpush.WakuLightPush + relay *relay.WakuRelay + messageSentCheck *MessageSentCheck + rateLimiter *PublishRateLimiter + logger *zap.Logger +} + +func NewMessageSender(ctx context.Context, publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) *MessageSender { + return &MessageSender{ + ctx: ctx, + publishMethod: publishMethod, + lightPush: lightPush, + relay: relay, + rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), + logger: logger, + } +} + +func (ms *MessageSender) WithMessageSentCheck(messageSentCheck *MessageSentCheck) *MessageSender { + ms.messageSentCheck = messageSentCheck + return ms +} + +func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender { + ms.rateLimiter = rateLimiter + return ms +} + +func (ms *MessageSender) Send(env *protocol.Envelope) error { + logger := ms.logger.With(zap.Stringer("envelopeHash", env.Hash()), zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), zap.Int64("timestamp", env.Message().GetTimestamp())) + if ms.rateLimiter != nil { + if err := ms.rateLimiter.Check(ms.ctx, logger); err != nil { + return err + } + } + + switch ms.publishMethod { + case LightPush: + if ms.lightPush == nil { + return errors.New("lightpush is not available") + } + logger.Info("publishing message via lightpush") + _, err := ms.lightPush.Publish(ms.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush)) + return err + case Relay: + if ms.relay == nil { + return errors.New("relay is not available") + } + peerCnt := len(ms.relay.PubSub().ListPeers(env.PubsubTopic())) + logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) + _, err := ms.relay.Publish(ms.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) + return err + } + + ephemeral := env.Message().Ephemeral + if ms.messageSentCheck != nil && (ephemeral == nil || !*ephemeral) { + ms.messageSentCheck.Add(env.PubsubTopic(), common.BytesToHash(env.Hash().Bytes()), uint32(env.Message().GetTimestamp()/int64(time.Second))) + } + + return nil +} + +func (ms *MessageSender) Start() { + if ms.messageSentCheck != nil { + go ms.messageSentCheck.Start() + } +} + +func (ms *MessageSender) PublishMethod() PublishMethod { + return ms.publishMethod +} + +func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) { + if ms.messageSentCheck != nil { + ms.messageSentCheck.DeleteByMessageIDs(messageIDs) + } +} + +func (ms *MessageSender) SetStorePeerID(peerID peer.ID) { + if ms.messageSentCheck != nil { + ms.messageSentCheck.SetStorePeerID(peerID) + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go index 4322413b31a..87c0f427c98 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go @@ -35,3 +35,13 @@ func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn Pu return publishFn(envelope, logger) } } + +func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { + if err := p.limiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("could not send message (limiter)", zap.Error(err)) + } + return err + } + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 90c22f7fa3a..87f193ad531 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 +# github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index afb8b55aee7..3267cc8b0d5 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,13 +1,9 @@ package wakuv2 import ( - "errors" - "go.uber.org/zap" - "github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -15,24 +11,6 @@ import ( "github.com/status-im/status-go/wakuv2/common" ) -type PublishMethod int - -const ( - LightPush PublishMethod = iota - Relay -) - -func (pm PublishMethod) String() string { - switch pm { - case LightPush: - return "LightPush" - case Relay: - return "Relay" - default: - return "Unknown" - } -} - // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { @@ -88,72 +66,45 @@ func (w *Waku) broadcast() { return } - logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) + w.wg.Add(1) + go w.publishEnvelope(envelope) + } +} - var fn publish.PublishFn - var publishMethod PublishMethod +func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { + defer w.wg.Done() - if w.cfg.SkipPublishToTopic { - // For now only used in testing to simulate going offline - publishMethod = LightPush - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - return errors.New("test send failure") - } - } else if w.cfg.LightClient { - publishMethod = LightPush - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - logger.Info("publishing message via lightpush") - _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush)) - return err - } - } else { - publishMethod = Relay - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic())) - logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) - _, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) - return err - } - } + logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) - // Wraps the publish function with a call to the telemetry client - if w.statusTelemetryClient != nil { - sendFn := fn - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - err := sendFn(env, logger) - if err == nil { - w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) - } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) - } - return err - } - } + // only used in testing to simulate going offline + if w.cfg.SkipPublishToTopic { + logger.Info("skipping publish to topic") + return + } - // Wraps the publish function with rate limiter - fn = w.limiter.ThrottlePublishFn(w.ctx, fn) + err := w.messageSender.Send(envelope) - w.wg.Add(1) - go w.publishEnvelope(envelope, fn, logger) + if w.statusTelemetryClient != nil { + if err == nil { + w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) + } else { + w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) + } } -} - -func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.PublishFn, logger *zap.Logger) { - defer w.wg.Done() - if err := publishFn(envelope, logger); err != nil { + if err != nil { logger.Error("could not send message", zap.Error(err)) w.SendEnvelopeEvent(common.EnvelopeEvent{ Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), Event: common.EventEnvelopeExpired, }) return - } else { - if !w.cfg.EnableStoreConfirmationForMessagesSent { - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), - Event: common.EventEnvelopeSent, - }) - } + } + + if !w.cfg.EnableStoreConfirmationForMessagesSent { + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), + Event: common.EventEnvelopeSent, + }) } } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 75da8df8b27..c45d3f65e95 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -89,13 +89,10 @@ const cacheTTL = 20 * time.Minute const maxRelayPeers = 300 const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute -const peersToPublishForLightpush = 2 -const publishingLimiterRate = rate.Limit(2) -const publishingLimitBurst = 4 type SentEnvelope struct { Envelope *protocol.Envelope - PublishMethod PublishMethod + PublishMethod publish.PublishMethod } type ErrorSendingEnvelope struct { @@ -136,7 +133,6 @@ type Waku struct { protectedTopicStore *persistence.ProtectedTopicsStore sendQueue *publish.MessageQueue - limiter *publish.PublishRateLimiter missingMsgVerifier *missing.MissingMessageVerifier @@ -154,7 +150,7 @@ type Waku struct { storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDsMu sync.RWMutex - messageSentCheck *publish.MessageSentCheck + messageSender *publish.MessageSender topicHealthStatusChan chan peermanager.TopicHealthStatus connectionNotifChan chan node.PeerConnection @@ -245,15 +241,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), } - if !cfg.UseThrottledPublish || testing.Testing() { - // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, - // basically disabling the rate limit functionality - waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1) - - } else { - waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst) - } - waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) waku.bandwidthCounter = metrics.NewBandwidthCounter() @@ -986,16 +973,11 @@ func (w *Waku) SkipPublishToTopic(value bool) { } func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { - if !w.cfg.EnableStoreConfirmationForMessagesSent { - return - } - w.messageSentCheck.DeleteByMessageIDs(hashes) + w.messageSender.MessagesDelivered(hashes) } func (w *Waku) SetStorePeerID(peerID peer.ID) { - if w.messageSentCheck != nil { - w.messageSentCheck.SetStorePeerID(peerID) - } + w.messageSender.SetStorePeerID(peerID) } func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCriteria, cursor []byte, opts []store.RequestOption, processEnvelopes bool) ([]byte, int, error) { @@ -1189,9 +1171,7 @@ func (w *Waku) Start() error { go w.sendQueue.Start(w.ctx) - if w.cfg.EnableStoreConfirmationForMessagesSent { - w.confirmMessagesSent() - } + w.startMessageSender() // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` w.wg.Add(1) @@ -1200,28 +1180,51 @@ func (w *Waku) Start() error { return nil } -func (w *Waku) confirmMessagesSent() { - w.messageSentCheck = publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), w.logger) - go w.messageSentCheck.Start() +func (w *Waku) startMessageSender() { + publishMethod := publish.Relay + if w.cfg.LightClient { + publishMethod = publish.LightPush + } - go func() { - for { - select { - case <-w.ctx.Done(): - return - case hash := <-w.messageSentCheck.MessageStoredChan: - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: hash, - Event: common.EventEnvelopeSent, - }) - case hash := <-w.messageSentCheck.MessageExpiredChan: - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: hash, - Event: common.EventEnvelopeExpired, - }) + sender := publish.NewMessageSender(w.ctx, publishMethod, w.node.Lightpush(), w.node.Relay(), w.logger) + + if w.cfg.EnableStoreConfirmationForMessagesSent { + msgStoredChan := make(chan gethcommon.Hash, 1000) + msgExpiredChan := make(chan gethcommon.Hash, 1000) + messageSentCheck := publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger) + sender = sender.WithMessageSentCheck(messageSentCheck) + + go func() { + for { + select { + case <-w.ctx.Done(): + return + case hash := <-msgStoredChan: + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeSent, + }) + case hash := <-msgExpiredChan: + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeExpired, + }) + } } - } - }() + }() + } + + if !w.cfg.UseThrottledPublish || testing.Testing() { + // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, + // basically disabling the rate limit functionality + limiter := publish.NewPublishRateLimiter(rate.Inf, 1) + sender = sender.WithRateLimiting(limiter) + } + + w.messageSender = sender + + go w.messageSender.Start() + } func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { @@ -1411,11 +1414,6 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } - ephemeral := e.Envelope.Message().Ephemeral - if w.cfg.EnableStoreConfirmationForMessagesSent && e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) { - w.messageSentCheck.Add(e.PubsubTopic, e.Hash(), e.Sent) - } - matched := w.filters.NotifyWatchers(e) // If not matched we remove it