Skip to content

Commit

Permalink
chore_: refactor with sender api
Browse files Browse the repository at this point in the history
  • Loading branch information
kaichaosun committed Aug 8, 2024
1 parent 45cea61 commit 55dfe50
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 138 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1
github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 h1:UN5y6imIQBXnuq/bPAYJgT6XMZRgQgUO5Mn9VFi3c5A=
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded h1:jeqIa3CnjDiOyqxMif3al39o59F3SkpIzsaz2va2a3I=
github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
3 changes: 2 additions & 1 deletion telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/status-im/status-go/protocol/tt"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2"
"github.com/waku-org/go-waku/waku/v2/api/publish"
)

var (
Expand Down Expand Up @@ -170,7 +171,7 @@ func TestClient_ProcessSentEnvelope(t *testing.T) {
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""),
PublishMethod: wakuv2.LightPush,
PublishMethod: publish.LightPush,
}

// Send the telemetry request
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1
# github.com/waku-org/go-waku v0.8.1-0.20240808092421-4f027dde3ded
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests
Expand Down
103 changes: 27 additions & 76 deletions wakuv2/message_publishing.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,16 @@
package wakuv2

import (
"errors"

"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/api/publish"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/wakuv2/common"
)

type PublishMethod int

const (
LightPush PublishMethod = iota
Relay
)

func (pm PublishMethod) String() string {
switch pm {
case LightPush:
return "LightPush"
case Relay:
return "Relay"
default:
return "Unknown"
}
}

// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) {
Expand Down Expand Up @@ -88,72 +66,45 @@ func (w *Waku) broadcast() {
return
}

logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
w.wg.Add(1)
go w.publishEnvelope(envelope)
}
}

var fn publish.PublishFn
var publishMethod PublishMethod
func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
defer w.wg.Done()

if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure")
}
} else if w.cfg.LightClient {
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush))
return err
}
} else {
publishMethod = Relay
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
_, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic()))
return err
}
}
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))

// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
err := sendFn(env, logger)
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
}
return err
}
}
// only used in testing to simulate going offline
if w.cfg.SkipPublishToTopic {
logger.Info("skipping publish to topic")
return
}

// Wraps the publish function with rate limiter
fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
err := w.messageSender.Send(envelope)

w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger)
if w.statusTelemetryClient != nil {
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}})
}
}
}

func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.PublishFn, logger *zap.Logger) {
defer w.wg.Done()

if err := publishFn(envelope, logger); err != nil {
if err != nil {
logger.Error("could not send message", zap.Error(err))
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeExpired,
})
return
} else {
if !w.cfg.EnableStoreConfirmationForMessagesSent {
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeSent,
})
}
}

if !w.cfg.EnableStoreConfirmationForMessagesSent {
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeSent,
})
}
}
Loading

0 comments on commit 55dfe50

Please sign in to comment.