diff --git a/advisor/advisor.go b/advisor/advisor.go
index 41fe40c..ff3bb8e 100644
--- a/advisor/advisor.go
+++ b/advisor/advisor.go
@@ -54,10 +54,6 @@ var (
 	AdvisoryProtocol = "io.choria.sr.v2.age_advisory"
 )
 
-const (
-	_EMPTY_ = ""
-)
-
 type Advisor struct {
 	cfg *config.Advisory
 	nc  *nats.Conn
@@ -156,7 +152,7 @@ func (a *Advisor) publisher(ctx context.Context, wg *sync.WaitGroup) {
 			msg := nats.NewMsg(subject)
 			msg.Data = d
 
-			if advisory.EventID != _EMPTY_ {
+			if advisory.EventID != "" {
 				msg.Header.Add(api.JSMsgId, advisory.EventID)
 			}
 
diff --git a/config/config.go b/config/config.go
index 2394a6f..8fcafb5 100644
--- a/config/config.go
+++ b/config/config.go
@@ -91,6 +91,8 @@ type Stream struct {
 	MaxAgeString string `json:"max_age"`
 	// InspectJSONField will inspect a specific field in JSON payloads and limit sends by this field
 	InspectJSONField string `json:"inspect_field"`
+	// InspectJSONForceField will inspect a specific field in the JSON payloads and force the limiter to publish the message
+	InspectJSONForceField string `json:"inspect_force_field"`
 	// InspectHeaderValue inspects the value of a header and does limiting based on that
 	InspectHeaderValue string `json:"inspect_header"`
 	// InspectSubjectToken inspects a certain token and limits based on that, -1 inspects the entire subject, 0 disables
diff --git a/docs/content/configuration/sampling/_index.md b/docs/content/configuration/sampling/_index.md
index 3ce4d65..61b52f6 100644
--- a/docs/content/configuration/sampling/_index.md
+++ b/docs/content/configuration/sampling/_index.md
@@ -87,4 +87,39 @@ Pick the `inspect_duration` based on your needs but ensure that it is longer tha
 The advisory subject can have `%s` in it that will be replaced with the event type (like `timeout`) and a `%v` that will be replaced with the value being tracked. Use this to partition the advisories or to help searching a large store of them
 {{% /notice %}}
 
-We configure advisories that will inform us about statusses of data, advisories will be published to a Stream with the subject `NODE_DATA_ADVISORIES` and they will be retried a few times should they fail. See [Sampling Advisories](../../monitoring/#sampling-advisories) for details about advisories.
+We configure advisories that will inform us about statuses of data; advisories will be published to a Stream with the subject `NODE_DATA_ADVISORIES` and they will be retried a few times should they fail. See [Sampling Advisories](../../monitoring/#sampling-advisories) for details about advisories.
+
+## Forcing copying despite sampling
+
+In some cases you know your data has changed significantly and want to force a copy. Since version 0.9.0 you can force a copy
+by setting a specific JSON boolean field in your data.
+
+```yaml
+streams:
+  - stream: NODE_DATA
+    source_url: nats://nats.us-east.example.net:4222
+    target_url: nats://nats.central.example.net:4222
+    inspect_field: sender
+    inspect_force_field: force_copy
+    inspect_duration: 1h
+    warn_duration: 12m
+    size_trigger: 1024
+    advisory:
+      subject: NODE_DATA_ADVISORIES
+      reliable: true
+```
+
+Here we have the same configuration as in the previous section but with `inspect_force_field` added; if this field is set to `true` in your JSON data the message will be copied regardless of the sampling state.
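+
+As an illustration, with the configuration above a payload like the one below would be copied right away even if `some.node` was copied recently, because `force_copy` is set to `true`:
+
+```json
+{
+  "sender": "some.node",
+  "force_copy": true
+}
+```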
+
+{{% notice style="warning" %}}
+Take care not to set this field on every message, or on frequently changing values like timestamps, as that will nullify the gains from limiting and can potentially DDoS your upstreams.
+{{% /notice %}}
diff --git a/docs/content/monitoring/_index.md b/docs/content/monitoring/_index.md
index 38ed24a..6b1061b 100644
--- a/docs/content/monitoring/_index.md
+++ b/docs/content/monitoring/_index.md
@@ -194,6 +194,7 @@ We have extensive Prometheus Metrics about the operation of the system allowing
 | `choria_stream_replicator_advisor_publish_errors` | The number of times publishing advisories failed |
 | `choria_stream_replicator_advisor_publish_total_messages` | The total number of advisories sent |
 | `choria_stream_replicator_limiter_messages_without_limit_field_count` | The number of messages that did not have the data field or header used for limiting/sampling |
+| `choria_stream_replicator_limiter_messages_copy_forced_count` | The number of messages copied due to the `inspect_force_field` value |
 | `choria_stream_replicator_replicator_total_messages` | The total number of messages processed including ones that would be ignored |
 | `choria_stream_replicator_replicator_total_bytess` | The size of messages processed including ones that would be ignored |
 | `choria_stream_replicator_replicator_handler_error_count` | The number of times the handler failed to process a message |
diff --git a/idtrack/idtrack.go b/idtrack/idtrack.go
index 669deb1..d4838a5 100644
--- a/idtrack/idtrack.go
+++ b/idtrack/idtrack.go
@@ -52,10 +52,6 @@ type Tracker struct {
 	sync.Mutex
 }
 
-const (
-	_EMPTY_ = ""
-)
-
 func New(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, warn time.Duration, sizeTrigger float64, stateFile string, stream string, worker string, replicator string, nc *nats.Conn, syncSubject string, log *logrus.Entry) (*Tracker, error) {
 	t := &Tracker{
 		Items: map[string]*Item{},
@@ -198,7 +194,7 @@ func (t *Tracker) NotifyExpired(cb func(map[string]Item)) error {
 
 // RecordSeen records that we saw the item
 func (t *Tracker) RecordSeen(v string, sz float64) {
-	if v == _EMPTY_ {
+	if v == "" {
 		return
 	}
 
@@ -272,7 +268,7 @@ func (t *Tracker) lastSeen(v string) (time.Time, time.Time, float64) {
 
 // ShouldProcess determines if a message should be processed considering last seen times, size deltas and copied delta
 func (t *Tracker) ShouldProcess(v string, sz float64) bool {
-	if v == _EMPTY_ {
+	if v == "" {
 		return true
 	}
 
@@ -319,8 +315,8 @@ func (t *Tracker) loadState() error {
 	t.Lock()
 	defer t.Unlock()
 
-	if t.stateFile == _EMPTY_ {
-		t.log.Warnf("Tacker state tracking not configured")
+	if t.stateFile == "" {
+		t.log.Warnf("Tracker state tracking not configured")
 		return nil
 	}
 
@@ -360,7 +356,7 @@ func (t *Tracker) saveState() error {
 	t.Lock()
 	defer t.Unlock()
 
-	if t.stateFile == _EMPTY_ {
+	if t.stateFile == "" {
 		return nil
 	}
 
diff --git a/limiter/memory/limiter.go b/limiter/memory/limiter.go
index 37c6364..a195d4c 100644
--- a/limiter/memory/limiter.go
+++ b/limiter/memory/limiter.go
@@ -19,42 +19,45 @@ import (
 )
 
 type limiter struct {
-	jsonField  string
-	header     string
-	token      int
-	duration   time.Duration
-	replicator string
-	stream     string
-	name       string
-	processed  *idtrack.Tracker
-	stateFile  string
-	syncSubj   string
-	log        *logrus.Entry
-	mu         *sync.Mutex
+	jsonField      string
+	jsonForceField string
+	header         string
+	token          int
+	duration       time.Duration
+	replicator     string
+	stream         string
+	name           string
+	processed      *idtrack.Tracker
+	stateFile      string
+	syncSubj       string
+	log            *logrus.Entry
+	mu             *sync.Mutex
 }
 
-var _EMPTY_ = ""
-
 func New(ctx context.Context, wg *sync.WaitGroup, cfg *config.Stream, name string, replicator string, nc *nats.Conn, log *logrus.Entry) (*limiter, error) {
 	if cfg.InspectDuration == 0 {
 		return nil, fmt.Errorf("inspect duration not set, memory limiter can not start")
 	}
-	if name == _EMPTY_ {
+	if name == "" {
 		return nil, fmt.Errorf("name is not set, memory limiter can not start")
 	}
-	if replicator == _EMPTY_ {
+	if replicator == "" {
 		return nil, fmt.Errorf("replicator name is required")
 	}
+	if cfg.InspectJSONForceField != "" && cfg.InspectJSONField == "" {
+		return nil, fmt.Errorf("forcing based on json field requires both inspect_field and inspect_force_field set")
+	}
 
 	l := &limiter{
-		name:       name,
-		duration:   cfg.InspectDuration,
-		jsonField:  cfg.InspectJSONField,
-		header:     cfg.InspectHeaderValue,
-		token:      cfg.InspectSubjectToken,
-		stateFile:  cfg.StateFile,
-		stream:     cfg.Stream,
-		replicator: replicator,
+		name:           name,
+		duration:       cfg.InspectDuration,
+		jsonField:      cfg.InspectJSONField,
+		jsonForceField: cfg.InspectJSONForceField,
+		header:         cfg.InspectHeaderValue,
+		token:          cfg.InspectSubjectToken,
+		stateFile:      cfg.StateFile,
+		stream:         cfg.Stream,
+		replicator:     replicator,
 		log: log.WithFields(logrus.Fields{
 			"limiter":  "memory",
 			"duration": cfg.InspectDuration.String(),
@@ -67,10 +70,13 @@ func New(ctx context.Context, wg *sync.WaitGroup, cfg *config.Stream, name strin
 	}
 
 	switch {
-	case l.jsonField != _EMPTY_:
+	case l.jsonField != "":
 		l.log = l.log.WithField("field", l.jsonField)
+		if l.jsonForceField != "" {
+			l.log = l.log.WithField("force_field", l.jsonForceField)
+		}
 
-	case l.header != _EMPTY_:
+	case l.header != "":
 		l.log = l.log.WithField("header", l.header)
 
 	case l.token != 0:
@@ -98,9 +104,21 @@ func (l *limiter) Tracker() *idtrack.Tracker {
 
 func (l *limiter) ProcessAndRecord(msg *nats.Msg, f func(msg *nats.Msg, process bool) error) error {
 	var trackValue string
+	var mustCopy bool
 
 	switch {
-	case l.jsonField != _EMPTY_:
+	case l.jsonForceField != "" && l.jsonField != "":
+		res := gjson.GetBytes(msg.Data, l.jsonField)
+		if res.Exists() {
+			trackValue = res.String()
+		}
+
+		res = gjson.GetBytes(msg.Data, l.jsonForceField)
+		if res.Exists() {
+			mustCopy = true
+		}
+
+	case l.jsonField != "":
 		res := gjson.GetBytes(msg.Data, l.jsonField)
 		if res.Exists() {
 			trackValue = res.String()
@@ -115,16 +133,26 @@ func (l *limiter) ProcessAndRecord(msg *nats.Msg, f func(msg *nats.Msg, process
 			trackValue = parts[l.token-1]
 		}
 
-	case l.header != _EMPTY_:
+	case l.header != "":
 		trackValue = msg.Header.Get(l.header)
 	}
 
-	if trackValue == _EMPTY_ {
+	if trackValue == "" {
 		limiterMessagesWithoutTrackingFieldCount.WithLabelValues("memory", l.name, l.replicator).Inc()
 	}
 
+	if mustCopy {
+		limiterMessageForcedByField.WithLabelValues("memory", l.name, l.replicator).Inc()
+	}
+
 	sz := float64(len(msg.Data))
-	shouldProcess := l.processed.ShouldProcess(trackValue, sz)
+	var shouldProcess bool
+
+	if mustCopy {
+		shouldProcess = true
+	} else {
+		shouldProcess = l.processed.ShouldProcess(trackValue, sz)
+	}
 	l.processed.RecordSeen(trackValue, sz)
 
 	err := f(msg, shouldProcess)
@@ -132,7 +160,7 @@ func (l *limiter) ProcessAndRecord(msg *nats.Msg, f func(msg *nats.Msg, process
 		return err
 	}
 
-	if shouldProcess {
+	if shouldProcess || mustCopy {
 		l.processed.RecordCopied(trackValue)
 	}
 
diff --git a/limiter/memory/limiter_test.go b/limiter/memory/limiter_test.go
index 2268d24..bace645 100644
--- a/limiter/memory/limiter_test.go
+++ b/limiter/memory/limiter_test.go
@@ -67,6 +67,44 @@ var _ = Describe("Limiter", func() {
 		})).ToNot(HaveOccurred())
 	})
 
+	It("Should handle present force copy fields", func() {
+		cfg.InspectJSONForceField = "force_copy"
+		cfg.InspectJSONField = "sender"
+
+		limiter, err := New(ctx, &wg, cfg, "GINKGO", "GINKGO", nil, log)
+		Expect(err).ToNot(HaveOccurred())
+
+		msg := nats.NewMsg("test")
+		msg.Data = []byte(`{"sender":"some.node"}`)
+
+		processed := 0
+		skipped := 0
+
+		handler := func(msg *nats.Msg, process bool) error {
+			if process {
+				processed++
+			} else {
+				skipped++
+			}
+
+			return nil
+		}
+
+		Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
+		Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
+		Expect(processed).To(Equal(1))
+		Expect(skipped).To(Equal(1))
+		msg.Data = []byte(`{"sender":"some.node", "force_copy":true}`)
+		Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
+		Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
+		Expect(processed).To(Equal(3))
+		Expect(skipped).To(Equal(1))
+		msg.Data = []byte(`{"sender":"some.node"}`)
+		Expect(limiter.ProcessAndRecord(msg, handler)).ToNot(HaveOccurred())
+		Expect(processed).To(Equal(3))
+		Expect(skipped).To(Equal(2))
+	})
+
 	It("Should handle present json fields", func() {
 		cfg.InspectJSONField = "sender"
 		limiter, err := New(ctx, &wg, cfg, "GINKGO", "GINKGO", nil, log)
diff --git a/limiter/memory/stats.go b/limiter/memory/stats.go
index 01366c4..e02913a 100644
--- a/limiter/memory/stats.go
+++ b/limiter/memory/stats.go
@@ -13,8 +13,14 @@ var (
 		Name: prometheus.BuildFQName("choria_stream_replicator", "limiter", "messages_without_limit_field_count"),
 		Help: "The number of messages that did not have the data field or header used for limiting",
 	}, []string{"limiter", "stream", "replicator"})
+
+	limiterMessageForcedByField = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: prometheus.BuildFQName("choria_stream_replicator", "limiter", "messages_copy_forced_count"),
+		Help: "The number of messages that were copied due to matching on the force copy value",
+	}, []string{"limiter", "stream", "replicator"})
 )
 
 func init() {
 	prometheus.MustRegister(limiterMessagesWithoutTrackingFieldCount)
+	prometheus.MustRegister(limiterMessageForcedByField)
 }
diff --git a/replicator/replicator.go b/replicator/replicator.go
index a0f4eca..9b25e95 100644
--- a/replicator/replicator.go
+++ b/replicator/replicator.go
@@ -64,7 +64,6 @@
 	pollFrequency    = 10 * time.Second
 	srcHeader        = "Choria-SR-Source"
 	srcHeaderPattern = "%s %d %s %s %d"
-	_EMPTY_          = ""
 )
 
 func (t *Target) Close() error {
@@ -81,16 +80,16 @@ func (t *Target) Close() error {
 }
 
 func NewStream(stream *config.Stream, sr *config.Config, log *logrus.Entry) (*Stream, error) {
-	if stream.Stream == _EMPTY_ {
+	if stream.Stream == "" {
 		return nil, fmt.Errorf("stream name is required")
 	}
-	if stream.SourceURL == _EMPTY_ {
+	if stream.SourceURL == "" {
 		return nil, fmt.Errorf("source_url is required")
 	}
-	if stream.TargetURL == _EMPTY_ {
+	if stream.TargetURL == "" {
 		return nil, fmt.Errorf("target_url is required")
 	}
-	if stream.TargetStream == _EMPTY_ {
+	if stream.TargetStream == "" {
 		stream.TargetStream = stream.Stream
 	}
 	if stream.TargetInitiated && stream.NoTargetCreate {
@@ -98,7 +97,7 @@ func NewStream(stream *config.Stream, sr *config.Config, log *logrus.Entry) (*St
 	}
 
 	name := "stream_replicator"
-	if stream.Name != _EMPTY_ {
+	if stream.Name != "" {
 		if strings.HasPrefix("SR_", stream.Name) {
 			name = stream.Name
 		} else {
@@ -112,7 +111,7 @@ func NewStream(stream *config.Stream, sr *config.Config, log *logrus.Entry) (*St
 		cname:      name,
 		mu:         &sync.Mutex{},
 		hcInterval: time.Minute,
-		paused:     stream.LeaderElectionName != _EMPTY_,
+		paused:     stream.LeaderElectionName != "",
 		log: log.WithFields(logrus.Fields{
 			"source": stream.Stream,
 			"target": stream.TargetStream,
@@ -131,7 +130,7 @@ func (s *Stream) Run(ctx context.Context, wg *sync.WaitGroup) error {
 		return err
 	}
 
-	if s.cfg.InspectJSONField != _EMPTY_ && s.cfg.InspectDuration > 0 {
+	if s.cfg.InspectJSONField != "" && s.cfg.InspectDuration > 0 {
 		nc, err := s.connectAdvisories(ctx)
 		if err != nil {
 			return err
@@ -150,7 +149,7 @@ func (s *Stream) Run(ctx context.Context, wg *sync.WaitGroup) error {
 		}
 	}
 
-	if s.cfg.LeaderElectionName != _EMPTY_ {
+	if s.cfg.LeaderElectionName != "" {
 		err = s.setupElection(ctx)
 		if err != nil {
 			s.log.Errorf("Could not set up elections: %v", err)
@@ -183,12 +182,12 @@ func (s *Stream) Run(ctx context.Context, wg *sync.WaitGroup) error {
 }
 
 func (s *Stream) targetForSubject(subj string) string {
-	if s.cfg.TargetPrefix != _EMPTY_ {
+	if s.cfg.TargetPrefix != "" {
 		subj = fmt.Sprintf("%s.%s", s.cfg.TargetPrefix, subj)
 		subj = strings.Replace(subj, "..", ".", -1)
 	}
 
-	if s.cfg.TargetRemoveString != _EMPTY_ {
+	if s.cfg.TargetRemoveString != "" {
 		subj = strings.Replace(subj, s.cfg.TargetRemoveString, "", -1)
 		subj = strings.Replace(subj, "..", ".", -1)
 	}
@@ -319,19 +318,19 @@ func (s *Stream) connectDestination(ctx context.Context) (err error) {
 	scfg := s.source.cfg
 	s.source.mu.Unlock()
 
-	if s.cfg.TargetPrefix != _EMPTY_ || s.cfg.TargetRemoveString != _EMPTY_ {
+	if s.cfg.TargetPrefix != "" || s.cfg.TargetRemoveString != "" {
 		var subjects []string
 
 		for _, sub := range scfg.Subjects {
 			var target string
 
-			if s.cfg.TargetPrefix != _EMPTY_ {
+			if s.cfg.TargetPrefix != "" {
 				target = fmt.Sprintf("%s.%s", s.cfg.TargetPrefix, sub)
 			} else {
 				target = sub
 			}
 
-			if s.cfg.TargetRemoveString != _EMPTY_ {
+			if s.cfg.TargetRemoveString != "" {
 				target = strings.Replace(target, s.cfg.TargetRemoveString, "", -1)
 				target = strings.Replace(target, "..", ".", -1)
 			}
diff --git a/replicator/src_to_dest_copier.go b/replicator/src_to_dest_copier.go
index 5324b4e..b88b047 100644
--- a/replicator/src_to_dest_copier.go
+++ b/replicator/src_to_dest_copier.go
@@ -68,7 +68,7 @@ func (c *sourceInitiatedCopier) copyMessages(ctx context.Context) error {
 
 	nc := c.source.nc
 	ib := nc.NewRespInbox()
-	c.source.sub, err = nc.ChanQueueSubscribe(ib, _EMPTY_, c.msgs)
+	c.source.sub, err = nc.ChanQueueSubscribe(ib, "", c.msgs)
 	c.source.mu.Unlock()
 	if err != nil {
 		return err
@@ -234,7 +234,7 @@ func (c *sourceInitiatedCopier) healthCheckSource() (fixed bool, err error) {
 			jsm.InactiveThreshold(5*pollFrequency))
 	}
 
-	if c.cfg.FilterSubject != _EMPTY_ {
+	if c.cfg.FilterSubject != "" {
 		opts = append(opts, jsm.FilterStreamBySubject(c.cfg.FilterSubject))
 	}
 
diff --git a/replicator/target_initiated_copier.go b/replicator/target_initiated_copier.go
index 3c12152..d083615 100644
--- a/replicator/target_initiated_copier.go
+++ b/replicator/target_initiated_copier.go
@@ -370,7 +370,7 @@ func (c *targetInitiatedCopier) recreateEphemeraLocked() (bool, error) {
 		jsm.IdleHeartbeat(20 * time.Second),
 	}
 
-	if c.cfg.FilterSubject != _EMPTY_ {
+	if c.cfg.FilterSubject != "" {
 		opts = append(opts, jsm.FilterStreamBySubject(c.cfg.FilterSubject))
 	}