Skip to content

Commit

Permalink
fix(query): negative rate/increase due to NaN chunk (#1846)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherali42 authored Sep 13, 2024
1 parent a7312de commit 4320860
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans
startRowNum: Int, endRowNum: Int,
startTime: Long, endTime: Long): Unit = {
val dblReader = reader.asDoubleReader
// if the chunk is a single row NaN value, then return. Prometheus end of time series marker.
// This is to make sure we don't set highestValue to zero. avoids negative rate/increase values.
if (startRowNum == 0 && endRowNum == 0 && dblReader.apply(acc, vector, startRowNum).isNaN)
return
if (startTime < lowestTime || endTime > highestTime) {
numSamples += endRowNum - startRowNum + 1
if (startTime < lowestTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,54 @@ class RateFunctionsSpec extends RawDataWindowingSpec {
}
}

// Regression test for #1846: a trailing single-row NaN chunk (the Prometheus
// end-of-time-series marker) must not drive rate()/increase() negative.
// Before the fix, such a chunk could clobber highestValue with a garbage/zero
// value, producing a negative rate for windows that include it.
it("rate should work for even with single value NaN end of timeseries chunks") {
// 500 monotonically increasing counter samples with small random jitter.
val data = (1 to 500).map(_ * 10 + rand.nextInt(10)).map(_.toDouble)
// Pair each sample with an evenly spaced timestamp, pubFreq apart.
val tuples = data.zipWithIndex.map { case (d, t) => (defaultStartTS + t * pubFreq, d) }
val rv = timeValueRVPk(tuples) // should be a couple chunks

// simulate creation of chunk with single NaN value
// (one extra chunk containing exactly one NaN row, timestamped one pubFreq
// past the last real sample — mimics the end-of-series marker layout)
addChunkToRV(rv, Seq(defaultStartTS + 500 * pubFreq -> Double.NaN))

// add single row NaN chunk
// addChunkToRV(rv, tuples.takeRight(1))

// Randomize window size and step across iterations so a variety of
// chunk/window boundary alignments are exercised.
(0 until 10).foreach { x =>
val windowSize = rand.nextInt(100) + 10
val step = rand.nextInt(50) + 5
info(s" iteration $x windowSize=$windowSize step=$step")

// Reference result: sliding-window rate over the same range vector.
val slidingRate = slidingWindowIt(data, rv, RateFunction, windowSize, step)
val slidingResults = slidingRate.map(_.getDouble(1)).toBuffer
slidingRate.close()

// Result under test: chunked rate — the code path changed by this fix.
val rateChunked = chunkedWindowIt(data, rv, new ChunkedRateFunction, windowSize, step)
val resultRows = rateChunked.map { r => (r.getLong(0), r.getDouble(1)) }.toBuffer
val rates = resultRows.map(_._2)

// Since the input data and window sizes are randomized, it is not possible to precompute results
// beforehand. Coming up with a formula to figure out the right rate is really hard.
// Thus we take an approach of comparing the sliding and chunked results to ensure they are identical.

// val windowTime = (windowSize.toLong - 1) * pubFreq
// val expected = tuples.sliding(windowSize, step).toBuffer
// .zip(resultRows).map { case (w, (ts, _)) =>
// // For some reason rate is based on window, not timestamps - so not w.last._1
// (w.last._2 - w.head._2) / (windowTime) * 1000
// // (w.last._2 - w.head._2) / (w.last._1 - w.head._1) * 1000
// }
// All windows before the last one never see the NaN chunk, so chunked and
// sliding implementations must agree exactly on them.
rates.dropRight(1) shouldEqual slidingResults.dropRight(1)

// rate should be positive
// The final window covers the NaN chunk — this is the assertion that
// failed (rate went negative) before the fix.
val resultLen = rates.length
rates(resultLen - 1) shouldBe > (0d) // positive

// sliding window is not used for rate/increase. We use ChunkedRateFunction. There may be slight difference
// in the way NaN is handled, which is okay.
val percentError: Double = math.abs(rates(resultLen - 1) - slidingResults(resultLen - 1))/slidingResults(resultLen - 1)
percentError shouldBe < (0.1d)
}
}

val promHistDS = Dataset("histogram", Seq("metric:string", "tags:map"),
Seq("timestamp:ts", "count:long", "sum:long", "h:hist:counter=true"))
val histBufferPool = new WriteBufferPool(TestData.nativeMem, promHistDS.schema.data, TestData.storeConf)
Expand Down

0 comments on commit 4320860

Please sign in to comment.