Skip to content

Commit

Permalink
feat: send sample event and response in one report per label set in t…
Browse files Browse the repository at this point in the history
…he configured duration
  • Loading branch information
vamsikrishnakandi committed Nov 14, 2024
1 parent 22a89e8 commit 3c645f8
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 0 deletions.
3 changes: 3 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,6 @@ PgNotifier:
retriggerCount: 500
trackBatchInterval: 2s
maxAttempt: 3
Reporting:
eventSamplingEnabled: true
eventSamplingDurationInMinutes: 60
147 changes: 147 additions & 0 deletions enterprise/reporting/event_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package reporting

import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

type EventSampler struct {
db *badger.DB
mu sync.Mutex
ttl config.ValueLoader[time.Duration]
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func DefaultPath(pathName string) string {
tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
panic(err)
}
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName)
}

func NewEventSampler(pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*EventSampler, error) {
dbPath := DefaultPath(pathName)
opts := badger.DefaultOptions(dbPath).
WithLogger(badgerLogger{log}).
WithCompression(options.None).
WithIndexCacheSize(16 << 20). // 16mb
WithNumGoroutines(1).
WithNumMemtables(conf.GetInt("BadgerDB.numMemtable", 5)).
WithValueThreshold(conf.GetInt64("BadgerDB.valueThreshold", 1048576)).
WithBlockCacheSize(0).
WithNumVersionsToKeep(1).
WithNumLevelZeroTables(conf.GetInt("BadgerDB.numLevelZeroTables", 5)).
WithNumLevelZeroTablesStall(conf.GetInt("BadgerDB.numLevelZeroTablesStall", 15)).
WithSyncWrites(conf.GetBool("BadgerDB.syncWrites", false)).
WithDetectConflicts(conf.GetBool("BadgerDB.detectConflicts", false))

ctx, cancel := context.WithCancel(context.Background())

db, err := badger.Open(opts)

es := &EventSampler{
db: db,
ttl: ttl,
ctx: ctx,
cancel: cancel,
wg: sync.WaitGroup{},
}

if err != nil {
return nil, err
}

go es.gcLoop()

return es, nil
}

func (es *EventSampler) Get(key []byte) (bool, error) {
es.mu.Lock()
defer es.mu.Unlock()

var found bool

err := es.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(key)

if err != nil {
return err
}

found = item != nil
return nil
})

if err == badger.ErrKeyNotFound {
return false, nil
} else if err != nil {
return false, err
}

return found, nil
}

func (es *EventSampler) Put(key []byte) error {
es.mu.Lock()
defer es.mu.Unlock()

return es.db.Update(func(txn *badger.Txn) error {
entry := badger.NewEntry(key, []byte{1}).WithTTL(es.ttl.Load())
return txn.SetEntry(entry)
})
}

func (es *EventSampler) gcLoop() {
for {
select {
case <-es.ctx.Done():
_ = es.db.RunValueLogGC(0.5)
return
case <-time.After(5 * time.Minute):
}
again:
if es.ctx.Err() != nil {
return
}
// One call would only result in removal of at max one log file.
// As an optimization, you could also immediately re-run it whenever it returns nil error
// (this is why `goto again` is used).
err := es.db.RunValueLogGC(0.5)
if err == nil {
goto again
}
}
}

func (es *EventSampler) Close() {
es.cancel()
es.wg.Wait()
if es.db != nil {
_ = es.db.Close()
}
}

type badgerLogger struct {
logger.Logger
}

func (badgerLogger) Errorf(format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, a...)
}

func (badgerLogger) Warningf(format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, a...)
}
49 changes: 49 additions & 0 deletions enterprise/reporting/event_sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package reporting

import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/stretchr/testify/assert"
)

func TestPutAndGet(t *testing.T) {
conf := config.New()
ttl := conf.GetReloadableDurationVar(1, time.Second, "Reporting.eventSamplingDurationInMinutes")
log := logger.NewLogger()

es, _ := NewEventSampler("/test-reporting-badger", ttl, conf, log)

es.Put([]byte("key1"))

Check failure on line 19 in enterprise/reporting/event_sampler_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `es.Put` is not checked (errcheck)
es.Put([]byte("key2"))

Check failure on line 20 in enterprise/reporting/event_sampler_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `es.Put` is not checked (errcheck)
es.Put([]byte("key3"))

Check failure on line 21 in enterprise/reporting/event_sampler_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `es.Put` is not checked (errcheck)
val1, _ := es.Get([]byte("key1"))
val2, _ := es.Get([]byte("key2"))
val3, _ := es.Get([]byte("key3"))

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")

es.Close()
}

func TestEvictionOnTTLExpiry(t *testing.T) {
conf := config.New()
ttl := conf.GetReloadableDurationVar(500, time.Millisecond, "Reporting.eventSamplingDurationInMinutes")
log := logger.NewLogger()

es, _ := NewEventSampler("/test-reporting-badger", ttl, conf, log)

es.Put([]byte("key1"))

Check failure on line 40 in enterprise/reporting/event_sampler_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `es.Put` is not checked (errcheck)

time.Sleep(600 * time.Millisecond)

val1, _ := es.Get([]byte("key1"))

assert.False(t, val1, "Expected key1 to be evicted")

es.Close()
}
69 changes: 69 additions & 0 deletions enterprise/reporting/label_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package reporting

import (
"encoding/hex"
"strconv"

"github.com/rudderlabs/rudder-server/utils/types"
"github.com/spaolacci/murmur3"
)

