Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support a new aggregation policy: mix (with percentiles) #331

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,13 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str

var useGlob bool

var aggregationSpec string
if strings.Contains(query, "@") {
a := strings.Split(query, "@")
query = a[0]
aggregationSpec = a[1]
}

// TODO: Find out why we have set 'useGlob' if 'star == -1'
if star := strings.IndexByte(query, '*'); strings.IndexByte(query, '[') == -1 && strings.IndexByte(query, '?') == -1 && (star == -1 || star == len(query)-1) {
useGlob = true
Expand Down Expand Up @@ -880,6 +887,13 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
files[i] = strings.Replace(p, "/", ".", -1)
}

if aggregationSpec != "" {
for i := range files {
files[i] += "@" + aggregationSpec
}
query += "@" + aggregationSpec
}

matchedCount = len(files)
resultCh <- &ExpandedGlobResponse{query, files, leafs, nil}
}
Expand Down
22 changes: 20 additions & 2 deletions carbonserver/fetchfromdisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package carbonserver

import (
"errors"
"fmt"
_ "net/http/pprof"
"strings"
"sync/atomic"
Expand All @@ -10,6 +11,7 @@ import (
"go.uber.org/zap"

"github.com/go-graphite/go-whisper"
"github.com/lomik/go-carbon/helper"
"github.com/lomik/go-carbon/points"
)

Expand All @@ -28,6 +30,13 @@ type metricFromDisk struct {
func (listener *CarbonserverListener) fetchFromDisk(metric string, fromTime, untilTime int32) (*metricFromDisk, error) {
var step int32

var aggregationSpec string
if strings.Contains(metric, "@") {
a := strings.Split(metric, "@")
metric = a[0]
aggregationSpec = a[1]
}

// We need to obtain the metadata from whisper file anyway.
path := listener.whisperData + "/" + strings.Replace(metric, ".", "/", -1) + ".wsp"
w, err := whisper.OpenWithOptions(path, &whisper.Options{
Expand Down Expand Up @@ -93,11 +102,20 @@ func (listener *CarbonserverListener) fetchFromDisk(metric string, fromTime, unt
listener.prometheus.diskRequest()

res.DiskStartTime = time.Now()
points, err := w.Fetch(int(fromTime), int(untilTime))
var points *whisper.TimeSeries
if aggregationSpec == "" {
points, err = w.Fetch(int(fromTime), int(untilTime))
} else {
var spec whisper.MixAggregationSpec
spec.Method, spec.Percentile, err = helper.ParseAggregationMethod(aggregationSpec)
if err == nil {
points, err = w.FetchByAggregation(int(fromTime), int(untilTime), &spec)
}
}
w.Close()
if err != nil {
logger.Warn("failed to fetch points", zap.Error(err))
return nil, errors.New("failed to fetch points")
return nil, fmt.Errorf("failed to fetch points: %s", err)
}

// Should never happen, because we have a check for proper archive now
Expand Down
5 changes: 5 additions & 0 deletions deploy/storage-aggregation.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@
pattern = .*
xFilesFactor = 0.5
aggregationMethod = average

[mix]
pattern = mix.*
xFilesFactor = 0.5
aggregationMethod = average,sum,average,median,p90,p95
37 changes: 37 additions & 0 deletions helper/aggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package helper

import (
"errors"
"strconv"
"strings"

whisper "github.com/go-graphite/go-whisper"
)

func ParseAggregationMethod(str string) (method whisper.AggregationMethod, percentile float32, err error) {
switch str {
case "average", "avg":
method = whisper.Average
case "sum":
method = whisper.Sum
case "last":
method = whisper.Last
case "max":
method = whisper.Max
case "min":
method = whisper.Min
case "median":
str = "p50"
fallthrough
default:
if strings.HasPrefix(str, "p") {
method = whisper.Percentile
var percentile64 float64
percentile64, err = strconv.ParseFloat(strings.TrimLeft(str, "p"), 32)
percentile = float32(percentile64)
} else {
err = errors.New("unknown aggregation method")
}
}
return
}
16 changes: 12 additions & 4 deletions persister/whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func (p *Whisper) store(metric string) {
p.logger.Error("no storage aggregation defined for metric", zap.String("metric", metric))
return
}

if err = os.MkdirAll(filepath.Dir(path), os.ModeDir|os.ModePerm); err != nil {
p.logger.Error("mkdir failed",
zap.String("dir", filepath.Dir(path)),
Expand All @@ -267,10 +266,19 @@ func (p *Whisper) store(metric string) {
if schema.Compressed != nil {
compressed = *schema.Compressed
}
if !compressed && aggr.aggregationMethod == whisper.Mix {
// intended to be less intrusive, just logging an error
p.logger.Error("bad aggregation match",
zap.String("path", path),
zap.Error(fmt.Errorf("mix aggregation currently only support cwhisper format, resetting it to default aggregation policy %s", p.aggregation.Default.aggregationMethod)),
)
aggr = p.aggregation.Default
}
w, err = whisper.CreateWithOptions(path, schema.Retentions, aggr.aggregationMethod, float32(aggr.xFilesFactor), &whisper.Options{
Sparse: p.sparse,
FLock: p.flock,
Compressed: compressed,
Sparse: p.sparse,
FLock: p.flock,
Compressed: compressed,
MixAggregationSpecs: aggr.mixAggregationSpecs,
})
if err != nil {
p.logger.Error("create new whisper file failed",
Expand Down
34 changes: 20 additions & 14 deletions persister/whisper_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

whisper "github.com/go-graphite/go-whisper"
"github.com/lomik/go-carbon/helper"
)

// WhisperAggregationItem ...
Expand All @@ -19,6 +21,7 @@ type WhisperAggregationItem struct {
xFilesFactor float64
aggregationMethodStr string
aggregationMethod whisper.AggregationMethod
mixAggregationSpecs []whisper.MixAggregationSpec
}

// WhisperAggregation ...
Expand Down Expand Up @@ -88,21 +91,24 @@ func ReadWhisperAggregation(filename string) (*WhisperAggregation, error) {
section["xfilesfactor"], item.name, err.Error())
}

var err error
item.aggregationMethodStr = section["aggregationmethod"]
switch item.aggregationMethodStr {
case "average", "avg":
item.aggregationMethod = whisper.Average
case "sum":
item.aggregationMethod = whisper.Sum
case "last":
item.aggregationMethod = whisper.Last
case "max":
item.aggregationMethod = whisper.Max
case "min":
item.aggregationMethod = whisper.Min
default:
return nil, fmt.Errorf("unknown aggregation method '%s'",
section["aggregationmethod"])
if strings.Contains(item.aggregationMethodStr, ",") {
item.aggregationMethod = whisper.Mix
specStrs := strings.Split(item.aggregationMethodStr, ",")
for _, specStr := range specStrs {
var spec whisper.MixAggregationSpec
spec.Method, spec.Percentile, err = helper.ParseAggregationMethod(specStr)
if err != nil {
break
}
item.mixAggregationSpecs = append(item.mixAggregationSpecs, spec)
}
} else {
item.aggregationMethod, _, err = helper.ParseAggregationMethod(item.aggregationMethodStr)
}
if err != nil {
return nil, fmt.Errorf("failed to parse aggregation method '%s': %s", section["aggregationmethod"], err)
}

result.Data = append(result.Data, item)
Expand Down
Loading