Skip to content

Commit

Permalink
Merge pull request #57 from weboko/weboko/update-waku-telemetry
Browse files Browse the repository at this point in the history
feat: update wakuPushFilter table
  • Loading branch information
adklempner authored Sep 17, 2024
2 parents b903356 + 1eba448 commit 72eb47f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 53 deletions.
16 changes: 16 additions & 0 deletions lib/database/sql/000018_waku_req_res.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
ALTER TABLE IF NOT EXISTS wakuRequestResponse (
id SERIAL PRIMARY KEY,
protocol VARCHAR(50) NOT NULL,
ephemeral BOOLEAN NOT NULL,
timestamp INTEGER NOT NULL,
seenTimestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
peerId VARCHAR(255) NOT NULL,
messageHash VARCHAR(255) NOT NULL,
errorMessage TEXT,
extraData TEXT,

CONSTRAINT messages_unique UNIQUE (peerId, messageHash)
);
69 changes: 26 additions & 43 deletions lib/metrics/waku_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ type WakuTelemetryType string

const (
LightPushFilter WakuTelemetryType = "LightPushFilter"
LightPushError WakuTelemetryType = "LightPushError"
Generic WakuTelemetryType = "Generic"
)

Expand All @@ -21,21 +20,22 @@ type WakuTelemetryRequest struct {
}

type TelemetryPushFilter struct {
ID int `json:"id"`
WalletAddress string `json:"walletAddress"`
PeerIDSender string `json:"peerIdSender"`
PeerIDReporter string `json:"peerIdReporter"`
SequenceHash string `json:"sequenceHash"`
SequenceTotal uint64 `json:"sequenceTotal"`
SequenceIndex uint64 `json:"sequenceIndex"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
Timestamp int64 `json:"timestamp"`
CreatedAt int64 `json:"createdAt"`
ID int `json:"id"`
Protocol string `json:"protocol"`
Ephemeral bool `json:"ephemeral"`
Timestamp int64 `json:"timestamp"`
SeenTimestamp int64 `json:"seenTimestamp"`
CreatedAt int64 `json:"createdAt"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
PeerID string `json:"peerId"`
MessageHash string `json:"messageHash"`
ErrorMessage string `json:"errorMessage"`
ExtraData string `json:"extraData"`
}

func (r *TelemetryPushFilter) Put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO wakuPushFilter (peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id;")
stmt, err := db.Prepare("INSERT INTO wakuRequestResponse (protocol, ephemeral, timestamp, seenTimestamp, contentTopic, pubsubTopic, peerId, messageHash, errorMessage, extraData, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id;")
if err != nil {
return err
}
Expand All @@ -44,37 +44,20 @@ func (r *TelemetryPushFilter) Put(db *sql.DB) error {

r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = lastInsertId

return nil
}

type TelemetryPushError struct {
ID int `json:"id"`
PeerID string `json:"peerId"`
ErrorMessage string `json:"errorMessage"`
PeerIDRemote string `json:"peerIdRemote"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
Timestamp int64 `json:"timestamp"`
CreatedAt int64 `json:"createdAt"`
}

func (r *TelemetryPushError) Put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO wakuPushError (peerId, peerIdRemote, contentTopic, pubsubTopic, errorMessage, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id;")
if err != nil {
return err
}

defer stmt.Close()
err = stmt.QueryRow(
r.Protocol,
r.Ephemeral,
r.Timestamp,
r.SeenTimestamp,
r.ContentTopic,
r.PubsubTopic,
r.PeerID,
r.MessageHash,
r.ErrorMessage,
r.ExtraData,
r.CreatedAt,
).Scan(&lastInsertId)

r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.PeerID, r.PeerIDRemote, r.ContentTopic, r.PubsubTopic, r.ErrorMessage, r.Timestamp, r.CreatedAt).Scan(&lastInsertId)
if err != nil {
return err
}
Expand Down
10 changes: 0 additions & 10 deletions telemetry/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,6 @@ func (s *Server) createWakuTelemetry(w http.ResponseWriter, r *http.Request) {
errorDetails.Append(data.Id, fmt.Sprintf("Error saving lightpush/filter metric: %v", err))
continue
}
case metrics.LightPushError:
var pushError metrics.TelemetryPushError
if err := json.Unmarshal(*data.TelemetryData, &pushError); err != nil {
errorDetails.Append(data.Id, fmt.Sprintf("Error decoding lightpush error metric: %v", err))
continue
}
if err := pushError.Put(s.DB); err != nil {
errorDetails.Append(data.Id, fmt.Sprintf("Error saving lightpush error metric: %v", err))
continue
}
case metrics.Generic:
var pushGeneric metrics.TelemetryGeneric
if err := json.Unmarshal(*data.TelemetryData, &pushGeneric); err != nil {
Expand Down

0 comments on commit 72eb47f

Please sign in to comment.