(#151) Support forcing limiter bypass based on json field
Adds the new inspect_force_field setting that can combine with inspect_field
to force copying of specific messages

Signed-off-by: R.I.Pienaar <[email protected]>
ripienaar committed Oct 31, 2024
1 parent 513277a commit 0689d59
Showing 11 changed files with 157 additions and 60 deletions.
4 changes: 2 additions & 2 deletions advisor/advisor.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ var (
)

const (
_EMPTY_ = ""
)

type Advisor struct {
@@ -156,7 +156,7 @@ func (a *Advisor) publisher(ctx context.Context, wg *sync.WaitGroup) {

msg := nats.NewMsg(subject)
msg.Data = d
if advisory.EventID != _EMPTY_ {
if advisory.EventID != "" {
msg.Header.Add(api.JSMsgId, advisory.EventID)
}

2 changes: 2 additions & 0 deletions config/config.go
@@ -91,6 +91,8 @@ type Stream struct {
MaxAgeString string `json:"max_age"`
// InspectJSONField will inspect a specific field in JSON payloads and limit sends by this field
InspectJSONField string `json:"inspect_field"`
// InspectJSONForceField will inspect a specific field in the JSON payloads and force the limiter to publish the message
InspectJSONForceField string `json:"inspect_force_field"`
// InspectHeaderValue inspects the value of a header and does limiting based on that
InspectHeaderValue string `json:"inspect_header"`
// InspectSubjectToken inspects a certain token and limits based on that, -1 inspects the entire subject, 0 disables
28 changes: 27 additions & 1 deletion docs/content/configuration/sampling/_index.md
@@ -87,4 +87,30 @@ Pick the `inspect_duration` based on your needs but ensure that it is longer tha
The advisory subject can have `%s` in it that will be replaced with the event type (like `timeout`) and a `%v` that will be replaced with the value being tracked. Use this to partition the advisories or to help searching a large store of them
{{% /notice %}}

We configure advisories that will inform us about statusses of data, advisories will be published to a Stream with the subject `NODE_DATA_ADVISORIES` and they will be retried a few times should they fail. See [Sampling Advisories](../../monitoring/#sampling-advisories) for details about advisories.
We configure advisories that will inform us about statuses of data, advisories will be published to a Stream with the subject `NODE_DATA_ADVISORIES` and they will be retried a few times should they fail. See [Sampling Advisories](../../monitoring/#sampling-advisories) for details about advisories.

## Forcing copying despite sampling

In some cases you know your data has changed significantly and want to force a copy. Since version 0.9.0 you can do this using a specific JSON boolean field in your data.

```yaml
streams:
- stream: NODE_DATA
source_url: nats://nats.us-east.example.net:4222
target_url: nats://nats.central.example.net:4222
inspect_field: sender
inspect_force_field: force_copy
inspect_duration: 1h
warn_duration: 12m
size_trigger: 1024
advisory:
subject: NODE_DATA_ADVISORIES
reliable: true
```

This is identical to the configuration in the previous section, with `inspect_force_field` added: if this field is set to `true` in your JSON data, that specific message will be copied regardless of sampling.

{{% notice style="warning" %}}
Take care not to set this field on every message, or to trigger it on frequently changing values like timestamps, as doing so nullifies the gains from limiting and can potentially DDoS your upstreams.
{{% /notice %}}
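The decision the limiter makes with these two settings can be sketched roughly as follows. This is a simplified illustration using only the standard library — the replicator itself uses the gjson package and also consults last-seen state before copying — and `decide` is a hypothetical helper, not part of the codebase:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decide extracts the tracking value named by inspect_field and reports
// whether inspect_force_field is present in the payload. As of this commit
// the presence of the force field is what triggers the forced copy, so
// setting it to true only on significantly changed data is the expected use.
func decide(payload []byte, inspectField, forceField string) (trackValue string, mustCopy bool) {
	var data map[string]any
	if err := json.Unmarshal(payload, &data); err != nil {
		return "", false
	}

	if v, ok := data[inspectField]; ok {
		trackValue = fmt.Sprintf("%v", v)
	}

	_, mustCopy = data[forceField]

	return trackValue, mustCopy
}

func main() {
	_, forced := decide([]byte(`{"sender":"some.node","force_copy":true}`), "sender", "force_copy")
	fmt.Println(forced) // true: copied regardless of sampling state

	_, forced = decide([]byte(`{"sender":"some.node"}`), "sender", "force_copy")
	fmt.Println(forced) // false: normal sampling rules apply
}
```

A message without the force field still has its sender tracked and is subject to the normal `inspect_duration` sampling window.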
1 change: 1 addition & 0 deletions docs/content/monitoring/_index.md
@@ -194,6 +194,7 @@ We have extensive Prometheus Metrics about the operation of the system allowing
| `choria_stream_replicator_advisor_publish_errors` | The number of times publishing advisories failed |
| `choria_stream_replicator_advisor_publish_total_messages` | The total number of advisories sent |
| `choria_stream_replicator_limiter_messages_without_limit_field_count` | The number of messages that did not have the data field or header used for limiting/sampling |
| `choria_stream_replicator_limiter_messages_copy_forced_count` | The number of messages copied due to `inspect_force_field` value |
| `choria_stream_replicator_replicator_total_messages` | The total number of messages processed including ones that would be ignored |
| `choria_stream_replicator_replicator_total_bytess` | The size of messages processed including ones that would be ignored |
| `choria_stream_replicator_replicator_handler_error_count` | The number of times the handler failed to process a message |
14 changes: 5 additions & 9 deletions idtrack/idtrack.go
@@ -52,10 +52,6 @@ type Tracker struct {
sync.Mutex
}

const (
_EMPTY_ = ""
)

func New(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, warn time.Duration, sizeTrigger float64, stateFile string, stream string, worker string, replicator string, nc *nats.Conn, syncSubject string, log *logrus.Entry) (*Tracker, error) {
t := &Tracker{
Items: map[string]*Item{},
@@ -198,7 +194,7 @@ func (t *Tracker) NotifyExpired(cb func(map[string]Item)) error {

// RecordSeen records that we saw the item
func (t *Tracker) RecordSeen(v string, sz float64) {
if v == _EMPTY_ {
if v == "" {
return
}

@@ -272,7 +268,7 @@ func (t *Tracker) lastSeen(v string) (time.Time, time.Time, float64) {

// ShouldProcess determines if a message should be processed considering last seen times, size deltas and copied delta
func (t *Tracker) ShouldProcess(v string, sz float64) bool {
if v == _EMPTY_ {
if v == "" {
return true
}

@@ -319,8 +315,8 @@ func (t *Tracker) loadState() error {
t.Lock()
defer t.Unlock()

if t.stateFile == _EMPTY_ {
t.log.Warnf("Tacker state tracking not configured")
if t.stateFile == "" {
t.log.Warnf("Tracker state tracking not configured")
return nil
}

@@ -360,7 +356,7 @@ func (t *Tracker) saveState() error {
t.Lock()
defer t.Unlock()

if t.stateFile == _EMPTY_ {
if t.stateFile == "" {
return nil
}

90 changes: 59 additions & 31 deletions limiter/memory/limiter.go
@@ -19,42 +19,45 @@ import (
)

type limiter struct {
jsonField string
header string
token int
duration time.Duration
replicator string
stream string
name string
processed *idtrack.Tracker
stateFile string
syncSubj string
log *logrus.Entry
mu *sync.Mutex
jsonField string
jsonForceField string
header string
token int
duration time.Duration
replicator string
stream string
name string
processed *idtrack.Tracker
stateFile string
syncSubj string
log *logrus.Entry
mu *sync.Mutex
}

var _EMPTY_ = ""

func New(ctx context.Context, wg *sync.WaitGroup, cfg *config.Stream, name string, replicator string, nc *nats.Conn, log *logrus.Entry) (*limiter, error) {
if cfg.InspectDuration == 0 {
return nil, fmt.Errorf("inspect duration not set, memory limiter can not start")
}
if name == _EMPTY_ {
if name == "" {
return nil, fmt.Errorf("name is not set, memory limiter can not start")
}
if replicator == _EMPTY_ {
if replicator == "" {
return nil, fmt.Errorf("replicator name is required")
}
if cfg.InspectJSONForceField != "" && cfg.InspectJSONField == "" {
return nil, fmt.Errorf("forcing based on json field requires both inspect_field and inspect_force_field set")
}

l := &limiter{
name: name,
duration: cfg.InspectDuration,
jsonField: cfg.InspectJSONField,
header: cfg.InspectHeaderValue,
token: cfg.InspectSubjectToken,
stateFile: cfg.StateFile,
stream: cfg.Stream,
replicator: replicator,
name: name,
duration: cfg.InspectDuration,
jsonField: cfg.InspectJSONField,
jsonForceField: cfg.InspectJSONForceField,
header: cfg.InspectHeaderValue,
token: cfg.InspectSubjectToken,
stateFile: cfg.StateFile,
stream: cfg.Stream,
replicator: replicator,
log: log.WithFields(logrus.Fields{
"limiter": "memory",
"duration": cfg.InspectDuration.String(),
@@ -67,10 +70,13 @@ func New(ctx context.Context, wg *sync.WaitGroup, cfg *config.Stream, name string, replicator string, nc *nats.Conn, log *logrus.Entry) (*limiter, error) {
}

switch {
case l.jsonField != _EMPTY_:
case l.jsonField != "":
l.log = l.log.WithField("field", l.jsonField)
if l.jsonForceField != "" {
l.log = l.log.WithField("force_field", l.jsonForceField)
}

case l.header != _EMPTY_:
case l.header != "":
l.log = l.log.WithField("header", l.header)

case l.token != 0:
@@ -98,9 +104,21 @@ func (l *limiter) Tracker() *idtrack.Tracker {

func (l *limiter) ProcessAndRecord(msg *nats.Msg, f func(msg *nats.Msg, process bool) error) error {
var trackValue string
var mustCopy bool

switch {
case l.jsonField != _EMPTY_:
case l.jsonForceField != "" && l.jsonField != "":
res := gjson.GetBytes(msg.Data, l.jsonField)
if res.Exists() {
trackValue = res.String()
}

res = gjson.GetBytes(msg.Data, l.jsonForceField)
if res.Exists() {
mustCopy = true
}

case l.jsonField != "":
res := gjson.GetBytes(msg.Data, l.jsonField)
if res.Exists() {
trackValue = res.String()
@@ -115,24 +133,34 @@ func (l *limiter) ProcessAndRecord(msg *nats.Msg, f func(msg *nats.Msg, process bool) error) error {
trackValue = parts[l.token-1]
}

case l.header != _EMPTY_:
case l.header != "":
trackValue = msg.Header.Get(l.header)
}

if trackValue == _EMPTY_ {
if trackValue == "" {
limiterMessagesWithoutTrackingFieldCount.WithLabelValues("memory", l.name, l.replicator).Inc()
}

if mustCopy {
limiterMessageForcedByField.WithLabelValues("memory", l.name, l.replicator).Inc()
}

sz := float64(len(msg.Data))
shouldProcess := l.processed.ShouldProcess(trackValue, sz)
var shouldProcess bool

if mustCopy {
shouldProcess = true
} else {
shouldProcess = l.processed.ShouldProcess(trackValue, sz)
}
l.processed.RecordSeen(trackValue, sz)

err := f(msg, shouldProcess)
if err != nil {
return err
}

if shouldProcess {
if shouldProcess || mustCopy {
l.processed.RecordCopied(trackValue)
}

38 changes: 38 additions & 0 deletions limiter/memory/limiter_test.go
@@ -67,6 +67,44 @@ var _ = Describe("Limiter", func() {
})).ToNot(HaveOccurred())
})

It("Should handle present force copy fields", func() {
cfg.InspectJSONForceField = "force_copy"
cfg.InspectJSONField = "sender"

limiter, err := New(ctx, &wg, cfg, "GINKGO", "GINKGO", nil, log)
Expect(err).ToNot(HaveOccurred())

msg := nats.NewMsg("test")
msg.Data = []byte(`{"sender":"some.node"}`)

processed := 0
skipped := 0

handler := func(msg *nats.Msg, process bool) error {
if process {
processed++
} else {
skipped++
}

return nil
}

Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
Expect(processed).To(Equal(1))
Expect(skipped).To(Equal(1))
msg.Data = []byte(`{"sender":"some.node", "force_copy":true}`)
Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
Expect(processed).To(Equal(3))
Expect(skipped).To(Equal(1))
msg.Data = []byte(`{"sender":"some.node"}`)
Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
Expect(processed).To(Equal(3))
Expect(skipped).To(Equal(2))
})

It("Should handle present json fields", func() {
cfg.InspectJSONField = "sender"
limiter, err := New(ctx, &wg, cfg, "GINKGO", "GINKGO", nil, log)
6 changes: 6 additions & 0 deletions limiter/memory/stats.go
@@ -13,8 +13,14 @@ var (
Name: prometheus.BuildFQName("choria_stream_replicator", "limiter", "messages_without_limit_field_count"),
Help: "The number of messages that did not have the data field or header used for limiting",
}, []string{"limiter", "stream", "replicator"})

limiterMessageForcedByField = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("choria_stream_replicator", "limiter", "messages_copy_forced_count"),
Help: "The number of messages that were copied due to matching on the force copy value",
}, []string{"limiter", "stream", "replicator"})
)

func init() {
prometheus.MustRegister(limiterMessagesWithoutTrackingFieldCount)
prometheus.MustRegister(limiterMessageForcedByField)
}
