Scheduler: include floating resources again in constraints/fairness (#4000)

Signed-off-by: Robert Smith <[email protected]>
robertdavidsmith authored Oct 11, 2024
1 parent fae186a commit 82f26d4
Showing 9 changed files with 52 additions and 22 deletions.
@@ -86,6 +86,12 @@ func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) sche
return pool.totalResources.DeepCopy()
}

+func (frt *FloatingResourceTypes) AddTotalAvailableForPool(poolName string, kubernetesResources schedulerobjects.ResourceList) schedulerobjects.ResourceList {
+floatingResources := frt.GetTotalAvailableForPool(poolName) // Note GetTotalAvailableForPool returns a deep copy
+floatingResources.Add(kubernetesResources)
+return floatingResources
+}

func (frt *FloatingResourceTypes) SummaryString() string {
if len(frt.zeroFloatingResources.Resources) == 0 {
return "none"
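For context, a minimal sketch (not part of the diff) of how the new helper can be called. The import paths and surrounding package are assumptions based on the repository layout; the pool name and quantities are illustrative.

package example

import (
    "k8s.io/apimachinery/pkg/api/resource"

    "github.com/armadaproject/armada/internal/scheduler/floatingresources"
    "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// totalForPool sketches how AddTotalAvailableForPool combines a pool's
// Kubernetes resources with the floating resources configured for that pool.
// frt is assumed to come from floatingresources.NewFloatingResourceTypes.
func totalForPool(frt *floatingresources.FloatingResourceTypes, poolName string) schedulerobjects.ResourceList {
    kubernetesResources := schedulerobjects.ResourceList{
        Resources: map[string]resource.Quantity{"cpu": *resource.NewQuantity(10, resource.DecimalSI)},
    }
    // Returns a new list ("cpu" plus any floating resources configured for
    // poolName) without mutating the argument.
    return frt.AddTotalAvailableForPool(poolName, kubernetesResources)
}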
@@ -23,6 +23,17 @@ func TestGetTotalAvailableForPool(t *testing.T) {
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
}

+func TestAddTotalAvailableForPool(t *testing.T) {
+sut := makeSut(t)
+zero := resource.Quantity{}
+ten := *resource.NewQuantity(10, resource.DecimalSI)
+kubernetesResources := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": ten}}
+assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.AddTotalAvailableForPool("cpu", kubernetesResources).Resources)
+assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.AddTotalAvailableForPool("gpu", kubernetesResources).Resources)
+assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": zero, "floating-resource-2": zero}, sut.AddTotalAvailableForPool("some-other-pool", kubernetesResources).Resources)
+assert.Equal(t, map[string]resource.Quantity{"cpu": ten}, kubernetesResources.Resources) // check hasn't mutated arg
+}

func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
sut := makeSut(t)
withinLimits, errorMessage := sut.WithinLimits("cpu",
2 changes: 1 addition & 1 deletion internal/scheduler/nodedb/nodedb.go
@@ -305,7 +305,7 @@ func (nodeDb *NodeDb) NumNodes() int {
return int(nodeDb.numNodes)
}

-func (nodeDb *NodeDb) TotalResources() schedulerobjects.ResourceList {
+func (nodeDb *NodeDb) TotalKubernetesResources() schedulerobjects.ResourceList {
nodeDb.mu.Lock()
defer nodeDb.mu.Unlock()
return schedulerobjects.ResourceList{Resources: nodeDb.totalResources.ToMap()}
6 changes: 3 additions & 3 deletions internal/scheduler/nodedb/nodedb_test.go
@@ -32,7 +32,7 @@ func TestTotalResources(t *testing.T) {
require.NoError(t, err)

expected := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity)}
-assert.True(t, expected.Equal(nodeDb.TotalResources()))
+assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources()))

// Upserting nodes for the first time should increase the resource count.
nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities)
@@ -48,7 +48,7 @@
}
txn.Commit()

-assert.True(t, expected.Equal(nodeDb.TotalResources()))
+assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources()))

// Upserting new nodes should increase the resource count.
nodes = testfixtures.N8GpuNodes(3, testfixtures.TestPriorities)
@@ -64,7 +64,7 @@
}
txn.Commit()

-assert.True(t, expected.Equal(nodeDb.TotalResources()))
+assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources()))
}

func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling/gang_scheduler_test.go
@@ -605,7 +605,7 @@ func TestGangScheduler(t *testing.T) {
txn.Commit()
if tc.TotalResources.Resources == nil {
// Default to NodeDb total.
-tc.TotalResources = nodeDb.TotalResources()
+tc.TotalResources = nodeDb.TotalKubernetesResources()
}
priorityFactorByQueue := make(map[string]float64)
for _, jobs := range tc.Gangs {
12 changes: 6 additions & 6 deletions internal/scheduler/scheduling/preempting_queue_scheduler_test.go
@@ -1995,11 +1995,11 @@ func TestPreemptingQueueScheduler(t *testing.T) {

// If not provided, set total resources equal to the aggregate over tc.Nodes.
if tc.TotalResources.Resources == nil {
-tc.TotalResources = nodeDb.TotalResources()
+tc.TotalResources = nodeDb.TotalKubernetesResources()
}

fairnessCostProvider, err := fairness.NewDominantResourceFairness(
-nodeDb.TotalResources(),
+nodeDb.TotalKubernetesResources(),
tc.SchedulingConfig,
)
require.NoError(t, err)
@@ -2356,23 +2356,23 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
}

fairnessCostProvider, err := fairness.NewDominantResourceFairness(
-nodeDb.TotalResources(),
+nodeDb.TotalKubernetesResources(),
tc.SchedulingConfig,
)
require.NoError(b, err)
sctx := context.NewSchedulingContext(
testfixtures.TestPool,
fairnessCostProvider,
limiter,
-nodeDb.TotalResources(),
+nodeDb.TotalKubernetesResources(),
)
for queue, priorityFactor := range priorityFactorByQueue {
weight := 1 / priorityFactor
err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string]),
schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), limiterByQueue[queue])
require.NoError(b, err)
}
-constraints := schedulerconstraints.NewSchedulingConstraints(testfixtures.TestPool, nodeDb.TotalResources(), tc.SchedulingConfig, nil)
+constraints := schedulerconstraints.NewSchedulingConstraints(testfixtures.TestPool, nodeDb.TotalKubernetesResources(), tc.SchedulingConfig, nil)
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
@@ -2426,7 +2426,7 @@
"pool",
fairnessCostProvider,
limiter,
-nodeDb.TotalResources(),
+nodeDb.TotalKubernetesResources(),
)
for queue, priorityFactor := range priorityFactorByQueue {
weight := 1 / priorityFactor
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling/queue_scheduler_test.go
@@ -484,7 +484,7 @@ func TestQueueScheduler(t *testing.T) {
txn.Commit()
if tc.TotalResources.Resources == nil {
// Default to NodeDb total.
-tc.TotalResources = nodeDb.TotalResources()
+tc.TotalResources = nodeDb.TotalKubernetesResources()
}

queueNameToQueue := map[string]*api.Queue{}
15 changes: 12 additions & 3 deletions internal/scheduler/scheduling/scheduling_algo.go
@@ -125,7 +125,11 @@ func (l *FairSchedulingAlgo) Schedule(
continue
}

ctx.Infof("Scheduling on pool %s with capacity %s", pool, fsctx.nodeDb.TotalResources().CompactString())
ctx.Infof("Scheduling on pool %s with capacity %s %s",
pool,
fsctx.nodeDb.TotalKubernetesResources().CompactString(),
l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).CompactString(),
)

start := time.Now()
schedulerResult, sctx, err := l.SchedulePool(ctx, fsctx, pool.Name)
@@ -264,9 +268,12 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
return nil, err
}

+totalResources := nodeDb.TotalKubernetesResources()
+totalResources = l.floatingResourceTypes.AddTotalAvailableForPool(pool.Name, totalResources)

schedulingContext, err := l.constructSchedulingContext(
pool.Name,
-nodeDb.TotalResources(),
+totalResources,
jobSchedulingInfo.demandByQueue,
jobSchedulingInfo.allocatedByQueueAndPriorityClass,
jobSchedulingInfo.awayAllocatedByQueueAndPriorityClass,
@@ -516,7 +523,9 @@ func (l *FairSchedulingAlgo) SchedulePool(
fsctx *FairSchedulingAlgoContext,
pool string,
) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
-constraints := schedulerconstraints.NewSchedulingConstraints(pool, fsctx.nodeDb.TotalResources(), l.schedulingConfig, maps.Values(fsctx.queues))
+totalResources := fsctx.nodeDb.TotalKubernetesResources()
+totalResources = l.floatingResourceTypes.AddTotalAvailableForPool(pool, totalResources)
+constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues))

scheduler := NewPreemptingQueueScheduler(
fsctx.schedulingContext,
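Both newFairSchedulingAlgoContext and SchedulePool now follow the same pattern; below is a condensed sketch of it. The helper name, package, and import paths are assumptions; the method calls themselves are taken from the diff.

package example

import (
    "github.com/armadaproject/armada/internal/scheduler/floatingresources"
    "github.com/armadaproject/armada/internal/scheduler/nodedb"
    "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// poolCapacity sketches the per-pool capacity calculation: node-reported
// (Kubernetes) resources plus the pool's configured floating resources. The
// combined total is what gets passed to the scheduling constraints and to the
// fairness cost provider, so floating resources count towards both again.
func poolCapacity(nodeDb *nodedb.NodeDb, frt *floatingresources.FloatingResourceTypes, pool string) schedulerobjects.ResourceList {
    totalResources := nodeDb.TotalKubernetesResources()
    return frt.AddTotalAvailableForPool(pool, totalResources)
}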
18 changes: 11 additions & 7 deletions internal/scheduler/simulator/simulator.go
@@ -90,6 +90,8 @@ type Simulator struct {
hardTerminationMinutes int
// Determines how often we trigger schedule events
schedulerCyclePeriodSeconds int
+// Floating resource info
+floatingResourceTypes *floatingresources.FloatingResourceTypes
}

type StateTransition struct {
@@ -108,6 +110,11 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
return nil, errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config")
}

+floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(schedulingConfig.ExperimentalFloatingResources)
+if err != nil {
+return nil, err
+}

clusterSpec = proto.Clone(clusterSpec).(*ClusterSpec)
workloadSpec = proto.Clone(workloadSpec).(*WorkloadSpec)
initialiseClusterSpec(clusterSpec)
@@ -152,6 +159,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
enableFastForward: enableFastForward,
hardTerminationMinutes: hardTerminationMinutes,
schedulerCyclePeriodSeconds: schedulerCyclePeriodSeconds,
+floatingResourceTypes: floatingResourceTypes,
}
jobDb.SetClock(s)
s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst)
@@ -304,8 +312,9 @@ func (s *Simulator) setupClusters() error {
}
}
s.nodeDbByPoolAndExecutorGroup[pool.Name] = append(s.nodeDbByPoolAndExecutorGroup[pool.Name], nodeDb)
-totalResourcesForPool.Add(nodeDb.TotalResources())
+totalResourcesForPool.Add(nodeDb.TotalKubernetesResources())
}
+totalResourcesForPool = s.floatingResourceTypes.AddTotalAvailableForPool(pool.Name, totalResourcesForPool)
s.totalResourcesByPool[pool.Name] = totalResourcesForPool
}
return nil
@@ -510,15 +519,10 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
}
constraints := schedulerconstraints.NewSchedulingConstraints(pool.Name, totalResources, s.schedulingConfig, nil)

-floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(s.schedulingConfig.ExperimentalFloatingResources)
-if err != nil {
-return err
-}

sch := scheduling.NewPreemptingQueueScheduler(
sctx,
constraints,
-floatingResourceTypes,
+s.floatingResourceTypes,
s.schedulingConfig.ProtectedFractionOfFairShare,
txn,
nodeDb,
