Skip to content

Commit

Permalink
feat: add endpoint for batching different metrics in single request
Browse files Browse the repository at this point in the history
This commit adds a new endpoint which expects the body to be a
JSON array of arbitrary metrics, allowing a single request to
include more than one metric of one or more types. Also adds an
optional status version field to tables.
  • Loading branch information
adklempner committed Jun 5, 2024
1 parent 6efd105 commit 50e88d3
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 16 deletions.
5 changes: 5 additions & 0 deletions telemetry/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func dropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}

_, err = db.Exec("DROP TABLE IF EXISTS sentEnvelopes")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}

_, err = db.Exec("DROP TABLE IF EXISTS protocolStatsRate")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
Expand Down
25 changes: 24 additions & 1 deletion telemetry/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions telemetry/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ func createTables(db *sql.DB) error {
sentAt INTEGER NOT NULL,
topic VARCHAR(255) NOT NULL,
createdAt INTEGER NOT NULL,
constraint receivedMessages_unique unique(chatId, messageHash, receiverKeyUID, nodeName)
);`
_, err := db.Exec(sqlStmt)

Expand Down
14 changes: 8 additions & 6 deletions telemetry/receivedenevlope.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@ type ReceivedEnvelope struct {
ReceiverKeyUID string `json:"receiverKeyUID"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
StatusVersion string `json:"statusVersion"`
}

func (r *ReceivedEnvelope) put(db *sql.DB) error {
r.CreatedAt = time.Now().Unix()
stmt, err := db.Prepare(`INSERT INTO receivedEnvelopes (messageHash, sentAt, createdAt, pubsubTopic,
topic, receiverKeyUID, nodeName, processingError)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
topic, receiverKeyUID, nodeName, processingError, statusVersion)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT ON CONSTRAINT receivedEnvelopes_unique DO NOTHING
RETURNING id;`)
if err != nil {
return err
}

lastInsertId := 0
err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError).Scan(&lastInsertId)
err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError, r.StatusVersion).Scan(&lastInsertId)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
Expand Down Expand Up @@ -72,21 +73,22 @@ type SentEnvelope struct {
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
PublishMethod string `json:"publishMethod"`
StatusVersion string `json:"statusVersion"`
}

func (r *SentEnvelope) put(db *sql.DB) error {
r.CreatedAt = time.Now().Unix()
stmt, err := db.Prepare(`INSERT INTO sentEnvelopes (messageHash, sentAt, createdAt, pubsubTopic,
topic, senderKeyUID, nodeName, publishMethod)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
topic, senderKeyUID, nodeName, publishMethod, statusVersion)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT ON CONSTRAINT sentEnvelopes_unique DO NOTHING
RETURNING id;`)
if err != nil {
return err
}

lastInsertId := int64(0)
res, err := stmt.Exec(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.SenderKeyUID, r.NodeName, r.PublishMethod)
res, err := stmt.Exec(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.SenderKeyUID, r.NodeName, r.PublishMethod, r.StatusVersion)
lastInsertId, _ = res.LastInsertId()

if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions telemetry/receivedmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ReceivedMessage struct {
Topic string `json:"topic"`
PubsubTopic string `json:"pubsubTopic"`
CreatedAt int64 `json:"createdAt"`
StatusVersion string `json:"statusVersion"`
}

func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*ReceivedMessage, error) {
Expand Down Expand Up @@ -88,14 +89,14 @@ func didReceivedMessageBeforeAndAfterInChat(db *sql.DB, receiverPublicKey string
}

func (r *ReceivedMessage) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id;")
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic, statusVersion) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING id;")
if err != nil {
return err
}

r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic).Scan(&lastInsertId)
err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic, r.StatusVersion).Scan(&lastInsertId)
if err != nil {
return err
}
Expand Down
86 changes: 81 additions & 5 deletions telemetry/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server {
server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET")
server.Router.HandleFunc("/record-metrics", server.createTelemetryData).Methods("POST")
server.Router.Use(server.rateLimit)

return server
Expand All @@ -55,6 +56,86 @@ func handleHealthCheck(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK")
}

type TelemetryType string

const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
)

type TelemetryRequest struct {
Id int `json:"id"`
TelemetryType TelemetryType `json:"telemetry_type"`
TelemetryData *json.RawMessage `json:"telemetry_data"`
}

func (s *Server) createTelemetryData(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var telemetryData []TelemetryRequest
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&telemetryData); err != nil {
log.Println(err)
}

var errorDetails []map[string]interface{}

for _, data := range telemetryData {
switch data.TelemetryType {
case "ProtocolStats":
var stats ProtocolStats
if err := json.Unmarshal(*data.TelemetryData, &stats); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding protocol stats: %v", err)})
continue
}
if err := stats.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving protocol stats: %v", err)})
continue
}
case "ReceivedEnvelope":
var envelope ReceivedEnvelope
if err := json.Unmarshal(*data.TelemetryData, &envelope); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding received envelope: %v", err)})
continue
}
if err := envelope.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving received envelope: %v", err)})
continue
}
case "SentEnvelope":
var envelope SentEnvelope
if err := json.Unmarshal(*data.TelemetryData, &envelope); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding sent envelope: %v", err)})
continue
}
if err := envelope.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving sent envelope: %v", err)})
continue
}
default:
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Unknown telemetry type: %s", data.TelemetryType)})
}
}

if len(errorDetails) > 0 {
log.Printf("Errors encountered: %v", errorDetails)
}

err := respondWithJSON(w, http.StatusCreated, errorDetails)
if err != nil {
log.Println(err)
}

log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}

func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var receivedMessages []ReceivedMessage
Expand Down Expand Up @@ -198,11 +279,6 @@ func (s *Server) createSentEnvelope(w http.ResponseWriter, r *http.Request) {
err := sentEnvelope.put(s.DB)
if err != nil {
log.Println("could not save envelope", err, sentEnvelope)
err := respondWithError(w, http.StatusBadRequest, "could not save envelope")
if err != nil {
log.Println(err)
}
return
}

err = respondWithJSON(w, http.StatusCreated, sentEnvelope)
Expand Down
3 changes: 3 additions & 0 deletions telemetry/sql/000006_status_version.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE receivedMessages ADD COLUMN statusVersion VARCHAR(31);
ALTER TABLE receivedEnvelopes ADD COLUMN statusVersion VARCHAR(31);
ALTER TABLE sentEnvelopes ADD COLUMN statusVersion VARCHAR(31);

0 comments on commit 50e88d3

Please sign in to comment.