diff --git a/go.mod b/go.mod index e7c5283a339..3e6a9415eb2 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 + github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 0f7945aac69..6f43fbe2da3 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 h1:r5kgO4DWxeKyGF+wq5KhayW710XAqX5iWXhS/4ZqVkc= -github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 h1:LKKgMmvUYFOzrWVQYLbI4nmXza4hTY7XsFk9WAXL/r0= +github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go index 6bd041e6a21..f8123704f61 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go @@ -14,8 +14,6 @@ import ( "go.uber.org/zap" ) -const MultiplexChannelBuffer = 100 - type FilterConfig struct { MaxPeers int `json:"maxPeers"` Peers []peer.ID `json:"peers"` @@ -44,14 +42,46 @@ type Sub struct { id string } +type subscribeParameters struct { + batchInterval time.Duration + multiplexChannelBuffer int +} + +type SubscribeOptions func(*subscribeParameters) + +func WithBatchInterval(t time.Duration) SubscribeOptions { + return func(params *subscribeParameters) { + params.batchInterval = t + } +} + +func WithMultiplexChannelBuffer(value int) SubscribeOptions { + return func(params *subscribeParameters) { + params.multiplexChannelBuffer = value + } +} + +func defaultOptions() []SubscribeOptions { + return []SubscribeOptions{ + WithBatchInterval(5 * time.Second), + WithMultiplexChannelBuffer(100), + } +} + // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) { + optList := append(defaultOptions(), opts...) + params := new(subscribeParameters) + for _, opt := range optList { + opt(params) + } + sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf sub.ctx, sub.cancel = context.WithCancel(ctx) sub.subs = make(subscription.SubscriptionSet) - sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) + sub.DataCh = make(chan *protocol.Envelope, params.multiplexChannelBuffer) sub.ContentFilter = contentFilter sub.Config = config sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) @@ -66,7 +96,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte } } - go sub.subscriptionLoop() + go sub.subscriptionLoop(params.batchInterval) return sub, nil } @@ -78,8 +108,8 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { } } -func (apiSub *Sub) subscriptionLoop() { - ticker := time.NewTicker(5 * time.Second) +func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { + ticker := time.NewTicker(batchInterval) defer ticker.Stop() for { select { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index 4dc92c3df8a..e4b6e524d61 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -31,6 +31,7 @@ type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex ctx context.Context + opts []SubscribeOptions minPeersPerFilter int onlineChecker *onlinechecker.DefaultOnlineChecker filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details @@ -59,10 +60,11 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx + mgr.opts = opts mgr.logger = logger mgr.minPeersPerFilter = minPeersPerFilter mgr.envProcessor = envProcessor @@ -151,7 +153,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} - sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.Unlock() diff --git a/vendor/modules.txt b/vendor/modules.txt index 31d2a18cd96..7f5b06bff16 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1010,7 +1010,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 +# github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index cb82706b693..15fe6a21727 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1125,9 +1125,13 @@ func (w *Waku) Start() error { if w.cfg.LightClient { // Create FilterManager that will main peer connectivity // for installed filters - w.filterManager = filterapi.NewFilterManager(w.ctx, w.logger, w.cfg.MinPeersForFilter, + w.filterManager = filterapi.NewFilterManager( + w.ctx, + w.logger, + w.cfg.MinPeersForFilter, w, - w.node.FilterLightnode()) + w.node.FilterLightnode(), + filterapi.WithBatchInterval(300*time.Millisecond)) } err = w.setupRelaySubscriptions()