Merge branch 'develop' into integration
Yu Zhang committed Sep 19, 2023
2 parents 50d28ea + c3bbdd6 commit bd0f1dd
Showing 69 changed files with 1,544 additions and 847 deletions.
@@ -34,7 +34,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
   }

   it("should flush MemStore data to C*, and be able to read back data from C* directly") {
-    memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf)
+    memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf, 1)
     memStore.store.sinkStats.chunksetsWritten.get shouldEqual 0

     // Flush every ~50 records
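The change in this spec (and in the OdpSpec hunks below) is mechanical: `TimeSeriesMemStore.setup` now takes the dataset's total shard count as an extra argument. A minimal sketch of the new call shape, reusing identifiers from the spec above; the rationale comment is an assumption based on the memory-alloc changes later in this commit:

```scala
// Sketch only: memStore, dataset1, Schemas, and TestData come from the spec above.
val shard = 0      // the shard being set up on this node
val numShards = 1  // new argument: total shards in the dataset (1 in these tests),
                   // presumably so per-shard resources can be derived up front
memStore.setup(dataset1.ref, Schemas(dataset1.schema), shard, TestData.storeConf, numShards)
```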
@@ -96,7 +96,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
     val policy = new FixedMaxPartitionsEvictionPolicy(20)
     val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
     try {
-      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
+      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
       memStore.recoverIndex(dataset.ref, 0).futureValue
       memStore.refreshIndexForTesting(dataset.ref)

@@ -112,7 +112,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
     val policy = new FixedMaxPartitionsEvictionPolicy(20)
     val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
     try {
-      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
+      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
       memStore.recoverIndex(dataset.ref, 0).futureValue
       memStore.refreshIndexForTesting(dataset.ref)

@@ -134,7 +134,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
     val policy = new FixedMaxPartitionsEvictionPolicy(20)
     val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
     try {
-      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
+      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
       memStore.recoverIndex(dataset.ref, 0).futureValue
       memStore.refreshIndexForTesting(dataset.ref)

@@ -157,7 +157,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
     val policy = new FixedMaxPartitionsEvictionPolicy(20)
     val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
     try {
-      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
+      memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
       memStore.recoverIndex(dataset.ref, 0).futureValue
       memStore.refreshIndexForTesting(dataset.ref)
2 changes: 1 addition & 1 deletion conf/promperf-filodb-server.conf
@@ -5,8 +5,8 @@ filodb {
     "conf/promperf-source.conf"
   ]

+  min-num-nodes-in-cluster = 1
   cluster-discovery {
-    num-nodes = 1
     failure-detection-interval = 20s
     host-list = [
       "127.0.0.1:2552"
3 changes: 3 additions & 0 deletions conf/timeseries-dev-source.conf
@@ -6,7 +6,10 @@
 # Should not change once dataset has been set up on the server and data has been persisted to cassandra
 num-shards = 4

+# deprecated in favor of min-num-nodes-in-cluster config in filodb server config
+# To be removed eventually. There is no reason to set a value for this for each dataset
 min-num-nodes = 2
+
 # Length of chunks to be written, roughly
 sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory"
2 changes: 1 addition & 1 deletion conf/timeseries-filodb-server.conf
@@ -1,9 +1,9 @@
 dataset-prometheus = { include required("timeseries-dev-source.conf") }

 filodb {
+  min-num-nodes-in-cluster = 2
   v2-cluster-enabled = false
   cluster-discovery {
-    num-nodes = 2
     failure-detection-interval = 20s
     host-list = [
       "127.0.0.1:2552",
@@ -48,13 +48,13 @@ final class FilodbSettings(val conf: Config) {

   lazy val datasetConfPaths = config.as[Seq[String]]("dataset-configs")

-  lazy val numNodes = config.getInt("cluster-discovery.num-nodes")
   lazy val k8sHostFormat = config.as[Option[String]]("cluster-discovery.k8s-stateful-sets-hostname-format")

   // used for development mode only
   lazy val hostList = config.as[Option[Seq[String]]]("cluster-discovery.host-list")
   lazy val localhostOrdinal = config.as[Option[Int]]("cluster-discovery.localhost-ordinal")

+  lazy val minNumNodes = config.as[Option[Int]]("min-num-nodes-in-cluster")

   /**
    * Returns IngestionConfig/dataset configuration from parsing dataset-configs file paths.
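Note the switch from a required `getInt` to an optional read: `min-num-nodes-in-cluster` may be absent, and the features that need it (v2 clustering, automatic memory allocation) validate it with `require` at their call sites. A small self-contained sketch of the Option-returning read, assuming the Ficus library that `FilodbSettings` already uses for `config.as`:

```scala
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._

val withKey    = ConfigFactory.parseString("min-num-nodes-in-cluster = 2")
val withoutKey = ConfigFactory.parseString("")

// as[Option[Int]] maps a present key to Some and a missing key to None,
// deferring validation to the features that actually need the value.
val some = withKey.as[Option[Int]]("min-num-nodes-in-cluster")    // Some(2)
val none = withoutKey.as[Option[Int]]("min-num-nodes-in-cluster") // None
```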
@@ -37,8 +37,9 @@ object IngestionActor {
             source: NodeClusterActor.IngestionSource,
             downsample: DownsampleConfig,
             storeConfig: StoreConfig,
+            numShards: Int,
             statusActor: ActorRef): Props =
-    Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, statusActor))
+    Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, numShards, statusActor))
 }

 /**
@@ -62,6 +63,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
                                            source: NodeClusterActor.IngestionSource,
                                            downsample: DownsampleConfig,
                                            storeConfig: StoreConfig,
+                                           numShards: Int,
                                            statusActor: ActorRef) extends BaseActor {

   import IngestionActor._
@@ -170,7 +172,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,

   // scalastyle:off method.length
   private def startIngestion(shard: Int): Unit = {
-    try tsStore.setup(ref, schemas, shard, storeConfig, downsample) catch {
+    try tsStore.setup(ref, schemas, shard, storeConfig, numShards, downsample) catch {
       case ShardAlreadySetup(ds, s) =>
         logger.warn(s"dataset=$ds shard=$s already setup, skipping....")
         return
@@ -115,6 +115,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,

       setupDataset( dataset,
                     ingestConfig.storeConfig,
+                    ingestConfig.numShards,
                     IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig),
                     ingestConfig.downsampleConfig)
     }
@@ -141,6 +142,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
    */
   private def setupDataset(dataset: Dataset,
                            storeConf: StoreConfig,
+                           numShards: Int,
                            source: IngestionSource,
                            downsample: DownsampleConfig,
                            schemaOverride: Boolean = false): Unit = {
@@ -154,7 +156,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
     val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas
     if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!")
     val props = IngestionActor.props(dataset.ref, schemas, memStore,
-                                     source, downsample, storeConf, statusActor.get)
+                                     source, downsample, storeConf, numShards, statusActor.get)
     val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}")
     context.watch(ingester)
     ingesters(ref) = ingester
@@ -187,7 +189,9 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
   def ingestHandlers: Receive = LoggingReceive {
     case SetupDataset(dataset, resources, source, storeConf, downsample) =>
       // used only in unit tests
-      if (!(ingesters contains dataset.ref)) { setupDataset(dataset, storeConf, source, downsample, true) }
+      if (!(ingesters contains dataset.ref)) {
+        setupDataset(dataset, storeConf, resources.numShards, source, downsample, true)
+      }

     case IngestRows(dataset, shard, rows) =>
       withIngester(sender(), dataset) { _ ! IngestionActor.IngestRows(sender(), shard, rows) }
@@ -402,7 +402,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
         ackTo.foreach(_ ! DatasetExists(dataset.ref))
         Map.empty
       case None =>
-        val resources = DatasetResourceSpec(ingestConfig.numShards, ingestConfig.minNumNodes)
+        val minNumNodes = settings.minNumNodes.getOrElse(ingestConfig.minNumNodes)
+        val resources = DatasetResourceSpec(ingestConfig.numShards, minNumNodes)
         val mapper = new ShardMapper(resources.numShards)
         _shardMappers(dataset.ref) = mapper
         // Access the shardmapper through the HashMap so even if it gets replaced it will update the shard stats
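The effective precedence after this change: the cluster-wide `min-num-nodes-in-cluster`, when set, overrides the deprecated per-dataset `min-num-nodes`; the fallback only keeps old configs working. A tiny sketch of that rule:

```scala
// settingsMinNumNodes comes from filodb.min-num-nodes-in-cluster (server config);
// datasetMinNumNodes from the deprecated per-dataset min-num-nodes.
def effectiveMinNumNodes(settingsMinNumNodes: Option[Int], datasetMinNumNodes: Int): Int =
  settingsMinNumNodes.getOrElse(datasetMinNumNodes)

effectiveMinNumNodes(Some(4), 2) // 4 — server-level config wins
effectiveMinNumNodes(None, 2)    // 2 — deprecated dataset config still honored
```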
@@ -40,11 +40,13 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
   }

   def shardsForOrdinal(ordinal: Int, numShards: Int): Seq[Int] = {
-    require(ordinal < settings.numNodes, s"Ordinal $ordinal was not expected. Number of nodes is ${settings.numNodes}")
-    val numShardsPerHost = numShards / settings.numNodes
+    require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
+    require(ordinal < settings.minNumNodes.get, s"Ordinal $ordinal was not expected. " +
+      s"Number of nodes is ${settings.minNumNodes.get}")
+    val numShardsPerHost = numShards / settings.minNumNodes.get
     // Suppose we have a total of 8 shards and 2 hosts, assuming the hostnames are host-0 and host-1, we will map
     // host-0 to shard [0,1,2,3] and host-1 to shard [4,5,6,7]
-    val numExtraShardsToAssign = numShards % settings.numNodes
+    val numExtraShardsToAssign = numShards % settings.minNumNodes.get
     val (firstShardThisNode, numShardsThisHost) = if (numExtraShardsToAssign != 0) {
       logger.warn("For stateful shard assignment, numShards should be a multiple of nodes per shard, " +
         "using default strategy")
@@ … @@
   def shardsForLocalhost(numShards: Int): Seq[Int] = shardsForOrdinal(ordinalOfLocalhost, numShards)

   lazy private val hostNames = {
+    require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
     if (settings.k8sHostFormat.isDefined) {
-      (0 until settings.numNodes).map(i => String.format(settings.k8sHostFormat.get, i.toString))
+      (0 until settings.minNumNodes.get).map(i => String.format(settings.k8sHostFormat.get, i.toString))
     } else if (settings.hostList.isDefined) {
       settings.hostList.get.sorted // sort to make order consistent on all nodes of cluster
     } else throw new IllegalArgumentException("Cluster Discovery mechanism not defined")
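The assignment arithmetic can be reproduced standalone. A sketch under the assumption (matching the spec below, "Should allocate the extra n shards to first n nodes") that each of the first `numShards % numNodes` ordinals takes one extra shard:

```scala
// Standalone sketch of shardsForOrdinal's math; not part of the diff.
def shardsForOrdinal(ordinal: Int, numShards: Int, numNodes: Int): Seq[Int] = {
  require(ordinal < numNodes, s"Ordinal $ordinal was not expected")
  val perHost = numShards / numNodes  // even share per node
  val extra   = numShards % numNodes  // leftovers go to the first nodes
  val mine    = if (ordinal < extra) perHost + 1 else perHost
  val first   = ordinal * perHost + math.min(ordinal, extra)
  first until (first + mine)
}

shardsForOrdinal(0, 8, 2) // [0,1,2,3] — matches the host-0 example in the comment
shardsForOrdinal(1, 8, 2) // [4,5,6,7]
shardsForOrdinal(0, 5, 4) // [0,1] — first node absorbs the extra shard
```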
@@ -84,7 +84,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
         .foreach { downsampleDataset => memStore.store.initialize(downsampleDataset, ingestConfig.numShards) }

       setupDataset( dataset,
-                    ingestConfig.storeConfig,
+                    ingestConfig.storeConfig, ingestConfig.numShards,
                     IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig),
                     ingestConfig.downsampleConfig)
       initShards(dataset, ingestConfig)
@@ -121,6 +121,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
    */
   private def setupDataset(dataset: Dataset,
                            storeConf: StoreConfig,
+                           numShards: Int,
                            source: IngestionSource,
                            downsample: DownsampleConfig,
                            schemaOverride: Boolean = false): Unit = {
@@ -132,7 +133,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
     val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas
     if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!")
     val props = IngestionActor.props(dataset.ref, schemas, memStore,
-                                     source, downsample, storeConf, self)
+                                     source, downsample, storeConf, numShards, self)
     val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}")
     context.watch(ingester)
     ingestionActors(ref) = ingester
@@ -11,7 +11,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec {

   val config = ConfigFactory.parseString(
     """
-      |filodb.cluster-discovery.num-nodes = 4
+      |filodb.min-num-nodes-in-cluster = 4
       |""".stripMargin)

   val settings = new FilodbSettings(config)
@@ -29,7 +29,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec {
   "Should allocate the extra n shards to first n nodes" in {
     val config = ConfigFactory.parseString(
       """
-        |filodb.cluster-discovery.num-nodes = 5
+        |filodb.min-num-nodes-in-cluster = 5
         |""".stripMargin)

     val settings = new FilodbSettings(config)
42 changes: 41 additions & 1 deletion core/src/main/resources/filodb-defaults.conf
@@ -1,11 +1,16 @@
 filodb {
   v2-cluster-enabled = false

+  # Number of nodes in cluster; used to calculate per-shard resources based
+  # on how many shards assigned to node.
+  # Required config if v2-clustering or automatic memory-alloc is enabled
+  # min-num-nodes-in-cluster = 2
+
   cluster-discovery {
     // set this to a smaller value (like 30s) at the query entry points
     // if FiloDB HTTP API is indeed the query entry point, this should be overridden to a small value
     // so that failure detection is quick.
     failure-detection-interval = 15 minutes
-    num-nodes = 2

     # one of the two below properties should be enabled
     # enable this to use the k8s stateful sets mode and its hostname to extract ordinal
@@ -691,6 +696,41 @@ filodb {
     # Note: this memory is shared across all configured datasets on a node.
     ingestion-buffer-mem-size = 200MB

+    memory-alloc {
+      # automatic memory allocation is enabled if true
+      automatic-alloc-enabled = false
+
+      # if not provided this is calculated as ContainerOrNodeMemory - os-memory-needs - CurrentJVMHeapMemory
+      # available-memory-bytes = 5GB
+
+      # memory dedicated for proper functioning of OS
+      os-memory-needs = 500MB
+
+      # NOTE: In the three configs below,
+      # lucene-memory-percent + native-memory-manager-percent + block-memory-manager-percent
+      # should equal 100
+      ##############################################################
+      #                   #                    #                   #
+      #  LuceneMemPercent #  NativeMemPercent  #  BlockMemPercent  #
+      #                   #                    #                   #
+      ##############################################################
+
+      # Memory percent of available-memory reserved for Lucene memory maps.
+      # Note we do not use this config to explicitly allocate space for Lucene,
+      # but reserving this space ensures that more of the Lucene memory maps are stored in memory
+      lucene-memory-percent = 5
+
+      # memory percent of available-memory reserved for the native memory manager
+      # (used for partKeys, chunkMaps, chunkInfos, writeBuffers)
+      native-memory-manager-percent = 24
+
+      # Memory percent of available-memory reserved for the block memory manager
+      # (used for storing chunks)
+      # This is divvied amongst datasets on the node per configuration for dataset
+      # The shards of the dataset on the node get an even amount of memory from this fraction
+      block-memory-manager-percent = 71
+    }
+
   # At the cost of some extra heap memory, we can track queries holding shared lock for a long time
   # and starving the exclusive access of lock for eviction
   track-queries-holding-eviction-lock = true
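The three percentages partition the computed available memory, which is why they must sum to 100. A sketch of the split implied by the defaults above, assuming each percentage applies directly to the available-memory figure:

```scala
val availableMemoryBytes = 5L * 1024 * 1024 * 1024 // say 5GB was detected or configured
val (lucenePct, nativePct, blockPct) = (5, 24, 71)
require(lucenePct + nativePct + blockPct == 100)

val luceneBytes = availableMemoryBytes * lucenePct / 100 // headroom for Lucene memory maps
val nativeBytes = availableMemoryBytes * nativePct / 100 // partKeys, chunkMaps, chunkInfos, writeBuffers
val blockBytes  = availableMemoryBytes * blockPct  / 100 // chunk storage, divided across datasets/shards
```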
25 changes: 25 additions & 0 deletions core/src/main/scala/filodb.core/Utils.scala
@@ -2,6 +2,7 @@ package filodb.core

 import java.lang.management.ManagementFactory

+import com.typesafe.config.{Config, ConfigRenderOptions}
 import com.typesafe.scalalogging.StrictLogging

 object Utils extends StrictLogging {
@@ -13,4 +14,28 @@ object Utils extends StrictLogging {
     if (cpuTimeEnabled) threadMbean.getCurrentThreadCpuTime
     else System.nanoTime()
   }
+
+  def calculateAvailableOffHeapMemory(filodbConfig: Config): Long = {
+    val containerMemory = ManagementFactory.getOperatingSystemMXBean()
+      .asInstanceOf[com.sun.management.OperatingSystemMXBean].getTotalPhysicalMemorySize()
+    val currentJavaHeapMemory = Runtime.getRuntime().maxMemory()
+    val osMemoryNeeds = filodbConfig.getMemorySize("memstore.memory-alloc.os-memory-needs").toBytes
+    logger.info(s"Detected available memory containerMemory=$containerMemory" +
+      s" currentJavaHeapMemory=$currentJavaHeapMemory osMemoryNeeds=$osMemoryNeeds")
+
+    logger.info(s"Memory Alloc Options: " +
+      s"${filodbConfig.getConfig("memstore.memory-alloc").root().render(ConfigRenderOptions.concise())}")
+
+    val availableMem = if (filodbConfig.hasPath("memstore.memory-alloc.available-memory-bytes")) {
+      val avail = filodbConfig.getMemorySize("memstore.memory-alloc.available-memory-bytes").toBytes
+      logger.info(s"Using automatic-memory-config with overridden memory-alloc.available-memory $avail")
+      avail
+    } else {
+      logger.info(s"Using automatic-memory-config without available memory override")
+      containerMemory - currentJavaHeapMemory - osMemoryNeeds
+    }
+    logger.info(s"Available memory calculated or configured as $availableMem")
+    availableMem
+  }
+
 }
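A usage sketch for the new helper, assuming a config that carries the memstore.memory-alloc block from filodb-defaults.conf; with no available-memory-bytes override, the result is container memory minus max heap minus os-memory-needs:

```scala
import com.typesafe.config.ConfigFactory
import filodb.core.Utils

val filodbConfig = ConfigFactory.parseString(
  """
    |memstore.memory-alloc {
    |  automatic-alloc-enabled = true
    |  os-memory-needs = 500MB
    |}
    |""".stripMargin)

// Falls back to containerMemory - currentJavaHeapMemory - osMemoryNeeds
// because available-memory-bytes is not set above.
val offHeapBytes: Long = Utils.calculateAvailableOffHeapMemory(filodbConfig)
```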
@@ -7,7 +7,7 @@ import scala.reflect.ClassTag
 import debox.Buffer

 import filodb.memory.{BinaryRegionConsumer, BinaryRegionLarge}
-import filodb.memory.format.{RowReader, UnsafeUtils}
+import filodb.memory.format.UnsafeUtils

 /**
  * A RecordContainer is a binary, wire-compatible container for BinaryRecords V2.
@@ -72,13 +72,13 @@ final class RecordContainer(val base: Any, val offset: Long, maxLength: Int,
    * Iterates through each BinaryRecord as a RowReader. Results in two allocations: the Iterator
    * as well as a BinaryRecordRowReader.
    */
-  final def iterate(schema: RecordSchema): Iterator[RowReader] = new Iterator[RowReader] {
+  final def iterate(schema: RecordSchema): Iterator[BinaryRecordRowReader] = new Iterator[BinaryRecordRowReader] {
     val reader = new BinaryRecordRowReader(schema, base)
     val endOffset = offset + 4 + numBytes
     var curOffset = offset + ContainerHeaderLen

     final def hasNext: Boolean = curOffset < endOffset
-    final def next: RowReader = {
+    final def next: BinaryRecordRowReader = {
       val recordLen = BinaryRegionLarge.numBytes(base, curOffset)
       reader.recordOffset = curOffset
       curOffset += (recordLen + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes
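Narrowing the element type from RowReader to BinaryRecordRowReader lets callers use the new recordLength member (added below) without casting. A hypothetical caller, with imports from filodb.core.binaryrecord2 assumed:

```scala
// Works because iterate now yields BinaryRecordRowReader; the single reader
// instance is reused, so each record's length is read as the iterator advances.
def totalRecordBytes(container: RecordContainer, schema: RecordSchema): Int =
  container.iterate(schema).map(_.recordLength).sum
```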
@@ -624,7 +624,12 @@ trait BinaryRecordRowReaderBase extends RowReader {

 final class BinaryRecordRowReader(val schema: RecordSchema,
                                   var recordBase: Any = UnsafeUtils.ZeroPointer,
-                                  var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase
+                                  var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase {
+  def recordLength: Int = {
+    val len = BinaryRegionLarge.numBytes(recordBase, recordOffset)
+    (len + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes
+  }
+}

 final class MultiSchemaBRRowReader(var recordBase: Any = UnsafeUtils.ZeroPointer,
                                    var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase {
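recordLength mirrors the stride used by RecordContainer.iterate: the 4-byte length prefix is added and the total rounded up to 4-byte alignment. A worked check of the (len + 7) & ~3 arithmetic:

```scala
def alignedRecordLength(len: Int): Int = (len + 7) & ~3

alignedRecordLength(12) // 16 = 12 + 4-byte prefix, already aligned
alignedRecordLength(13) // 20 = 17 rounded up to the next multiple of 4
alignedRecordLength(15) // 20 = 19 rounded up to the next multiple of 4
```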