Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: generic metrics and message check metrics #56

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ func main() {
server.RegisterMetric(types.PeerCountByShardMetric, &metrics.PeerCountByShard{})
server.RegisterMetric(types.PeerCountByOriginMetric, &metrics.PeerCountByOrigin{})

server.RegisterMetric(types.MessageCheckSuccessMetric, &metrics.MessageCheckSuccess{})
server.RegisterMetric(types.MessageCheckFailureMetric, &metrics.MessageCheckFailure{})
server.Start(*port)
}
12 changes: 12 additions & 0 deletions lib/common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func DropTables(db *sql.DB) {
"errorsendingenvelope",
"peerCountByShard",
"peerCountByOrigin",
"messageCheckSuccess",
"messageCheckFailure",
"schema_migrations",
}

Expand Down Expand Up @@ -96,6 +98,16 @@ func DropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS messageCheckSuccess")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS messageCheckFailure")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS schema_migrations")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
Expand Down
54 changes: 50 additions & 4 deletions lib/database/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/database/sql/000018_waku_req_res.up.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ALTER TABLE IF NOT EXISTS wakuRequestResponse (
CREATE TABLE IF NOT EXISTS wakuRequestResponse (
id SERIAL PRIMARY KEY,
siddarthkay marked this conversation as resolved.
Show resolved Hide resolved
protocol VARCHAR(50) NOT NULL,
ephemeral BOOLEAN NOT NULL,
Expand Down
38 changes: 38 additions & 0 deletions lib/database/sql/000019_message_check.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE TABLE IF NOT EXISTS wakuRequestResponse (
id SERIAL PRIMARY KEY,
protocol VARCHAR(50) NOT NULL,
ephemeral BOOLEAN NOT NULL,
timestamp INTEGER NOT NULL,
seenTimestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
peerId VARCHAR(255) NOT NULL,
messageHash VARCHAR(255) NOT NULL,
errorMessage TEXT,
extraData TEXT,

CONSTRAINT messages_unique UNIQUE (peerId, messageHash)
);

CREATE TABLE IF NOT EXISTS messageCheckSuccess (
siddarthkay marked this conversation as resolved.
Show resolved Hide resolved
id SERIAL PRIMARY KEY,
recordId INTEGER NOT NULL,
messageHash TEXT NOT NULL,
timestamp INTEGER NOT NULL,
CONSTRAINT messageCheckSuccess_unique UNIQUE (recordId, messageHash, timestamp)
);

ALTER TABLE messageCheckSuccess ADD CONSTRAINT fk_messageCheckSuccess_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);

CREATE TABLE IF NOT EXISTS messageCheckFailure (
id SERIAL PRIMARY KEY,
recordId INTEGER NOT NULL,
messageHash TEXT NOT NULL,
timestamp INTEGER NOT NULL,
CONSTRAINT messageCheckFailure_unique UNIQUE (recordId, messageHash, timestamp)
);

ALTER TABLE messageCheckFailure ADD CONSTRAINT fk_messageCheckFailure_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);
124 changes: 124 additions & 0 deletions lib/metrics/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package metrics

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/status-im/telemetry/lib/common"
"github.com/status-im/telemetry/pkg/types"
)

// GenericMetric is a generic struct that can handle any metric type
type GenericMetric[T any] struct {
types.TelemetryRecord
Data T
}

// MetricProcessor is an interface for processing metrics
type MetricProcessor interface {
Process(context.Context, *sql.DB, *common.MetricErrors, *types.TelemetryRequest) error
Clean(*sql.DB, int64) (int64, error)
}

// NewMetricProcessor creates a new MetricProcessor for the given metric type
func NewMetricProcessor[T types.TelemetryRecord]() MetricProcessor {
return &GenericMetric[T]{
Data: *new(T),
}
}

// Process implements the MetricProcessor interface
func (g *GenericMetric[T]) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
// Unmarshal the TelemetryRecord fields
if err := json.Unmarshal(*data.TelemetryData, &g.TelemetryRecord); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding TelemetryRecord: %v", err))
return err
}

// Unmarshal the Data field
if err := json.Unmarshal(*data.TelemetryData, &g.Data); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding %T: %v", g.Data, err))
return err
}

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

recordId, err := InsertTelemetryRecord(tx, &g.TelemetryRecord)
if err != nil {
return err
}

columns, values := getColumnsAndValues(g.Data)
placeholders := make([]string, len(columns)+1)
for i := range placeholders {
placeholders[i] = fmt.Sprintf("$%d", i+1)
}

tableName := getTableName(g.Data)
query := fmt.Sprintf(`
INSERT INTO %s (recordId, %s)
VALUES (%s)
RETURNING id;
`, tableName, strings.Join(columns, ", "), strings.Join(placeholders, ", "))

args := []interface{}{recordId}
args = append(args, values...)

result := tx.QueryRowContext(ctx, query, args...)
if result.Err() != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving %T: %v", g.Data, result.Err()))
return result.Err()
}

var lastInsertId int
err = result.Scan(&lastInsertId)
if err != nil {
return err
}

if err := tx.Commit(); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err))
return err
}

return nil
}

// Clean implements the MetricProcessor interface
func (g *GenericMetric[T]) Clean(db *sql.DB, before int64) (int64, error) {
tableName := getTableName(g.Data)
return common.Cleanup(db, tableName, before)
}

// Helper functions

func getColumnsAndValues(v interface{}) ([]string, []interface{}) {
var columns []string
var values []interface{}
t := reflect.TypeOf(v)
val := reflect.ValueOf(v)

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
tag := field.Tag.Get("json")
if tag != "" && tag != "-" {
columnName := strings.Split(tag, ",")[0]
columns = append(columns, columnName)
values = append(values, val.Field(i).Interface())
}
}
return columns, values
}

func getTableName(v interface{}) string {
t := reflect.TypeOf(v)
return strings.ToLower(t.Name())
}
18 changes: 18 additions & 0 deletions lib/metrics/message_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package metrics

import (
"github.com/status-im/telemetry/pkg/types"
)

type MessageCheckSuccess struct {
GenericMetric[types.MessageCheckSuccess]
}

type MessageCheckFailure struct {
GenericMetric[types.MessageCheckFailure]
}

var (
MessageCheckSuccessProcessor = &MessageCheckSuccess{}
MessageCheckFailureProcessor = &MessageCheckFailure{}
)
14 changes: 14 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
PeerConnFailureMetric TelemetryType = "PeerConnFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
)

type Origin int64
Expand Down Expand Up @@ -133,3 +135,15 @@ type PeerCountByOrigin struct {
Origin Origin `json:"origin"`
Timestamp int64 `json:"timestamp"`
}

type MessageCheckSuccess struct {
TelemetryRecord
MessageHash string `json:"messageHash"`
Timestamp int64 `json:"timestamp"`
}

type MessageCheckFailure struct {
TelemetryRecord
MessageHash string `json:"messageHash"`
Timestamp int64 `json:"timestamp"`
}
Loading