type LabelSet struct {
WorkspaceID string
// Namespace string
// InstanceID string
SourceDefinitionID string
SourceCategory string
SourceID string
DestinationDefinitionID string
DestinationID string
SourceTaskRunID string
SourceJobID string
SourceJobRunID string
TransformationID string
TransformationVersionID string
TrackingPlanID string
TrackingPlanVersion int
InPU string
PU string
Status string
TerminalState bool
InitialState bool
StatusCode int
EventName string
EventType string
ErrorType string
}

func NewLabelSet(metric types.PUReportedMetric) LabelSet {
return LabelSet{
WorkspaceID: metric.ConnectionDetails.SourceID,
SourceDefinitionID: metric.ConnectionDetails.SourceDefinitionId,
SourceCategory: metric.ConnectionDetails.SourceCategory,
SourceID: metric.ConnectionDetails.SourceID,
DestinationDefinitionID: metric.ConnectionDetails.DestinationDefinitionId,
DestinationID: metric.ConnectionDetails.DestinationID,
SourceTaskRunID: metric.ConnectionDetails.SourceTaskRunID,
SourceJobID: metric.ConnectionDetails.SourceJobID,
SourceJobRunID: metric.ConnectionDetails.SourceJobRunID,
TransformationID: metric.ConnectionDetails.TransformationID,
TransformationVersionID: metric.ConnectionDetails.TransformationVersionID,
TrackingPlanID: metric.ConnectionDetails.TrackingPlanID,
TrackingPlanVersion: metric.ConnectionDetails.TrackingPlanVersion,
InPU: metric.PUDetails.InPU,
PU: metric.PUDetails.PU,
Status: metric.StatusDetail.Status,
TerminalState: metric.PUDetails.TerminalPU,
InitialState: metric.PUDetails.InitialPU,
StatusCode: metric.StatusDetail.StatusCode,
EventName: metric.StatusDetail.EventName,
EventType: metric.StatusDetail.EventType,
ErrorType: metric.StatusDetail.ErrorType,
}
}

func (labelSet LabelSet) generateHash() string {
data := labelSet.WorkspaceID + labelSet.SourceDefinitionID + labelSet.SourceCategory + labelSet.SourceID + labelSet.DestinationDefinitionID + labelSet.DestinationID + labelSet.SourceTaskRunID + labelSet.SourceJobID + labelSet.SourceJobRunID + labelSet.TransformationID + labelSet.TransformationVersionID + labelSet.TrackingPlanID + strconv.Itoa(labelSet.TrackingPlanVersion) + labelSet.InPU + labelSet.PU + labelSet.Status + strconv.FormatBool(labelSet.TerminalState) + strconv.FormatBool(labelSet.InitialState) + strconv.Itoa(labelSet.StatusCode) + labelSet.EventName + labelSet.EventType + labelSet.ErrorType
hash := murmur3.Sum64([]byte(data))
return hex.EncodeToString([]byte(strconv.FormatUint(hash, 16)))
}
76 changes: 76 additions & 0 deletions enterprise/reporting/label_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package reporting

import (
"testing"

"github.com/rudderlabs/rudder-server/utils/types"
"github.com/stretchr/testify/assert"
)

func createMetricObject(eventName string) types.PUReportedMetric {
return types.PUReportedMetric{
ConnectionDetails: types.ConnectionDetails{
SourceID: "some-source-id",
DestinationID: "some-destination-id",
},
PUDetails: types.PUDetails{
InPU: "some-in-pu",
PU: "some-pu",
},
StatusDetail: &types.StatusDetail{
Status: "some-status",
Count: 3,
StatusCode: 0,
SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`,
SampleEvent: []byte(`{"some-sample-event-key": "some-sample-event-value"}`),
EventName: eventName,
EventType: "some-event-type",
},
}
}

func TestNewLabelSet(t *testing.T) {
t.Run("should create the correct LabelSet from types.PUReportedMetric", func(t *testing.T) {
inputMetric := createMetricObject("some-event-name")
labelSet := NewLabelSet(inputMetric)

assert.Equal(t, "some-source-id", labelSet.SourceID)
assert.Equal(t, "some-event-name", labelSet.EventName) // Default value
})

t.Run("should create the correct LabelSet with custom EventName", func(t *testing.T) {
inputMetric := createMetricObject("custom-event-name")
labelSet := NewLabelSet(inputMetric)

assert.Equal(t, "some-source-id", labelSet.SourceID)
assert.Equal(t, "custom-event-name", labelSet.EventName) // Custom event name
})
}

func TestGenerateHash(t *testing.T) {
t.Run("same hash for same LabelSet", func(t *testing.T) {
inputMetric1 := createMetricObject("some-event-name")
labelSet1 := NewLabelSet(inputMetric1)

inputMetric2 := createMetricObject("some-event-name")
labelSet2 := NewLabelSet(inputMetric2)

hash1 := labelSet1.generateHash()
hash2 := labelSet2.generateHash()

assert.Equal(t, hash1, hash2)
})

t.Run("different hash for different LabelSet", func(t *testing.T) {
inputMetric1 := createMetricObject("some-event-name-1")
labelSet1 := NewLabelSet(inputMetric1)

inputMetric2 := createMetricObject("some-event-name-2")
labelSet2 := NewLabelSet(inputMetric2)

hash1 := labelSet1.generateHash()
hash2 := labelSet2.generateHash()

assert.NotEqual(t, hash1, hash2)
})
}
Loading

0 comments on commit 3c645f8

Please sign in to comment.