Skip to content

Commit

Permalink
Fix MultiPartition Card Queries
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep6189 committed Aug 18, 2023
1 parent 594ffce commit 4ab7a99
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(lp, qContext)
else {
createMetadataRemoteExec(qContext, p, lp.queryParams())
val newQueryContext = qContext.copy(origQueryParams = queryParams.copy(verbose = true))
createMetadataRemoteExec(newQueryContext, p, lp.queryParams())
}
}
if (execPlans.size == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,8 @@ class SingleClusterPlanner(val dataset: Dataset,
forceInProcess: Boolean): PlanResult = {
val metaExec = shardMapperFunc.assignedShards.map{ shard =>
val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterName)
exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterName,
lp.version)
}
PlanResult(metaExec)
}
Expand Down
17 changes: 14 additions & 3 deletions query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ final case class TsCardExec(queryContext: QueryContext,
shard: Int,
shardKeyPrefix: Seq[String],
numGroupByFields: Int,
clusterName: String) extends LeafExecPlan with StrictLogging {
clusterName: String,
version: Int) extends LeafExecPlan with StrictLogging {
require(numGroupByFields >= 1,
"numGroupByFields must be positive")
require(numGroupByFields >= shardKeyPrefix.size,
Expand All @@ -584,8 +585,14 @@ final case class TsCardExec(queryContext: QueryContext,
val cards = tsMemStore.scanTsCardinalities(
dataset, Seq(shard), shardKeyPrefix, numGroupByFields)
val it = cards.map { card =>
val groupKey = prefixToGroupWithDataset(card.prefix, dataset.dataset)

// v1 and v2 cardinality have different schemas and required group key. Hence we are segregating
// w.r.t to the version
val groupKey =
version match {
case 1 => prefixToGroup(card.prefix)
case _ => prefixToGroupWithDataset(card.prefix, dataset.dataset)
}
// NOTE: cardinality data from downsample cluster is stored as total count in CardinalityStore. But for the
// user perspective, the cardinality data in downsample is a longterm data. Hence, we are forking the
// data path based on the cluster the data is being served from
Expand All @@ -605,7 +612,11 @@ final case class TsCardExec(queryContext: QueryContext,
case other =>
Observable.empty
}
ExecResult(rvs, Task.eval(RESULT_SCHEMA))
// Sending V1 SCHEMA for v1 queries
version match {
case 1 => ExecResult(rvs, Task.eval(RESULT_SCHEMA_V1))
case _ => ExecResult(rvs, Task.eval(RESULT_SCHEMA))
}
}
// scalastyle:on method.length

Expand Down
4 changes: 2 additions & 2 deletions query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,12 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B

val leavesRaw = (0 until shardPartKeyLabelValues.size).map{ ishard =>
new TsCardExec(QueryContext(), executeDispatcher,timeseriesDatasetMultipleShardKeys.ref,
ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "raw")
ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "raw", 2)
}.toSeq
// UPDATE: Simulating the call to downsample cluster to get longterm metrics as well
val leavesDownsample = (0 until shardPartKeyLabelValues.size).map { ishard =>
new TsCardExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "downsample")
ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "downsample", 2)
}.toSeq

val allLeaves = leavesRaw ++ leavesDownsample
Expand Down

0 comments on commit 4ab7a99

Please sign in to comment.