From 2073cf3214694d7e858f6888b3062ad9a8eba43b Mon Sep 17 00:00:00 2001 From: bom-d-van Date: Tue, 28 Jun 2022 08:49:12 +0200 Subject: [PATCH] quota: realtime usage update for metrics, logical/physical sizes, and data points Without this patch, it is possible for some namespaces to have a window of 1-3 `carbonserver.quota-usage-report-frequency` intervals during which they can bypass quota enforcement. There are two cases that need to be addressed: dir nodes that are already annotated with proper quota config, and nodes that don't have quota config due to realtime insert via *CarbonserverListener.newMetricsChan. For annotated nodes, the trie index resolves it via realtime metric counter updates in *trieIndex.insert. For unannotated nodes, a new config is introduced, called transient-child-limit. With this config, during realtime insert, trieIndex would annotate the new dir nodes of parent nodes that enable that config. This is not the most proper way to address the issue, but a trade-off between correctness, reliability, complexity, and performance. Re-iterating all quota configs for every realtime insert is nice but expensive. However, as always, I could be wrong. But this seems to be working in my tests. 
--- carbon/config.go | 13 +++++ carbonserver/trie.go | 67 +++++++++++++++++++++++--- carbonserver/trie_test.go | 99 +++++++++++++++++++++++++++++++++++++- persister/ini.go | 1 + persister/whisper_quota.go | 5 ++ 5 files changed, 177 insertions(+), 8 deletions(-) diff --git a/carbon/config.go b/carbon/config.go index 71034930f..48273154c 100644 --- a/carbon/config.go +++ b/carbon/config.go @@ -420,6 +420,17 @@ retentions = 60:43200,3600:43800`), 0644) func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) { for _, q := range c.Whisper.Quotas { + var transientChild *carbonserver.Quota + if q.TransientChildLimit > 0 { + transientChild = &carbonserver.Quota{ + Pattern: "transient", + Metrics: q.TransientChildLimit, + Throughput: q.TransientChildLimit * 10, + IsTransient: true, + DroppingPolicy: carbonserver.QDPNew, + } + } + quotas = append(quotas, &carbonserver.Quota{ Pattern: q.Pattern, Namespaces: q.Namespaces, @@ -430,6 +441,8 @@ func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) { Throughput: q.Throughput, DroppingPolicy: carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy), StatMetricPrefix: q.StatMetricPrefix, + + TransientChild: transientChild, }) } diff --git a/carbonserver/trie.go b/carbonserver/trie.go index bc3694e65..03aad9c72 100644 --- a/carbonserver/trie.go +++ b/carbonserver/trie.go @@ -380,8 +380,9 @@ type dirMeta struct { func newDirMeta() *dirMeta { return &dirMeta{usage: &QuotaUsage{}} } -func (*dirMeta) trieMeta() {} -func (dm *dirMeta) update(quota *Quota) { dm.quota.Store(quota) } +func (*dirMeta) trieMeta() {} +func (dm *dirMeta) setQuota(quota *Quota) { dm.quota.Store(quota) } +func (dm *dirMeta) getQuota() *Quota { return dm.quota.Load().(*Quota) } func (dm *dirMeta) withinQuota(metrics, namespaces, logical, physical, dataPoints int64) bool { quota, ok := dm.quota.Load().(*Quota) @@ -538,6 +539,8 @@ func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints, var start, nlen int var 
sn, newn *trieNode var cur = ti.root + var dirNodes = [16]*trieNode{ti.root} // why arrray: avoiding heap allocation + var dirNodesIdx = 1 outer: // why len(path)+1: make sure the last node is also processed in the loop for i := 0; i < len(path)+1; i++ { @@ -649,12 +652,36 @@ outer: if child.dir() { cur = child cur.gen = ti.root.gen + + if dirNodesIdx < len(dirNodes) { + dirNodes[dirNodesIdx] = child + dirNodesIdx++ + } continue outer } } if i < len(path) { newn = ti.newDir() + + if dirNodesIdx < len(dirNodes) { + if dirNodes[dirNodesIdx-1].meta != nil { + // Purpose of transient child? + // + // + pmeta := dirNodes[dirNodesIdx-1].meta + pquota, ok := pmeta.(*dirMeta).quota.Load().(*Quota) + if ok && pquota.TransientChild != nil { + meta := newDirMeta() + meta.setQuota(pquota.TransientChild) + newn.meta = meta + } + } + + dirNodes[dirNodesIdx] = newn + dirNodesIdx++ + } + cur.addChild(newn) cur = newn } @@ -720,7 +747,24 @@ outer: cur.addChild(child) cur = child + // TODO: need to support realtime and concurrent index? ti.fileCount++ + + for i := 0; i < dirNodesIdx; i++ { + if dirNodes[i] == nil { + continue + } + + meta, ok := dirNodes[i].meta.(*dirMeta) + if !ok || meta.usage == nil { + continue + } + + atomic.AddInt64(&meta.usage.Metrics, 1) + atomic.AddInt64(&meta.usage.LogicalSize, logicalSize) + atomic.AddInt64(&meta.usage.PhysicalSize, physicalSize) + atomic.AddInt64(&meta.usage.DataPoints, dataPoints) + } } return cur, nil @@ -1570,8 +1614,14 @@ type Quota struct { DroppingPolicy QuotaDroppingPolicy StatMetricPrefix string + + // See details in trieIndex.insert + TransientChild *Quota + IsTransient bool // for avoid emitting metrics for transient quota nodes. 
} +// var transientQuota = &Quota{Metrics: 10000, Throughput: 10000, DroppingPolicy: QDPNew, StatMetricPrefix: "transient"} + func (q *Quota) String() string { return fmt.Sprintf("pattern:%s,dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,policy:%s", q.Pattern, q.Namespaces, q.Metrics, q.DataPoints, q.LogicalSize, q.PhysicalSize, q.Throughput, q.DroppingPolicy) } @@ -1788,7 +1838,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) if quota.Pattern == "/" { if !updateChecker["/"] { meta := ti.root.meta.(*dirMeta) - meta.update(quota) + meta.setQuota(quota) ti.throughputs.store("/", newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage)) updateChecker["/"] = true @@ -1804,6 +1854,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) for i, node := range nodes { if node.meta == nil { + // TODO: has data race in throttle, generateQuotaAndUsageMetrics and other places? node.meta = newDirMeta() } @@ -1819,7 +1870,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) if !updateChecker[paths[i]] { ti.throughputs.store(paths[i], newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage)) - meta.update(quota) + meta.setQuota(quota) updateChecker[paths[i]] = true } @@ -1838,7 +1889,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) } // refreshUsage updates usage data and generate stat metrics. -// It can't be evoked with concurrent trieIndex.insert. +// It can't be evoked with concurrently trieIndex.insert. 
func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files uint64) { if throughputs == nil { throughputs = newQuotaThroughputQuotaManager() @@ -1895,8 +1946,10 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui tname = name } var prefix string + var isTransient bool if quota, ok := cur.node.meta.(*dirMeta).quota.Load().(*Quota); ok { prefix = quota.StatMetricPrefix + isTransient = quota.IsTransient } var throughput int64 @@ -1906,7 +1959,9 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui throttled := atomic.LoadInt64(&usage.Throttled) - ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled) + if !isTransient { + ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled) + } if throttled > 0 { atomic.AddInt64(&usage.Throttled, -throttled) diff --git a/carbonserver/trie_test.go b/carbonserver/trie_test.go index fd4ff8375..be6a6a0d0 100644 --- a/carbonserver/trie_test.go +++ b/carbonserver/trie_test.go @@ -1493,6 +1493,11 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) { tindex.prune() endc := make(chan struct{}) + defer func() { + // make sure that tindex.applyQuotas ends properly. 
+ endc <- struct{}{} + }() + loopCount := 10240 go func() { @@ -1527,9 +1532,99 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) { break } } +} + +func TestTrieQuotaRealtimeEnforcement(t *testing.T) { + tindex := newTrie( + ".wsp", + func(metric string) (size, dataPoints int64) { + return 12 * 1024, 1024 + }, + ) + + tindex.root.gen++ + tindex.insert("/sys/app/server-001/cpu.wsp", 0, 0, 0, 0) + tindex.insert("/sys/app/server-002/cpu.wsp", 0, 0, 0, 0) + tindex.insert("/sys/app/server-003/cpu.wsp", 0, 0, 0, 0) + tindex.prune() + + refreshTriggers := struct { + start chan struct{} + done chan struct{} + end chan struct{} + }{ + start: make(chan struct{}, 0), + done: make(chan struct{}, 0), + end: make(chan struct{}, 0), + } + defer func() { + // make sure that tindex.applyQuotas ends properly. + refreshTriggers.end <- struct{}{} + }() + + go func() { + for { + select { + case <-refreshTriggers.end: + return + case <-refreshTriggers.start: + } - // make sure that tindex.applyQuotas ends properly. 
- endc <- struct{}{} + tindex.applyQuotas( + time.Minute, + &Quota{ + Pattern: "/", + }, + &Quota{ + Pattern: "sys.app", + TransientChild: &Quota{ + Pattern: "transient", + Metrics: 1_000, + DroppingPolicy: QDPNew, + }, + }, + &Quota{ + Pattern: "sys.app.*", + Metrics: 10_000, + DroppingPolicy: QDPNew, + }, + &Quota{ + Pattern: "sys.app.server-001", + Metrics: 500_000, + }, + ) + + refreshTriggers.done <- struct{}{} + } + }() + + refreshTriggers.start <- struct{}{} + <-refreshTriggers.done + + t.Run("existing quota meta", func(t *testing.T) { + for i := 0; i < 500_000; i++ { + tindex.insert(fmt.Sprintf("/sys/app/server-001/cpu-%d.wsp", i), 0, 0, 0, 0) + } + if !tindex.throttle(&points.Points{Metric: "sys.app.server-001.memory", Data: []points.Point{{}, {}, {}, {}}}, false) { + t.Error("should throlle with existing meta nodes with realtime stat") + } + }) + + t.Run("transient quota meta", func(t *testing.T) { + for i := 0; i < 1_000; i++ { + tindex.insert(fmt.Sprintf("/sys/app/server-004/cpu-%d.wsp", i), 0, 0, 0, 0) + } + if !tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) { + t.Error("should throlle with transient meta nodes with realtime stat") + } + + refreshTriggers.start <- struct{}{} + <-refreshTriggers.done + + if tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) { + t.Error("should not throttle with proper quota annotation") + } + }) } func TestTrieQuotaWithProperHierarchicalThroughputEnforcement(t *testing.T) { diff --git a/persister/ini.go b/persister/ini.go index 40cb179e5..7ef25561c 100644 --- a/persister/ini.go +++ b/persister/ini.go @@ -7,6 +7,7 @@ import ( "strings" ) +// TODO: migrate to toml? 
func parseIniFile(filename string) ([]map[string]string, error) { body, err := ioutil.ReadFile(filename) if err != nil { diff --git a/persister/whisper_quota.go b/persister/whisper_quota.go index be115de60..b92e15a98 100644 --- a/persister/whisper_quota.go +++ b/persister/whisper_quota.go @@ -18,6 +18,8 @@ type Quota struct { Throughput int64 DroppingPolicy string StatMetricPrefix string + + TransientChildLimit int64 } type WhisperQuotas []Quota @@ -70,6 +72,9 @@ func ReadWhisperQuotas(filename string) (WhisperQuotas, error) { if quota.Throughput, err = parseInt(section, "throughput"); err != nil { return nil, err } + if quota.TransientChildLimit, err = parseInt(section, "transient-child-limit"); err != nil { + return nil, err + } switch v := section["dropping-policy"]; v { case "new", "none", "":