Skip to content

Commit

Permalink
quota: realtime usage update for metrics, logical/physical sizes, and…
Browse files Browse the repository at this point in the history
… data points

Without this patch, some namespaces have a window of 1-3 `carbonserver.quota-usage-report-frequency` intervals
during which they can bypass quota enforcement.

There are two cases that need to be addressed: dir nodes that are already annotated with a proper quota config, and nodes that
don't have a quota config because they were inserted in realtime via *CarbonserverListener.newMetricsChan.

For annotated nodes, the trie index resolves it via realtime metric counter updates in *trieIndex.insert.

For unannotated nodes, a new config is introduced, called transient-child-limit. With this config, during realtime
insert, trieIndex annotates the new dir nodes under parent nodes that enable that config. This is not the most proper way
to address the issue, but a trade-off between correctness, reliability, complexity, and performance. Re-iterating all quota configs
for every realtime insert would be nice but expensive. However, as always, I could be wrong. But this seems to be working in my tests.
  • Loading branch information
bom-d-van committed Jul 25, 2022
1 parent 2671774 commit 2073cf3
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 8 deletions.
13 changes: 13 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,17 @@ retentions = 60:43200,3600:43800`), 0644)

func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
for _, q := range c.Whisper.Quotas {
var transientChild *carbonserver.Quota
if q.TransientChildLimit > 0 {
transientChild = &carbonserver.Quota{
Pattern: "transient",
Metrics: q.TransientChildLimit,
Throughput: q.TransientChildLimit * 10,
IsTransient: true,
DroppingPolicy: carbonserver.QDPNew,
}
}

quotas = append(quotas, &carbonserver.Quota{
Pattern: q.Pattern,
Namespaces: q.Namespaces,
Expand All @@ -430,6 +441,8 @@ func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
Throughput: q.Throughput,
DroppingPolicy: carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy),
StatMetricPrefix: q.StatMetricPrefix,

TransientChild: transientChild,
})
}

Expand Down
67 changes: 61 additions & 6 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,9 @@ type dirMeta struct {

func newDirMeta() *dirMeta { return &dirMeta{usage: &QuotaUsage{}} }

func (*dirMeta) trieMeta() {}
func (dm *dirMeta) update(quota *Quota) { dm.quota.Store(quota) }
// trieMeta is an empty marker method.
// NOTE(review): presumably it lets *dirMeta satisfy a meta interface declared elsewhere — confirm.
func (*dirMeta) trieMeta() {}

// setQuota stores quota as this dir node's quota config via dm.quota.Store.
func (dm *dirMeta) setQuota(quota *Quota) { dm.quota.Store(quota) }

// getQuota returns the quota config previously stored with setQuota.
// It returns nil when no *Quota has been stored yet (newDirMeta does not
// store one), using the same comma-ok assertion as the other readers of
// dm.quota instead of panicking on a failed type assertion.
func (dm *dirMeta) getQuota() *Quota {
	quota, _ := dm.quota.Load().(*Quota)
	return quota
}

func (dm *dirMeta) withinQuota(metrics, namespaces, logical, physical, dataPoints int64) bool {
quota, ok := dm.quota.Load().(*Quota)
Expand Down Expand Up @@ -538,6 +539,8 @@ func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints,
var start, nlen int
var sn, newn *trieNode
var cur = ti.root
var dirNodes = [16]*trieNode{ti.root} // why arrray: avoiding heap allocation
var dirNodesIdx = 1
outer:
// why len(path)+1: make sure the last node is also processed in the loop
for i := 0; i < len(path)+1; i++ {
Expand Down Expand Up @@ -649,12 +652,36 @@ outer:
if child.dir() {
cur = child
cur.gen = ti.root.gen

if dirNodesIdx < len(dirNodes) {
dirNodes[dirNodesIdx] = child
dirNodesIdx++
}
continue outer
}
}

if i < len(path) {
newn = ti.newDir()

if dirNodesIdx < len(dirNodes) {
if dirNodes[dirNodesIdx-1].meta != nil {
// Purpose of transient child?
//
//
pmeta := dirNodes[dirNodesIdx-1].meta
pquota, ok := pmeta.(*dirMeta).quota.Load().(*Quota)
if ok && pquota.TransientChild != nil {
meta := newDirMeta()
meta.setQuota(pquota.TransientChild)
newn.meta = meta
}
}

dirNodes[dirNodesIdx] = newn
dirNodesIdx++
}

cur.addChild(newn)
cur = newn
}
Expand Down Expand Up @@ -720,7 +747,24 @@ outer:
cur.addChild(child)
cur = child

// TODO: need to support realtime and concurrent index?
ti.fileCount++

for i := 0; i < dirNodesIdx; i++ {
if dirNodes[i] == nil {
continue
}

meta, ok := dirNodes[i].meta.(*dirMeta)
if !ok || meta.usage == nil {
continue
}

atomic.AddInt64(&meta.usage.Metrics, 1)
atomic.AddInt64(&meta.usage.LogicalSize, logicalSize)
atomic.AddInt64(&meta.usage.PhysicalSize, physicalSize)
atomic.AddInt64(&meta.usage.DataPoints, dataPoints)
}
}

return cur, nil
Expand Down Expand Up @@ -1570,8 +1614,14 @@ type Quota struct {

DroppingPolicy QuotaDroppingPolicy
StatMetricPrefix string

// See details in trieIndex.insert
TransientChild *Quota
IsTransient bool // for avoid emitting metrics for transient quota nodes.
}

// var transientQuota = &Quota{Metrics: 10000, Throughput: 10000, DroppingPolicy: QDPNew, StatMetricPrefix: "transient"}

// String renders the quota as a single comma-separated line of key:value
// pairs: the pattern, the namespace ("dirs") and metric ("files") limits,
// the data point, logical size, physical size and throughput limits, and
// the dropping policy.
func (q *Quota) String() string {
	const layout = "pattern:%s,dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,policy:%s"

	return fmt.Sprintf(
		layout,
		q.Pattern,
		q.Namespaces,
		q.Metrics,
		q.DataPoints,
		q.LogicalSize,
		q.PhysicalSize,
		q.Throughput,
		q.DroppingPolicy,
	)
}
Expand Down Expand Up @@ -1788,7 +1838,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
if quota.Pattern == "/" {
if !updateChecker["/"] {
meta := ti.root.meta.(*dirMeta)
meta.update(quota)
meta.setQuota(quota)
ti.throughputs.store("/", newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage))

updateChecker["/"] = true
Expand All @@ -1804,6 +1854,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)

for i, node := range nodes {
if node.meta == nil {
// TODO: has data race in throttle, generateQuotaAndUsageMetrics and other places?
node.meta = newDirMeta()
}

Expand All @@ -1819,7 +1870,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)

if !updateChecker[paths[i]] {
ti.throughputs.store(paths[i], newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage))
meta.update(quota)
meta.setQuota(quota)

updateChecker[paths[i]] = true
}
Expand All @@ -1838,7 +1889,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
}

// refreshUsage updates usage data and generate stat metrics.
// It can't be evoked with concurrent trieIndex.insert.
// It can't be invoked concurrently with trieIndex.insert.
func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files uint64) {
if throughputs == nil {
throughputs = newQuotaThroughputQuotaManager()
Expand Down Expand Up @@ -1895,8 +1946,10 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui
tname = name
}
var prefix string
var isTransient bool
if quota, ok := cur.node.meta.(*dirMeta).quota.Load().(*Quota); ok {
prefix = quota.StatMetricPrefix
isTransient = quota.IsTransient
}

var throughput int64
Expand All @@ -1906,7 +1959,9 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui

throttled := atomic.LoadInt64(&usage.Throttled)

ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled)
if !isTransient {
ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled)
}

if throttled > 0 {
atomic.AddInt64(&usage.Throttled, -throttled)
Expand Down
99 changes: 97 additions & 2 deletions carbonserver/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,11 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) {
tindex.prune()

endc := make(chan struct{})
defer func() {
// make sure that tindex.applyQuotas ends properly.
endc <- struct{}{}
}()

loopCount := 10240

go func() {
Expand Down Expand Up @@ -1527,9 +1532,99 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) {
break
}
}
}

func TestTrieQuotaRealtimeEnforcement(t *testing.T) {
tindex := newTrie(
".wsp",
func(metric string) (size, dataPoints int64) {
return 12 * 1024, 1024
},
)

tindex.root.gen++
tindex.insert("/sys/app/server-001/cpu.wsp", 0, 0, 0, 0)
tindex.insert("/sys/app/server-002/cpu.wsp", 0, 0, 0, 0)
tindex.insert("/sys/app/server-003/cpu.wsp", 0, 0, 0, 0)
tindex.prune()

refreshTriggers := struct {
start chan struct{}
done chan struct{}
end chan struct{}
}{
start: make(chan struct{}, 0),
done: make(chan struct{}, 0),
end: make(chan struct{}, 0),
}
defer func() {
// make sure that tindex.applyQuotas ends properly.
refreshTriggers.end <- struct{}{}
}()

go func() {
for {
select {
case <-refreshTriggers.end:
return
case <-refreshTriggers.start:
}

// make sure that tindex.applyQuotas ends properly.
endc <- struct{}{}
tindex.applyQuotas(
time.Minute,
&Quota{
Pattern: "/",
},
&Quota{
Pattern: "sys.app",
TransientChild: &Quota{
Pattern: "transient",
Metrics: 1_000,
DroppingPolicy: QDPNew,
},
},
&Quota{
Pattern: "sys.app.*",
Metrics: 10_000,
DroppingPolicy: QDPNew,
},
&Quota{
Pattern: "sys.app.server-001",
Metrics: 500_000,
},
)

refreshTriggers.done <- struct{}{}
}
}()

refreshTriggers.start <- struct{}{}
<-refreshTriggers.done

t.Run("existing quota meta", func(t *testing.T) {
for i := 0; i < 500_000; i++ {
tindex.insert(fmt.Sprintf("/sys/app/server-001/cpu-%d.wsp", i), 0, 0, 0, 0)
}
if !tindex.throttle(&points.Points{Metric: "sys.app.server-001.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
t.Error("should throlle with existing meta nodes with realtime stat")
}
})

t.Run("transient quota meta", func(t *testing.T) {
for i := 0; i < 1_000; i++ {
tindex.insert(fmt.Sprintf("/sys/app/server-004/cpu-%d.wsp", i), 0, 0, 0, 0)
}
if !tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
t.Error("should throlle with transient meta nodes with realtime stat")
}

refreshTriggers.start <- struct{}{}
<-refreshTriggers.done

if tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
t.Error("should not throttle with proper quota annotation")
}
})
}

func TestTrieQuotaWithProperHierarchicalThroughputEnforcement(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions persister/ini.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
)

// TODO: migrate to toml?
func parseIniFile(filename string) ([]map[string]string, error) {
body, err := ioutil.ReadFile(filename)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions persister/whisper_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Quota struct {
Throughput int64
DroppingPolicy string
StatMetricPrefix string

TransientChildLimit int64
}

type WhisperQuotas []Quota
Expand Down Expand Up @@ -70,6 +72,9 @@ func ReadWhisperQuotas(filename string) (WhisperQuotas, error) {
if quota.Throughput, err = parseInt(section, "throughput"); err != nil {
return nil, err
}
if quota.TransientChildLimit, err = parseInt(section, "transient-child-limit"); err != nil {
return nil, err
}

switch v := section["dropping-policy"]; v {
case "new", "none", "":
Expand Down

0 comments on commit 2073cf3

Please sign in to comment.