From 0edd7772e024e2a278b194421aee01582c27efb6 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 16 May 2024 18:14:23 -0700 Subject: [PATCH] feat: add table and endpoint for tracking waku message sequences --- README.md | 10 ++++ go.mod | 2 + go.sum | 4 ++ telemetry/bindata.go | 27 ++++++++- telemetry/server.go | 63 ++++++++++++++++++++ telemetry/sql/000007_waku_push_filter.up.sql | 15 +++++ telemetry/waku_metrics.go | 52 ++++++++++++++++ 7 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 telemetry/sql/000007_waku_push_filter.up.sql create mode 100644 telemetry/waku_metrics.go diff --git a/README.md b/README.md index a76d457..e5da6d3 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,16 @@ Then you can run the server with: go run cmd/server/main.go -data-source-name postgres://telemetry:newPassword@127.0.0.1:5432/telemetry ``` +If trying to run locally you receive the following error: +``` +pq: SSL is not enabled on the server +``` + +Run this command instead: +``` +go run cmd/server/main.go -data-source-name "postgres://telemetry:newPassword@127.0.0.1:5432/telemetry?sslmode=disable" +``` + Finally, to run the test: ``` make test diff --git a/go.mod b/go.mod index bb890d2..c1ab7b4 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/go-auxiliaries/shrinking-map v0.3.0 github.com/golang-migrate/migrate/v4 v4.15.2 + github.com/gorilla/handlers v1.5.2 github.com/gorilla/mux v1.8.0 github.com/lib/pq v1.10.3 github.com/robfig/cron/v3 v3.0.1 @@ -15,6 +16,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 04f9c20..09a08b9 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -595,6 +597,8 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= +github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= +github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= diff --git a/telemetry/bindata.go b/telemetry/bindata.go index 79f48fc..bd01192 100644 --- a/telemetry/bindata.go +++ b/telemetry/bindata.go @@ -6,6 +6,7 @@ // 000004_envelope.table.up.sql (531B) // 000005_pushed_envelope.up.sql (574B) // 000006_status_version.up.sql (198B) +// 000007_waku_push_filter.up.sql (523B) // doc.go (73B) package telemetry @@ -169,7 +170,7 @@ func _000005_pushed_envelopeUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1717560336, 0)} + info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1719028717, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7d, 0xaf, 0x8a, 0xcb, 0x97, 0x1e, 0xc6, 0xf6, 0x86, 0xe4, 0x1b, 0x67, 0x10, 0x87, 0x8e, 0x80, 0x1d, 0x5a, 0x7d, 0x64, 0xd0, 0x89, 0x3f, 0x1e, 0x6f, 0x93, 0x87, 0x4a, 0xd7, 0x87, 0xb8, 0x5e}} return a, nil } @@ -189,11 +190,31 @@ func _000006_status_versionUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "000006_status_version.up.sql", size: 198, mode: os.FileMode(0644), modTime: time.Unix(1717560330, 0)} + info := bindataFileInfo{name: "000006_status_version.up.sql", size: 198, mode: os.FileMode(0644), modTime: time.Unix(1719028717, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2b, 0x11, 0xee, 0x9f, 0x4f, 0xf5, 0x0, 0x9a, 0x98, 0xe9, 0x44, 0x21, 0x2e, 0x57, 0xf7, 0xae, 0xf3, 0xb2, 0x3d, 0x94, 0x40, 0x69, 0xa7, 0x1d, 0x62, 0x57, 0x31, 0x9f, 0x60, 0x6, 0xed, 0x80}} return a, nil } +var __000007_waku_push_filterUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x91\xc1\x4a\x03\x31\x10\x86\xcf\xdd\xa7\x98\xe3\x2e\xec\x49\xe8\xc9\x53\x5c\x52\x1b\x5c\xd3\x92\x44\xb1\x27\x49\x37\x03\x0d\x6e\xb3\xdb\x64\x42\x7d\x7c\xa1\x82\x76\xc5\xb6\xa7\x61\xf8\x3f\xf8\x87\x6f\x1a\xc5\x99\xe1\x60\xd8\x43\xcb\x41\x2c\x40\xae\x0c\xf0\x37\xa1\x8d\x86\xa3\xfd\xc8\xeb\x9c\x76\x0b\xdf\x13\x46\x28\x8b\x99\x77\xa0\xb9\x12\xac\x85\xb5\x12\xcf\x4c\x6d\xe0\x89\x6f\xea\x62\x76\xb4\x7d\x8f\xc4\x9c\x8b\x98\x12\xbc\x32\xd5\x2c\x99\x2a\xef\xe6\xf3\xaa\x2e\x66\x23\x62\x14\x4e\x63\x70\x18\x27\xd9\xa9\x4b\xbe\xb4\xed\x0f\xa4\x70\x1c\x22\x5d\xc1\x12\x1e\x32\x86\x0e\x97\x36\xed\x6e\x42\x66\x20\xdb\xdf\xa4\x44\x70\xf8\x79\x91\xea\x86\x40\x18\xc8\x0c\xa3\xef\x2e\x1f\x9f\xb7\x29\x6f\xaf\x33\xe4\xf7\x98\xc8\xee\x47\x10\xd2\xf0\x47\xae\x26\x2d\x11\x2d\xa1\x63\xf4\x4f\x58\x00\x00\x34\x2b\xa9\x8d\x62\x42\x9a\x3f\x5f\x79\xcf\xc1\x1f\x32\xc2\xf7\x28\xcf\x5d\xd7\x30\x95\x5a\xc3\xb9\xbd\xdf\xed\x24\xa0\x2a\xaa\xfb\xaf\x00\x00\x00\xff\xff\x48\xf0\x3d\x30\x0b\x02\x00\x00") + +func _000007_waku_push_filterUpSqlBytes() ([]byte, error) { + return bindataRead( + __000007_waku_push_filterUpSql, + "000007_waku_push_filter.up.sql", + ) +} + +func _000007_waku_push_filterUpSql() (*asset, error) { + bytes, err := _000007_waku_push_filterUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "000007_waku_push_filter.up.sql", size: 523, mode: os.FileMode(0644), modTime: time.Unix(1719271502, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x5d, 0xa, 0x2c, 0x93, 0xa, 0x1f, 0xeb, 0x49, 0x60, 0xe2, 0x8, 0x46, 0xb5, 0x16, 0xa4, 0xa9, 0x7f, 0xec, 0xfb, 0xe1, 0xdc, 0x12, 0x15, 0x17, 0x1, 0x28, 0xa3, 0xca, 0xeb, 0x45, 0x81, 0x31}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -311,6 +332,7 @@ var _bindata = map[string]func() (*asset, error){ "000004_envelope.table.up.sql": _000004_envelopeTableUpSql, "000005_pushed_envelope.up.sql": _000005_pushed_envelopeUpSql, "000006_status_version.up.sql": _000006_status_versionUpSql, + "000007_waku_push_filter.up.sql": _000007_waku_push_filterUpSql, "doc.go": docGo, } @@ -366,6 +388,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}}, "000005_pushed_envelope.up.sql": {_000005_pushed_envelopeUpSql, map[string]*bintree{}}, "000006_status_version.up.sql": {_000006_status_versionUpSql, map[string]*bintree{}}, + "000007_waku_push_filter.up.sql": {_000007_waku_push_filterUpSql, map[string]*bintree{}}, "doc.go": {docGo, map[string]*bintree{}}, }} diff --git a/telemetry/server.go b/telemetry/server.go index cc2f426..02c1eb1 100644 --- a/telemetry/server.go +++ b/telemetry/server.go @@ -41,6 +41,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server { server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST") server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST") + server.Router.HandleFunc("/waku-metrics", server.createWakuTelemetry).Methods("POST") server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST") server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST") server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST") @@ -349,7 +350,69 @@ func (s *Server) rateLimit(next http.Handler) http.Handler { }) } +type ErrorDetail struct { + Error string `json:"Error"` +} + +func (s *Server) createWakuTelemetry(w http.ResponseWriter, r *http.Request) { + start := time.Now() + var telemetryData []WakuTelemetryRequest + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(&telemetryData); err != nil { + log.Println(err) + http.Error(w, "Failed to decode telemetry data", http.StatusBadRequest) + return + } + + var errorDetails []map[string]ErrorDetail + + for _, data := range telemetryData { + switch data.TelemetryType { + case LightPushFilter: + var pushFilter TelemetryPushFilter + if err := json.Unmarshal(*data.TelemetryData, &pushFilter); err != nil { + errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Error decoding lightpush/filter metric: %v", err)}}) + continue + } + if err := pushFilter.put(s.DB); err != nil { + errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Error saving lightpush/filter metric: %v", err)}}) + continue + } + default: + errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Unknown waku telemetry type: %s", data.TelemetryType)}}) + } + } + + if len(errorDetails) > 0 { + log.Printf("Errors encountered: %v", errorDetails) + errorDetailsJSON, err := json.Marshal(errorDetails) + if err != nil { + s.logger.Error("failed to marshal error details", zap.Error(err)) + http.Error(w, "Failed to process error details", http.StatusInternalServerError) + return + } + err = respondWithError(w, http.StatusInternalServerError, string(errorDetailsJSON)) + if err != nil { + s.logger.Error("failed to respond", zap.Error(err)) + } + return + } + + err := respondWithJSON(w, http.StatusCreated, errorDetails) + if err != nil { + log.Println(err) + } + + log.Printf( + "%s\t%s\t%s", + r.Method, + r.RequestURI, + time.Since(start), + ) +} + func (s *Server) Start(port int) { s.logger.Info("Starting server", zap.Int("port", port)) + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router)) } diff --git a/telemetry/sql/000007_waku_push_filter.up.sql b/telemetry/sql/000007_waku_push_filter.up.sql new file mode 100644 index 0000000..3d96e22 --- /dev/null +++ b/telemetry/sql/000007_waku_push_filter.up.sql @@ -0,0 +1,15 @@ +CREATE TABLE IF NOT EXISTS wakuPushFilter ( + id SERIAL PRIMARY KEY, + walletAddress VARCHAR(255), + peerIdSender VARCHAR(255) NOT NULL, + peerIdReporter VARCHAR(255) NOT NULL, + sequenceHash VARCHAR(255) NOT NULL, + sequenceTotal VARCHAR(255) NOT NULL, + sequenceIndex VARCHAR(255) NOT NULL, + contentTopic VARCHAR(255) NOT NULL, + pubsubTopic VARCHAR(255) NOT NULL, + timestamp INTEGER NOT NULL, + createdAt INTEGER NOT NULL, + + CONSTRAINT wakuPushFilter_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceIndex) +); \ No newline at end of file diff --git a/telemetry/waku_metrics.go b/telemetry/waku_metrics.go new file mode 100644 index 0000000..f8149ce --- /dev/null +++ b/telemetry/waku_metrics.go @@ -0,0 +1,52 @@ +package telemetry + +import ( + "database/sql" + "encoding/json" + "time" +) + +type WakuTelemetryType string + +const ( + LightPushFilter WakuTelemetryType = "LightPushFilter" +) + +type WakuTelemetryRequest struct { + Id int `json:"id"` + TelemetryType WakuTelemetryType `json:"telemetryType"` + TelemetryData *json.RawMessage `json:"telemetryData"` +} + +type TelemetryPushFilter struct { + ID int `json:"id"` + WalletAddress string `json:"walletAddress"` + PeerIDSender string `json:"peerIdSender"` + PeerIDReporter string `json:"peerIdReporter"` + SequenceHash string `json:"sequenceHash"` + SequenceTotal uint64 `json:"sequenceTotal"` + SequenceIndex uint64 `json:"sequenceIndex"` + ContentTopic string `json:"contentTopic"` + PubsubTopic string `json:"pubsubTopic"` + Timestamp int64 `json:"timestamp"` + CreatedAt int64 `json:"createdAt"` +} + +func (r *TelemetryPushFilter) put(db *sql.DB) error { + stmt, err := db.Prepare("INSERT INTO wakuPushFilter (peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id;") + if err != nil { + return err + } + + defer stmt.Close() + + r.CreatedAt = time.Now().Unix() + lastInsertId := 0 + err = stmt.QueryRow(r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId) + if err != nil { + return err + } + r.ID = lastInsertId + + return nil +}