Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat_: add status version to all telemetry calls #5261

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/status-cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/services/wakuv2ext"
"github.com/status-im/status-go/telemetry"

"github.com/urfave/cli/v2"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,7 +39,7 @@ func start(cCtx *cli.Context, name string, port int, apiModules string, telemetr
namedLogger := logger.Named(name)
namedLogger.Info("starting messager")

_ = setupLogger(name)
logger := setupLogger(name)

path := fmt.Sprintf("./test-%s", strings.ToLower(name))
err := os.MkdirAll(path, os.ModePerm)
Expand Down Expand Up @@ -71,6 +72,11 @@ func start(cCtx *cli.Context, name string, port int, apiModules string, telemetr
if wakuService == nil {
return nil, errors.New("waku service is not available")
}

if telemetryUrl != "" {
telemetryClient := telemetry.NewClient(logger, telemetryUrl, backend.SelectedAccountKeyID(), name, "cli")
backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient)
}
wakuAPI := wakuv2ext.NewPublicAPI(wakuService)

messenger := wakuAPI.Messenger()
Expand Down
1 change: 1 addition & 0 deletions cmd/statusd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func main() {
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
installationID.String(),
nil,
config.Version,
options...,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func NewMessenger(
node types.Node,
installationID string,
peerStore *mailservers.PeerStore,
version string,
opts ...Option,
) (*Messenger, error) {
var messenger *Messenger
Expand Down Expand Up @@ -427,7 +428,7 @@ func NewMessenger(

var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName)
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version)
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
Expand Down
1 change: 1 addition & 0 deletions protocol/messenger_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger,
&testNode{shh: waku},
uuid.New().String(),
nil,
"testVersion",
options...,
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions services/ext/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appD
s.n,
s.config.ShhextConfig.InstallationID,
s.peerStore,
params.Version,
options...,
)
if err != nil {
Expand Down
32 changes: 30 additions & 2 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2"

v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)
Expand All @@ -22,15 +23,17 @@ type Client struct {
logger *zap.Logger
keyUID string
nodeName string
version string
}

func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string) *Client {
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client {
return &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
}
}

Expand All @@ -50,12 +53,15 @@ func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,
"messageSize": len(sshMessage.Payload),
"statusVersion": c.version,
})
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending message to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed received messages to telemetry server")
}
}

Expand All @@ -68,11 +74,31 @@ func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) {
"topic": envelope.Message().ContentTopic,
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,
"statusVersion": c.version,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending envelope to telemetry server", zap.Error(err))
c.logger.Error("Error sending received envelope to telemetry server", zap.Error(err))
}
}

func (c *Client) PushSentEnvelope(envelope *v2protocol.Envelope, publishMethod wakuv2.PublishMethod) {
url := fmt.Sprintf("%s/sent-envelope", c.serverURL)
postBody := map[string]interface{}{
"messageHash": envelope.Hash().String(),
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"nodeName": c.nodeName,
"publishMethod": publishMethod.String(),
"statusVersion": c.version,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending pushed envelope to telemetry server", zap.Error(err))
}
}

Expand All @@ -96,5 +122,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed envelope processing error to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
}
}
2 changes: 2 additions & 0 deletions wakuv2/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@ func (c *BandwidthTelemetryClient) PushProtocolStats(relayStats metrics.Stats, s
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending message to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed protocol stats to telemetry server")
}
}
38 changes: 38 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const cacheTTL = 20 * time.Minute

type ITelemetryClient interface {
PushReceivedEnvelope(*protocol.Envelope)
PushSentEnvelope(*protocol.Envelope, PublishMethod)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -936,24 +937,45 @@ func (w *Waku) SkipPublishToTopic(value bool) {
w.cfg.SkipPublishToTopic = value
}

type PublishMethod int

const (
LightPush PublishMethod = iota
Relay
)

func (pm PublishMethod) String() string {
switch pm {
case LightPush:
return "LightPush"
case Relay:
return "Relay"
default:
return "Unknown"
}
}

func (w *Waku) broadcast() {
for {
select {
case envelope := <-w.sendQueue:
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
var fn publishFn
var publishMethod PublishMethod
if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure")
}
} else if w.cfg.LightClient {
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()))
return err
}
} else {
publishMethod = Relay
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
Expand All @@ -962,6 +984,22 @@ func (w *Waku) broadcast() {
}
}

// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
err := sendFn(env, logger)
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(env, publishMethod)
}
// else {
// TODO: send error from Relay or LightPush to Telemetry
// w.statusTelemetryClient.PushError(err)
// }
return err
}
}

w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger)
case <-w.ctx.Done():
Expand Down
Loading