From a19ec016213963459ba378dac3507f6e7dbc3da4 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 23 May 2024 22:50:25 -0700 Subject: [PATCH] feat: add endpoint for batching different metrics in single request This commit adds a new endpoint which expects the body to be a JSON array of arbitrary metrics, allowing a single request to include more than one metric of one or more types. Also adds an optional status version field to tables. --- telemetry/aggregator_test.go | 5 ++ telemetry/bindata.go | 25 +++++- telemetry/db.go | 2 - telemetry/receivedenevlope.go | 26 ++++--- telemetry/receivedmessage.go | 5 +- telemetry/server.go | 88 ++++++++++++++++++++-- telemetry/sql/000006_status_version.up.sql | 3 + 7 files changed, 132 insertions(+), 22 deletions(-) create mode 100644 telemetry/sql/000006_status_version.up.sql diff --git a/telemetry/aggregator_test.go b/telemetry/aggregator_test.go index b399cec..bb1bfa9 100644 --- a/telemetry/aggregator_test.go +++ b/telemetry/aggregator_test.go @@ -42,6 +42,11 @@ func dropTables(db *sql.DB) { log.Fatalf("an error '%s' was not expected when dropping the table", err) } + _, err = db.Exec("DROP TABLE IF EXISTS sentEnvelopes") + if err != nil { + log.Fatalf("an error '%s' was not expected when dropping the table", err) + } + _, err = db.Exec("DROP TABLE IF EXISTS protocolStatsRate") if err != nil { log.Fatalf("an error '%s' was not expected when dropping the table", err) diff --git a/telemetry/bindata.go b/telemetry/bindata.go index c3195ba..79f48fc 100644 --- a/telemetry/bindata.go +++ b/telemetry/bindata.go @@ -5,6 +5,7 @@ // 000003_index_truncate.up.sql (598B) // 000004_envelope.table.up.sql (531B) // 000005_pushed_envelope.up.sql (574B) +// 000006_status_version.up.sql (198B) // doc.go (73B) package telemetry @@ -168,11 +169,31 @@ func _000005_pushed_envelopeUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1717559658, 0)} + info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1717560336, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7d, 0xaf, 0x8a, 0xcb, 0x97, 0x1e, 0xc6, 0xf6, 0x86, 0xe4, 0x1b, 0x67, 0x10, 0x87, 0x8e, 0x80, 0x1d, 0x5a, 0x7d, 0x64, 0xd0, 0x89, 0x3f, 0x1e, 0x6f, 0x93, 0x87, 0x4a, 0xd7, 0x87, 0xb8, 0x5e}} return a, nil } +var __000006_status_versionUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x28\x4a\x4d\x4e\xcd\x2c\x4b\x4d\xf1\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x56\x70\x74\x71\x51\x70\xf6\xf7\x09\xf5\xf5\x53\x28\x2e\x49\x2c\x29\x2d\x0e\x4b\x2d\x2a\xce\xcc\xcf\x53\x08\x73\x0c\x72\xf6\x70\x0c\xd2\x30\x36\xd4\xb4\xe6\xc2\x66\x84\x6b\x5e\x59\x6a\x4e\x7e\x01\x59\x66\x14\xa7\xe6\x95\x90\xa8\x1f\x10\x00\x00\xff\xff\xeb\x4e\x39\x66\xc6\x00\x00\x00") + +func _000006_status_versionUpSqlBytes() ([]byte, error) { + return bindataRead( + __000006_status_versionUpSql, + "000006_status_version.up.sql", + ) +} + +func _000006_status_versionUpSql() (*asset, error) { + bytes, err := _000006_status_versionUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "000006_status_version.up.sql", size: 198, mode: os.FileMode(0644), modTime: time.Unix(1717560330, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2b, 0x11, 0xee, 0x9f, 0x4f, 0xf5, 0x0, 0x9a, 0x98, 0xe9, 0x44, 0x21, 0x2e, 0x57, 0xf7, 0xae, 0xf3, 0xb2, 0x3d, 0x94, 0x40, 0x69, 0xa7, 0x1d, 0x62, 0x57, 0x31, 0x9f, 0x60, 0x6, 0xed, 0x80}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -289,6 +310,7 @@ var _bindata = map[string]func() (*asset, error){ "000003_index_truncate.up.sql": _000003_index_truncateUpSql, "000004_envelope.table.up.sql": _000004_envelopeTableUpSql, "000005_pushed_envelope.up.sql": _000005_pushed_envelopeUpSql, + "000006_status_version.up.sql": _000006_status_versionUpSql, "doc.go": docGo, } @@ -343,6 +365,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "000003_index_truncate.up.sql": {_000003_index_truncateUpSql, map[string]*bintree{}}, "000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}}, "000005_pushed_envelope.up.sql": {_000005_pushed_envelopeUpSql, map[string]*bintree{}}, + "000006_status_version.up.sql": {_000006_status_versionUpSql, map[string]*bintree{}}, "doc.go": {docGo, map[string]*bintree{}}, }} diff --git a/telemetry/db.go b/telemetry/db.go index 7829d5b..30cd0a1 100644 --- a/telemetry/db.go +++ b/telemetry/db.go @@ -72,9 +72,7 @@ func createTables(db *sql.DB) error { sentAt INTEGER NOT NULL, topic VARCHAR(255) NOT NULL, createdAt INTEGER NOT NULL, - constraint receivedMessages_unique unique(chatId, messageHash, receiverKeyUID, nodeName) - );` _, err := db.Exec(sqlStmt) diff --git a/telemetry/receivedenevlope.go b/telemetry/receivedenevlope.go index 19d92a6..4276673 100644 --- a/telemetry/receivedenevlope.go +++ b/telemetry/receivedenevlope.go @@ -16,13 +16,14 @@ type ReceivedEnvelope struct { ReceiverKeyUID string `json:"receiverKeyUID"` NodeName string `json:"nodeName"` ProcessingError string `json:"processingError"` + StatusVersion string `json:"statusVersion"` } func (r *ReceivedEnvelope) put(db *sql.DB) error { r.CreatedAt = time.Now().Unix() stmt, err := db.Prepare(`INSERT INTO receivedEnvelopes (messageHash, sentAt, createdAt, pubsubTopic, - topic, receiverKeyUID, nodeName, processingError) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + topic, receiverKeyUID, nodeName, processingError, statusVersion) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT ON CONSTRAINT receivedEnvelopes_unique DO NOTHING RETURNING id;`) if err != nil { @@ -30,7 +31,7 @@ func (r *ReceivedEnvelope) put(db *sql.DB) error { } lastInsertId := 0 - err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError).Scan(&lastInsertId) + err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError, r.StatusVersion).Scan(&lastInsertId) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil @@ -72,13 +73,14 @@ type SentEnvelope struct { NodeName string `json:"nodeName"` ProcessingError string `json:"processingError"` PublishMethod string `json:"publishMethod"` + StatusVersion string `json:"statusVersion"` } func (r *SentEnvelope) put(db *sql.DB) error { r.CreatedAt = time.Now().Unix() stmt, err := db.Prepare(`INSERT INTO sentEnvelopes (messageHash, sentAt, createdAt, pubsubTopic, - topic, senderKeyUID, nodeName, publishMethod) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + topic, senderKeyUID, nodeName, publishMethod, statusVersion) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT ON CONSTRAINT sentEnvelopes_unique DO NOTHING RETURNING id;`) if err != nil { @@ -86,16 +88,16 @@ func (r *SentEnvelope) put(db *sql.DB) error { } lastInsertId := int64(0) - res, err := stmt.Exec(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.SenderKeyUID, r.NodeName, r.PublishMethod) - lastInsertId, _ = res.LastInsertId() + res, err := stmt.Exec(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.SenderKeyUID, r.NodeName, r.PublishMethod, r.StatusVersion) + if err != nil { + return err + } + lastInsertId, err = res.LastInsertId() if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil - } else { - return err - } + return err } + defer stmt.Close() r.ID = int(lastInsertId) diff --git a/telemetry/receivedmessage.go b/telemetry/receivedmessage.go index 333da13..90be082 100644 --- a/telemetry/receivedmessage.go +++ b/telemetry/receivedmessage.go @@ -27,6 +27,7 @@ type ReceivedMessage struct { Topic string `json:"topic"` PubsubTopic string `json:"pubsubTopic"` CreatedAt int64 `json:"createdAt"` + StatusVersion string `json:"statusVersion"` } func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*ReceivedMessage, error) { @@ -88,14 +89,14 @@ func didReceivedMessageBeforeAndAfterInChat(db *sql.DB, receiverPublicKey string } func (r *ReceivedMessage) put(db *sql.DB) error { - stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id;") + stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic, statusVersion) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING id;") if err != nil { return err } r.CreatedAt = time.Now().Unix() lastInsertId := 0 - err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic).Scan(&lastInsertId) + err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic, r.StatusVersion).Scan(&lastInsertId) if err != nil { return err } diff --git a/telemetry/server.go b/telemetry/server.go index 8d94929..cc2f426 100644 --- a/telemetry/server.go +++ b/telemetry/server.go @@ -45,6 +45,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server { server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST") server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST") server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET") + server.Router.HandleFunc("/record-metrics", server.createTelemetryData).Methods("POST") server.Router.Use(server.rateLimit) return server @@ -55,6 +56,88 @@ func handleHealthCheck(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "OK") } +type TelemetryType string + +const ( + ProtocolStatsMetric TelemetryType = "ProtocolStats" + ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" +) + +type TelemetryRequest struct { + Id int `json:"id"` + TelemetryType TelemetryType `json:"telemetry_type"` + TelemetryData *json.RawMessage `json:"telemetry_data"` +} + +func (s *Server) createTelemetryData(w http.ResponseWriter, r *http.Request) { + start := time.Now() + var telemetryData []TelemetryRequest + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(&telemetryData); err != nil { + log.Println(err) + http.Error(w, "Failed to decode telemetry data", http.StatusBadRequest) + return + } + + var errorDetails []map[string]interface{} + + for _, data := range telemetryData { + switch data.TelemetryType { + case ProtocolStatsMetric: + var stats ProtocolStats + if err := json.Unmarshal(*data.TelemetryData, &stats); err != nil { + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding protocol stats: %v", err)}) + continue + } + if err := stats.put(s.DB); err != nil { + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving protocol stats: %v", err)}) + continue + } + case ReceivedEnvelopeMetric: + var envelope ReceivedEnvelope + if err := json.Unmarshal(*data.TelemetryData, &envelope); err != nil { + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding received envelope: %v", err)}) + continue + } + if err := envelope.put(s.DB); err != nil { + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving received envelope: %v", err)}) + continue + } + case SentEnvelopeMetric: + var envelope SentEnvelope + if err := json.Unmarshal(*data.TelemetryData, &envelope); err != nil { + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding sent envelope: %v", err)}) + continue + } + if err := envelope.put(s.DB); err != nil { + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving sent envelope: %v", err)}) + continue + } + default: + errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Unknown telemetry type: %s", data.TelemetryType)}) + } + } + + if len(errorDetails) > 0 { + log.Printf("Errors encountered: %v", errorDetails) + } + + err := respondWithJSON(w, http.StatusCreated, errorDetails) + if err != nil { + log.Println(err) + } + + log.Printf( + "%s\t%s\t%s", + r.Method, + r.RequestURI, + time.Since(start), + ) +} + func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) { start := time.Now() var receivedMessages []ReceivedMessage @@ -198,11 +281,6 @@ func (s *Server) createSentEnvelope(w http.ResponseWriter, r *http.Request) { err := sentEnvelope.put(s.DB) if err != nil { log.Println("could not save envelope", err, sentEnvelope) - err := respondWithError(w, http.StatusBadRequest, "could not save envelope") - if err != nil { - log.Println(err) - } - return } err = respondWithJSON(w, http.StatusCreated, sentEnvelope) diff --git a/telemetry/sql/000006_status_version.up.sql b/telemetry/sql/000006_status_version.up.sql new file mode 100644 index 0000000..70dc97f --- /dev/null +++ b/telemetry/sql/000006_status_version.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE receivedMessages ADD COLUMN statusVersion VARCHAR(31); +ALTER TABLE receivedEnvelopes ADD COLUMN statusVersion VARCHAR(31); +ALTER TABLE sentEnvelopes ADD COLUMN statusVersion VARCHAR(31); \ No newline at end of file