Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quota: realtime usage update for metrics, logical/physical sizes, and data points #474

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,17 @@ retentions = 60:43200,3600:43800`), 0644)

func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
for _, q := range c.Whisper.Quotas {
var transientChild *carbonserver.Quota
if q.TransientChildLimit > 0 {
transientChild = &carbonserver.Quota{
Pattern: "transient",
Metrics: q.TransientChildLimit,
Throughput: q.TransientChildLimit * 10,
IsTransient: true,
DroppingPolicy: carbonserver.QDPNew,
}
}

quotas = append(quotas, &carbonserver.Quota{
Pattern: q.Pattern,
Namespaces: q.Namespaces,
Expand All @@ -430,6 +441,8 @@ func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
Throughput: q.Throughput,
DroppingPolicy: carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy),
StatMetricPrefix: q.StatMetricPrefix,

TransientChild: transientChild,
})
}

Expand Down
67 changes: 61 additions & 6 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,9 @@ type dirMeta struct {

// newDirMeta allocates a dirMeta whose usage field points at a fresh,
// zero-valued QuotaUsage. No quota is stored on the returned value.
func newDirMeta() *dirMeta {
	meta := new(dirMeta)
	meta.usage = new(QuotaUsage)
	return meta
}

func (*dirMeta) trieMeta() {}
func (dm *dirMeta) update(quota *Quota) { dm.quota.Store(quota) }
func (*dirMeta) trieMeta() {}
func (dm *dirMeta) setQuota(quota *Quota) { dm.quota.Store(quota) }
func (dm *dirMeta) getQuota() *Quota { return dm.quota.Load().(*Quota) }

func (dm *dirMeta) withinQuota(metrics, namespaces, logical, physical, dataPoints int64) bool {
quota, ok := dm.quota.Load().(*Quota)
Expand Down Expand Up @@ -538,6 +539,8 @@ func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints,
var start, nlen int
var sn, newn *trieNode
var cur = ti.root
var dirNodes = [16]*trieNode{ti.root} // why array: avoiding heap allocation
var dirNodesIdx = 1
outer:
// why len(path)+1: make sure the last node is also processed in the loop
for i := 0; i < len(path)+1; i++ {
Expand Down Expand Up @@ -649,12 +652,36 @@ outer:
if child.dir() {
cur = child
cur.gen = ti.root.gen

if dirNodesIdx < len(dirNodes) {
dirNodes[dirNodesIdx] = child
dirNodesIdx++
}
continue outer
}
}

if i < len(path) {
newn = ti.newDir()

if dirNodesIdx < len(dirNodes) {
if dirNodes[dirNodesIdx-1].meta != nil {
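// Purpose of the transient child: a directory created here has no
// quota of its own yet (presumably until applyQuotas annotates it), so
// it is given the parent's TransientChild quota, if one is configured.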
pmeta := dirNodes[dirNodesIdx-1].meta
pquota, ok := pmeta.(*dirMeta).quota.Load().(*Quota)
if ok && pquota.TransientChild != nil {
meta := newDirMeta()
meta.setQuota(pquota.TransientChild)
newn.meta = meta
}
}

dirNodes[dirNodesIdx] = newn
dirNodesIdx++
}

cur.addChild(newn)
cur = newn
}
Expand Down Expand Up @@ -720,7 +747,24 @@ outer:
cur.addChild(child)
cur = child

// TODO: need to support realtime and concurrent index?
ti.fileCount++

for i := 0; i < dirNodesIdx; i++ {
if dirNodes[i] == nil {
continue
}

meta, ok := dirNodes[i].meta.(*dirMeta)
if !ok || meta.usage == nil {
continue
}

atomic.AddInt64(&meta.usage.Metrics, 1)
atomic.AddInt64(&meta.usage.LogicalSize, logicalSize)
atomic.AddInt64(&meta.usage.PhysicalSize, physicalSize)
atomic.AddInt64(&meta.usage.DataPoints, dataPoints)
}
}

return cur, nil
Expand Down Expand Up @@ -1570,8 +1614,14 @@ type Quota struct {

DroppingPolicy QuotaDroppingPolicy
StatMetricPrefix string

// See details in trieIndex.insert
TransientChild *Quota
IsTransient bool // marks a transient quota so that stat metrics are not emitted for its nodes.
}

// var transientQuota = &Quota{Metrics: 10000, Throughput: 10000, DroppingPolicy: QDPNew, StatMetricPrefix: "transient"}

// String renders the quota as a single comma-separated line of key:value
// pairs: the pattern, each configured limit, and the dropping policy.
func (q *Quota) String() string {
	const layout = "pattern:%s,dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,policy:%s"

	return fmt.Sprintf(
		layout,
		q.Pattern,
		q.Namespaces,
		q.Metrics,
		q.DataPoints,
		q.LogicalSize,
		q.PhysicalSize,
		q.Throughput,
		q.DroppingPolicy,
	)
}
Expand Down Expand Up @@ -1788,7 +1838,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
if quota.Pattern == "/" {
if !updateChecker["/"] {
meta := ti.root.meta.(*dirMeta)
meta.update(quota)
meta.setQuota(quota)
ti.throughputs.store("/", newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage))

updateChecker["/"] = true
Expand All @@ -1804,6 +1854,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)

for i, node := range nodes {
if node.meta == nil {
// TODO: has data race in throttle, generateQuotaAndUsageMetrics and other places?
node.meta = newDirMeta()
}

Expand All @@ -1819,7 +1870,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)

if !updateChecker[paths[i]] {
ti.throughputs.store(paths[i], newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage))
meta.update(quota)
meta.setQuota(quota)

updateChecker[paths[i]] = true
}
Expand All @@ -1838,7 +1889,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
}

// refreshUsage updates usage data and generate stat metrics.
// It can't be evoked with concurrent trieIndex.insert.
// It must not be invoked concurrently with trieIndex.insert.
func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files uint64) {
if throughputs == nil {
throughputs = newQuotaThroughputQuotaManager()
Expand Down Expand Up @@ -1895,8 +1946,10 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui
tname = name
}
var prefix string
var isTransient bool
if quota, ok := cur.node.meta.(*dirMeta).quota.Load().(*Quota); ok {
prefix = quota.StatMetricPrefix
isTransient = quota.IsTransient
}

var throughput int64
Expand All @@ -1906,7 +1959,9 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui

throttled := atomic.LoadInt64(&usage.Throttled)

ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled)
if !isTransient {
ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled)
}

if throttled > 0 {
atomic.AddInt64(&usage.Throttled, -throttled)
Expand Down
99 changes: 97 additions & 2 deletions carbonserver/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,11 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) {
tindex.prune()

endc := make(chan struct{})
defer func() {
// make sure that tindex.applyQuotas ends properly.
endc <- struct{}{}
}()

loopCount := 10240

go func() {
Expand Down Expand Up @@ -1527,9 +1532,99 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) {
break
}
}
}

func TestTrieQuotaRealtimeEnforcement(t *testing.T) {
tindex := newTrie(
".wsp",
func(metric string) (size, dataPoints int64) {
return 12 * 1024, 1024
},
)

tindex.root.gen++
tindex.insert("/sys/app/server-001/cpu.wsp", 0, 0, 0, 0)
tindex.insert("/sys/app/server-002/cpu.wsp", 0, 0, 0, 0)
tindex.insert("/sys/app/server-003/cpu.wsp", 0, 0, 0, 0)
tindex.prune()

refreshTriggers := struct {
start chan struct{}
done chan struct{}
end chan struct{}
}{
start: make(chan struct{}, 0),
done: make(chan struct{}, 0),
end: make(chan struct{}, 0),
}
defer func() {
// make sure that tindex.applyQuotas ends properly.
refreshTriggers.end <- struct{}{}
}()

go func() {
for {
select {
case <-refreshTriggers.end:
return
case <-refreshTriggers.start:
}

// make sure that tindex.applyQuotas ends properly.
endc <- struct{}{}
tindex.applyQuotas(
time.Minute,
&Quota{
Pattern: "/",
},
&Quota{
Pattern: "sys.app",
TransientChild: &Quota{
Pattern: "transient",
Metrics: 1_000,
DroppingPolicy: QDPNew,
},
},
&Quota{
Pattern: "sys.app.*",
Metrics: 10_000,
DroppingPolicy: QDPNew,
},
&Quota{
Pattern: "sys.app.server-001",
Metrics: 500_000,
},
)

refreshTriggers.done <- struct{}{}
}
}()

refreshTriggers.start <- struct{}{}
<-refreshTriggers.done

t.Run("existing quota meta", func(t *testing.T) {
for i := 0; i < 500_000; i++ {
tindex.insert(fmt.Sprintf("/sys/app/server-001/cpu-%d.wsp", i), 0, 0, 0, 0)
}
if !tindex.throttle(&points.Points{Metric: "sys.app.server-001.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
t.Error("should throlle with existing meta nodes with realtime stat")
}
})

t.Run("transient quota meta", func(t *testing.T) {
for i := 0; i < 1_000; i++ {
tindex.insert(fmt.Sprintf("/sys/app/server-004/cpu-%d.wsp", i), 0, 0, 0, 0)
}
if !tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
t.Error("should throlle with transient meta nodes with realtime stat")
}

refreshTriggers.start <- struct{}{}
<-refreshTriggers.done

if tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
t.Error("should not throttle with proper quota annotation")
}
})
}

func TestTrieQuotaWithProperHierarchicalThroughputEnforcement(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions persister/ini.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
)

// TODO: migrate to toml?
func parseIniFile(filename string) ([]map[string]string, error) {
body, err := ioutil.ReadFile(filename)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions persister/whisper_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Quota struct {
Throughput int64
DroppingPolicy string
StatMetricPrefix string

TransientChildLimit int64
}

type WhisperQuotas []Quota
Expand Down Expand Up @@ -70,6 +72,9 @@ func ReadWhisperQuotas(filename string) (WhisperQuotas, error) {
if quota.Throughput, err = parseInt(section, "throughput"); err != nil {
return nil, err
}
if quota.TransientChildLimit, err = parseInt(section, "transient-child-limit"); err != nil {
return nil, err
}

switch v := section["dropping-policy"]; v {
case "new", "none", "":
Expand Down