Skip to content

Commit

Permalink
Merge pull request #20 from waku-org/feat/waku-message-sequence
Browse files Browse the repository at this point in the history
Track message sequences for Waku dogfooding
  • Loading branch information
adklempner authored Jun 25, 2024
2 parents 8fe3616 + 0edd777 commit 1dc5ce5
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 2 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ Then you can run the server with:
go run cmd/server/main.go -data-source-name postgres://telemetry:[email protected]:5432/telemetry
```

If trying to run locally you receive the following error:
```
pq: SSL is not enabled on the server
```

Run this command instead:
```
go run cmd/server/main.go -data-source-name "postgres://telemetry:[email protected]:5432/telemetry?sslmode=disable"
```

Finally, to run the test:
```
make test
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/go-auxiliaries/shrinking-map v0.3.0
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.0
github.com/lib/pq v1.10.3
github.com/robfig/cron/v3 v3.0.1
Expand All @@ -15,6 +16,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -595,6 +597,8 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
Expand Down
27 changes: 25 additions & 2 deletions telemetry/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions telemetry/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server {

server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST")
server.Router.HandleFunc("/waku-metrics", server.createWakuTelemetry).Methods("POST")
server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST")
server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
Expand Down Expand Up @@ -349,7 +350,69 @@ func (s *Server) rateLimit(next http.Handler) http.Handler {
})
}

type ErrorDetail struct {
Error string `json:"Error"`
}

func (s *Server) createWakuTelemetry(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var telemetryData []WakuTelemetryRequest
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&telemetryData); err != nil {
log.Println(err)
http.Error(w, "Failed to decode telemetry data", http.StatusBadRequest)
return
}

var errorDetails []map[string]ErrorDetail

for _, data := range telemetryData {
switch data.TelemetryType {
case LightPushFilter:
var pushFilter TelemetryPushFilter
if err := json.Unmarshal(*data.TelemetryData, &pushFilter); err != nil {
errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Error decoding lightpush/filter metric: %v", err)}})
continue
}
if err := pushFilter.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Error saving lightpush/filter metric: %v", err)}})
continue
}
default:
errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Unknown waku telemetry type: %s", data.TelemetryType)}})
}
}

if len(errorDetails) > 0 {
log.Printf("Errors encountered: %v", errorDetails)
errorDetailsJSON, err := json.Marshal(errorDetails)
if err != nil {
s.logger.Error("failed to marshal error details", zap.Error(err))
http.Error(w, "Failed to process error details", http.StatusInternalServerError)
return
}
err = respondWithError(w, http.StatusInternalServerError, string(errorDetailsJSON))
if err != nil {
s.logger.Error("failed to respond", zap.Error(err))
}
return
}

err := respondWithJSON(w, http.StatusCreated, errorDetails)
if err != nil {
log.Println(err)
}

log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}

func (s *Server) Start(port int) {
s.logger.Info("Starting server", zap.Int("port", port))

log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router))
}
15 changes: 15 additions & 0 deletions telemetry/sql/000007_waku_push_filter.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS wakuPushFilter (
id SERIAL PRIMARY KEY,
walletAddress VARCHAR(255),
peerIdSender VARCHAR(255) NOT NULL,
peerIdReporter VARCHAR(255) NOT NULL,
sequenceHash VARCHAR(255) NOT NULL,
sequenceTotal VARCHAR(255) NOT NULL,
sequenceIndex VARCHAR(255) NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
timestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,

CONSTRAINT wakuPushFilter_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceIndex)
);
52 changes: 52 additions & 0 deletions telemetry/waku_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package telemetry

import (
"database/sql"
"encoding/json"
"time"
)

type WakuTelemetryType string

const (
LightPushFilter WakuTelemetryType = "LightPushFilter"
)

type WakuTelemetryRequest struct {
Id int `json:"id"`
TelemetryType WakuTelemetryType `json:"telemetryType"`
TelemetryData *json.RawMessage `json:"telemetryData"`
}

type TelemetryPushFilter struct {
ID int `json:"id"`
WalletAddress string `json:"walletAddress"`
PeerIDSender string `json:"peerIdSender"`
PeerIDReporter string `json:"peerIdReporter"`
SequenceHash string `json:"sequenceHash"`
SequenceTotal uint64 `json:"sequenceTotal"`
SequenceIndex uint64 `json:"sequenceIndex"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
Timestamp int64 `json:"timestamp"`
CreatedAt int64 `json:"createdAt"`
}

func (r *TelemetryPushFilter) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO wakuPushFilter (peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id;")
if err != nil {
return err
}

defer stmt.Close()

r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = lastInsertId

return nil
}

0 comments on commit 1dc5ce5

Please sign in to comment.