-
Notifications
You must be signed in to change notification settings - Fork 4
/
mergejoin.go
221 lines (196 loc) · 5.7 KB
/
mergejoin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package exectoy
// mergeJoinIntIntOp joins the int columns produced by two input operators on
// one equality column per side. Equal-key rows from both sides are buffered
// and their cartesian product is written to the output batch d.
type mergeJoinIntIntOp struct {
	// left and right are the input operators whose Next methods supply the
	// batches being joined.
	left, right ExecOp

	// leftEqColIdx and rightEqColIdx select, within each input batch, the
	// column whose values are compared.
	leftEqColIdx, rightEqColIdx int

	// leftCols and rightCols list the input column indexes that are copied to
	// the output; the left columns occupy the first output columns.
	leftCols, rightCols []int

	// leftFlow and rightFlow hold the batch most recently fetched from each
	// input; a flow with n == 0 is treated as "nothing left to read".
	leftFlow, rightFlow dataFlow

	// leftFlowIdx and rightFlowIdx are the current row positions inside
	// leftFlow and rightFlow.
	leftFlowIdx, rightFlowIdx int

	// leftBatchBuf and rightBatchBuf hold the projected columns of the rows
	// that belong to the equal-key group currently being emitted.
	leftBatchBuf, rightBatchBuf batch

	// leftBatchN and rightBatchN are not referenced by any method visible in
	// this part of the file.
	leftBatchN, rightBatchN int

	// leftBatchIdx and rightBatchIdx record where emission of the buffered
	// group resumes when it did not fit into a single output batch.
	leftBatchIdx, rightBatchIdx int

	// d is the output: d.b holds the output columns and d.n the number of
	// valid rows in them.
	d dataFlow
}
// Init validates the projection lists and allocates the operator's buffers.
//
// It panics when either side projects no columns. maybeOutput sizes the
// buffered match group from column 0 of both leftBatchBuf and rightBatchBuf,
// so an empty side would otherwise only surface later as an opaque
// index-out-of-range on the first call to Next.
//
// The output batch gets one intColumn of batchRowLen rows per projected
// column; the per-side group buffers get one empty intColumn per projected
// column of that side.
func (m *mergeJoinIntIntOp) Init() {
	nLeft, nRight := len(m.leftCols), len(m.rightCols)
	switch {
	case nLeft+nRight == 0:
		panic("no output cols")
	case nLeft == 0:
		panic("no left output cols")
	case nRight == 0:
		panic("no right output cols")
	}

	m.d.b = make(batch, nLeft+nRight)
	for i := range m.d.b {
		m.d.b[i] = make(intColumn, batchRowLen)
	}

	m.leftBatchBuf = make(batch, nLeft)
	for i := range m.leftBatchBuf {
		m.leftBatchBuf[i] = make(intColumn, 0, 1)
	}
	m.rightBatchBuf = make(batch, nRight)
	for i := range m.rightBatchBuf {
		m.rightBatchBuf[i] = make(intColumn, 0, 1)
	}
}
// Next returns the next batch of joined rows, or a batch with n == 0 once
// either input has no more rows.
//
// The returned dataFlow shares its columns with the operator's output batch,
// so it is only valid until the following call. Every return path hands out a
// copy of m.d and resets m.d.n, so the next call starts filling a fresh batch.
//
// Both inputs are walked in step on their equality columns: the side with the
// smaller value is advanced with advanceToMatch, and when the values are
// equal the whole equal-key group of each side is buffered and emitted through
// maybeOutput.
func (m *mergeJoinIntIntOp) Next() dataFlow {
	emit := func() dataFlow {
		out := m.d
		m.d.n = 0
		return out
	}

	// Rows may still be buffered from a group that overflowed the previous
	// output batch; drain those before reading more input.
	if m.maybeOutput() {
		return emit()
	}

	if m.leftFlow.n == 0 {
		m.leftFlow, m.leftFlowIdx = m.left.Next(), 0
	}
	if m.rightFlow.n == 0 {
		m.rightFlow, m.rightFlowIdx = m.right.Next(), 0
	}

	for {
		// The helpers below replace a flow with an n == 0 flow when its input
		// runs dry. No further matches are possible then, so hand out whatever
		// has been accumulated instead of reading rows of an exhausted flow.
		if m.leftFlow.n == 0 || m.rightFlow.n == 0 {
			return emit()
		}

		// Look the columns up on every iteration: advanceToMatch and
		// bufferMatchGroup may have swapped in a new batch.
		// todo(jordan) deal with sel
		leftVal := m.leftFlow.b[m.leftEqColIdx].(intColumn)[m.leftFlowIdx]
		rightVal := m.rightFlow.b[m.rightEqColIdx].(intColumn)[m.rightFlowIdx]

		switch {
		case leftVal > rightVal:
			// Right side is behind; running out of rows there is caught by
			// the check at the top of the loop.
			_, m.rightFlow, m.rightFlowIdx = m.advanceToMatch(leftVal, m.rightFlow, m.rightEqColIdx, m.right, m.rightFlowIdx)
		case leftVal < rightVal:
			// Left side is behind.
			_, m.leftFlow, m.leftFlowIdx = m.advanceToMatch(rightVal, m.leftFlow, m.leftEqColIdx, m.left, m.leftFlowIdx)
		default:
			// Equal keys: buffer the full group on both sides, then emit.
			m.leftFlow, m.leftFlowIdx = m.bufferMatchGroup(leftVal, m.leftFlow, m.leftEqColIdx, m.left, m.leftFlowIdx, m.leftCols, m.leftBatchBuf)
			m.rightFlow, m.rightFlowIdx = m.bufferMatchGroup(rightVal, m.rightFlow, m.rightEqColIdx, m.right, m.rightFlowIdx, m.rightCols, m.rightBatchBuf)
			if m.maybeOutput() {
				return emit()
			}
		}
	}
}
// maybeOutput appends the cartesian product of the buffered left and right
// match groups to the output batch m.d and reports whether that batch is now
// full and must be returned to the caller.
//
// Pairs are produced left-row-major: for each buffered left row, every
// buffered right row. The pair (m.leftBatchIdx, m.rightBatchIdx) is the next
// one to emit; when the product does not fit into the space left in the
// output batch, that cursor is advanced past the emitted pairs so a later
// call resumes exactly where this one stopped. Once the whole product has
// been emitted the buffers are emptied and the cursor is reset.
//
// New rows are appended after the m.d.n rows already in the output batch, so
// several small groups can share one batch.
//
// Both sides must project at least one column: the number of buffered rows
// is taken from column 0 of each buffer.
func (m *mergeJoinIntIntOp) maybeOutput() bool {
	// m.d.n only reaches batchRowLen inside this function, and the call that
	// fills the batch returns true, upon which Next returns the batch. A full
	// batch seen on entry has therefore already been handed out; start over.
	if m.d.n >= batchRowLen {
		m.d.n = 0
	}

	leftRows := len(m.leftBatchBuf[0].(intColumn))
	rightRows := len(m.rightBatchBuf[0].(intColumn))

	// Pairs still to emit: the rest of the current left row plus every
	// complete left row after it.
	required := 0
	if m.leftBatchIdx < leftRows && m.rightBatchIdx < rightRows {
		required = (leftRows-m.leftBatchIdx)*rightRows - m.rightBatchIdx
	}

	dropGroup := func() {
		for i := range m.leftBatchBuf {
			m.leftBatchBuf[i] = m.leftBatchBuf[i].(intColumn)[:0]
		}
		for i := range m.rightBatchBuf {
			m.rightBatchBuf[i] = m.rightBatchBuf[i].(intColumn)[:0]
		}
		m.leftBatchIdx, m.rightBatchIdx = 0, 0
	}

	if required == 0 {
		// Nothing to emit. Discard any rows buffered on one side only so they
		// cannot leak into the next group.
		dropGroup()
		return false
	}

	avail := batchRowLen - m.d.n
	toCopy := required
	if avail < toCopy {
		toCopy = avail
	}
	lo, hi := m.d.n, m.d.n+toCopy

	// Left columns: each left value is repeated once per right row.
	for i := range m.leftCols {
		src := m.leftBatchBuf[i].(intColumn)
		dst := m.d.b[i].(intColumn)[lo:hi]
		l, r := m.leftBatchIdx, m.rightBatchIdx
		for k := range dst {
			dst[k] = src[l]
			if r++; r == rightRows {
				l, r = l+1, 0
			}
		}
	}
	// Right columns: the right group is cycled once per left row.
	for i := range m.rightCols {
		src := m.rightBatchBuf[i].(intColumn)
		dst := m.d.b[len(m.leftCols)+i].(intColumn)[lo:hi]
		r := m.rightBatchIdx
		for k := range dst {
			dst[k] = src[r]
			if r++; r == rightRows {
				r = 0
			}
		}
	}
	m.d.n = hi

	if required <= avail {
		// We got everything into our batch. Clear the bufs.
		dropGroup()
	} else {
		// Move the cursor past the pairs just emitted.
		consumed := m.rightBatchIdx + toCopy
		m.leftBatchIdx += consumed / rightRows
		m.rightBatchIdx = consumed % rightRows
	}

	// output if there's no space left in the buffer
	return required >= avail
}
// bufferMatchGroup copies the run of rows whose equality column equals val
// into batchBuf, starting at row startIdx of flow and continuing into
// following batches fetched from op for as long as the run lasts.
//
// colIdx selects the equality column of flow. cols lists the flow columns to
// copy; the values of cols[i] are appended to batchBuf[i].
//
// It returns the flow and row index of the first row that is not part of the
// run. If op runs out of rows first, it returns the empty flow op produced
// (n == 0) and index 0.
//
// The copy is column-oriented: within each batch the end of the run is found
// first, then each projected column is appended with a single bulk append.
func (m *mergeJoinIntIntOp) bufferMatchGroup(val int, flow dataFlow, colIdx int, op ExecOp, startIdx int, cols intColumn, batchBuf batch) (dataFlow, int) {
	for {
		keys := flow.b[colIdx].(intColumn)[:batchRowLen]

		// Find where the run of equal keys ends inside this batch.
		end := startIdx
		for end < flow.n && keys[end] == val {
			end++
		}

		if end > startIdx {
			for i, c := range cols {
				src := flow.b[c].(intColumn)
				batchBuf[i] = append(batchBuf[i].(intColumn), src[startIdx:end]...)
			}
		}

		if end < flow.n {
			// The run stopped on a row of this batch.
			return flow, end
		}

		// If we got here, we made it to the end of the batch. We must retrieve
		// the next batch to ensure there are no more matches in that one.
		flow = op.Next()
		if flow.n == 0 {
			return flow, 0
		}
		startIdx = 0
	}
}
// advanceToMatch scans column colIdx of flow forward from row startIdx,
// pulling further batches from op when a batch is used up, until it reaches a
// row whose value is not smaller than val.
//
// It returns true with the flow and row index of the first row equal to val.
// It returns false with the flow and row index of the first row greater than
// val, or false with the empty flow op produced and index -1 when op ran out
// of rows before such a row was seen.
func (m *mergeJoinIntIntOp) advanceToMatch(val int, flow dataFlow, colIdx int, op ExecOp, startIdx int) (bool, dataFlow, int) {
	for pos := startIdx; ; pos = 0 {
		keys := flow.b[colIdx].(intColumn)[:batchRowLen]
		for ; pos < flow.n; pos++ {
			switch cur := keys[pos]; {
			case cur == val:
				return true, flow, pos
			case cur > val:
				return false, flow, pos
			}
		}
		// Batch used up without reaching val; continue with the next one.
		if flow = op.Next(); flow.n == 0 {
			return false, flow, -1
		}
	}
}