-
Notifications
You must be signed in to change notification settings - Fork 4
/
aggr.go
56 lines (43 loc) · 898 Bytes
/
aggr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package exectoy
// ordered aggregation
// "direct" aggregation where group keys map to array indexes
// hash aggregation

// aggrOp groups an input operator with a list of aggregate functions and a
// parallel list of accumulator maps.
type aggrOp struct {
	// input is the operator this aggregation reads from.
	input ExecOp
	// funcs are the aggregate functions to evaluate.
	funcs []aggrFunc
	// accs holds the accumulator maps; Init sizes its working slice to
	// len(funcs), so the intent appears to be one map per aggregate function.
	accs []aggrMap
}
// Init allocates one empty aggrMap per aggregate function in a.funcs and
// stores them in a.accs, so that a.accs[i] is the accumulator map paired with
// a.funcs[i]. Any previous contents of a.accs are replaced.
//
// NOTE(review): Init does not call a.input.Init(); confirm whether the input
// operator is expected to be initialized by the caller.
func (a *aggrOp) Init() {
	// Write straight into the field: a slice held only in a local variable
	// would be discarded on return and leave a.accs nil.
	a.accs = make([]aggrMap, len(a.funcs))
	for i := range a.accs {
		a.accs[i] = make(aggrMap)
	}
}
type (
	// aggrFunc is the signature of an aggregate function: it receives a
	// dataFlow, an integer groupHashCol and the aggrMap to operate on, and
	// returns nothing.
	// NOTE(review): presumably groupHashCol is the index of the column that
	// carries the group hash — confirm against the implementations.
	aggrFunc func(d dataFlow, groupHashCol int, m aggrMap)

	// aggrMap associates an int key with an accumulator value.
	aggrMap map[int]aggrAcc

	// aggrAcc is an accumulator value; as an empty interface it can hold a
	// value of any type.
	// todo(jordan) we can do better than this right?
	aggrAcc interface{}
)
// aggrSumIntOp keeps a running integer sum over one column of the data
// produced by its input operator.
type aggrSumIntOp struct {
	// input is the operator whose output is summed.
	input ExecOp
	// colIdx selects which column of each incoming dataFlow is summed.
	colIdx int
	// acc is the running total.
	acc int
}
// Init performs no work. It is declared on the value receiver, so it cannot
// modify the operator it is called on.
func (aggrSumIntOp) Init() {
	// Intentionally empty.
}
// Next pulls the next dataFlow from the input, adds the selected values of
// column colIdx to a.acc, and returns the dataFlow unchanged.
//
// A dataFlow with n == 0 is returned immediately without touching any column.
// When useSel is set, the first n entries of sel are used as row indexes into
// the column; otherwise rows 0..n-1 are summed directly. The column must be
// an intColumn, otherwise the type assertion panics.
func (a *aggrSumIntOp) Next() dataFlow {
	batch := a.input.Next()
	if batch.n == 0 {
		return batch
	}

	vals := batch.b[a.colIdx].(intColumn)

	if !batch.useSel {
		// No selection vector: the first n rows are all live.
		for row := 0; row < batch.n; row++ {
			a.acc += vals[row]
		}
		return batch
	}

	// Selection vector present: only the rows it names contribute.
	for pos := 0; pos < batch.n; pos++ {
		a.acc += vals[batch.sel[pos]]
	}
	return batch
}