Skip to content

Commit

Permalink
Scheduler: refactor - more schedulerobjects to internaltypes (#3998)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert Smith <[email protected]>
  • Loading branch information
robertdavidsmith authored Oct 11, 2024
1 parent a6f6af7 commit fae186a
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 142 deletions.
13 changes: 13 additions & 0 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ func (rl ResourceList) AllZero() bool {
return true
}

func (rl ResourceList) FloorAtZero() ResourceList {
if rl.IsEmpty() {
return rl
}
result := make([]int64, len(rl.resources))
for i, r := range rl.resources {
if r > 0 {
result[i] = r
}
}
return ResourceList{factory: rl.factory, resources: result}
}

func (rl ResourceList) HasNegativeValues() bool {
if rl.IsEmpty() {
return false
Expand Down
10 changes: 10 additions & 0 deletions internal/scheduler/internaltypes/resource_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ func TestAllZero_HandlesEmptyCorrectly(t *testing.T) {
assert.True(t, empty.AllZero())
}

func TestFloorAtZero(t *testing.T) {
factory := testFactory()

assert.Equal(t, testResourceList(factory, "0", "1Ki"), testResourceList(factory, "-1", "1Ki").FloorAtZero())
}

func TestFloorAtZero_HandlesEmptyCorrectly(t *testing.T) {
assert.Equal(t, ResourceList{}, ResourceList{}.FloorAtZero())
}

func TestHasNegativeValues(t *testing.T) {
factory := testFactory()
assert.False(t, testResourceList(factory, "0", "0").HasNegativeValues())
Expand Down
13 changes: 11 additions & 2 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Job struct {
queuedVersion int32
// Scheduling requirements of this job.
jobSchedulingInfo *schedulerobjects.JobSchedulingInfo
// All resource requirements, including floating resources, for this job
allResourceRequirements internaltypes.ResourceList
// Kubernetes (i.e. non-floating) resource requirements of this job
kubernetesResourceRequirements internaltypes.ResourceList
// Priority class of this job. Populated automatically on job creation.
Expand Down Expand Up @@ -508,6 +510,11 @@ func (job *Job) ResourceRequirements() v1.ResourceRequirements {
return v1.ResourceRequirements{}
}

// All resource requirements, including floating resources, for this job
func (job *Job) AllResourceRequirements() internaltypes.ResourceList {
return job.allResourceRequirements
}

// Kubernetes (i.e. non-floating) resource requirements of this job
func (job *Job) KubernetesResourceRequirements() internaltypes.ResourceList {
return job.kubernetesResourceRequirements
Expand Down Expand Up @@ -779,7 +786,7 @@ func (job *Job) Validated() bool {

// Does this job request any floating resources?
func (job *Job) RequestsFloatingResources() bool {
return !job.jobDb.getResourceRequirements(job.jobSchedulingInfo).OfType(internaltypes.Floating).AllZero()
return !job.AllResourceRequirements().OfType(internaltypes.Floating).AllZero()
}

// WithJobSchedulingInfo returns a copy of the job with the job scheduling info updated.
Expand All @@ -790,7 +797,9 @@ func (job *Job) WithJobSchedulingInfo(jobSchedulingInfo *schedulerobjects.JobSch

// Changing the scheduling info invalidates the scheduling key stored with the job.
j.schedulingKey = SchedulingKeyFromJob(j.jobDb.schedulingKeyGenerator, j)
j.kubernetesResourceRequirements = job.jobDb.getKubernetesResourceRequirements(jobSchedulingInfo)

j.allResourceRequirements = j.jobDb.getResourceRequirements(jobSchedulingInfo)
j.kubernetesResourceRequirements = j.allResourceRequirements.OfType(internaltypes.Kubernetes)

return j, nil
}
Expand Down
56 changes: 56 additions & 0 deletions internal/scheduler/jobdb/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package jobdb
import (
"testing"

v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand All @@ -16,6 +19,12 @@ var jobSchedulingInfo = &schedulerobjects.JobSchedulingInfo{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": k8sResource.MustParse("1"),
"storage-connections": k8sResource.MustParse("1"),
},
},
Annotations: map[string]string{
"foo": "bar",
},
Expand Down Expand Up @@ -339,6 +348,12 @@ func TestJob_TestWithJobSchedulingInfo(t *testing.T) {
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": k8sResource.MustParse("2"),
"storage-connections": k8sResource.MustParse("2"),
},
},
Annotations: map[string]string{
"fish": "chips",
},
Expand All @@ -350,6 +365,37 @@ func TestJob_TestWithJobSchedulingInfo(t *testing.T) {
newJob := JobWithJobSchedulingInfo(baseJob, newSchedInfo)
assert.Equal(t, jobSchedulingInfo, baseJob.JobSchedulingInfo())
assert.Equal(t, newSchedInfo, newJob.JobSchedulingInfo())

assert.Equal(t, int64(1000), baseJob.AllResourceRequirements().GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(1), baseJob.AllResourceRequirements().GetByNameZeroIfMissing("storage-connections"))
assert.Equal(t, int64(1000), baseJob.KubernetesResourceRequirements().GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(0), baseJob.KubernetesResourceRequirements().GetByNameZeroIfMissing("storage-connections"))

assert.Equal(t, int64(2000), newJob.AllResourceRequirements().GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(2), newJob.AllResourceRequirements().GetByNameZeroIfMissing("storage-connections"))
assert.Equal(t, int64(2000), newJob.KubernetesResourceRequirements().GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(0), newJob.KubernetesResourceRequirements().GetByNameZeroIfMissing("storage-connections"))
}

func TestRequestsFloatingResources(t *testing.T) {
noFloatingResourcesJob := JobWithJobSchedulingInfo(baseJob, &schedulerobjects.JobSchedulingInfo{
ObjectRequirements: []*schedulerobjects.ObjectRequirements{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": k8sResource.MustParse("1"),
"storage-connections": k8sResource.MustParse("0"),
},
},
},
},
},
},
})
assert.True(t, baseJob.RequestsFloatingResources())
assert.False(t, noFloatingResourcesJob.RequestsFloatingResources())
}

func TestJobSchedulingInfoFieldsInitialised(t *testing.T) {
Expand Down Expand Up @@ -396,3 +442,13 @@ func TestJob_TestResolvedPools(t *testing.T) {
// Job with an active run
assert.Equal(t, []string{"testPool2"}, jobWithJobRunPool.ResolvedPools())
}

func TestJob_TestAllResourceRequirements(t *testing.T) {
assert.Equal(t, int64(1000), baseJob.AllResourceRequirements().GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(1), baseJob.AllResourceRequirements().GetByNameZeroIfMissing("storage-connections"))
}

func TestJob_TestKubernetesResourceRequirements(t *testing.T) {
assert.Equal(t, int64(1000), baseJob.KubernetesResourceRequirements().GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(0), baseJob.KubernetesResourceRequirements().GetByNameZeroIfMissing("storage-connections"))
}
9 changes: 4 additions & 5 deletions internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (jobDb *JobDb) NewJob(
priorityClass = jobDb.defaultPriorityClass
}

rr := jobDb.getResourceRequirements(schedulingInfo)

job := &Job{
jobDb: jobDb,
id: jobId,
Expand All @@ -160,7 +162,8 @@ func (jobDb *JobDb) NewJob(
requestedPriority: priority,
submittedTime: created,
jobSchedulingInfo: jobDb.internJobSchedulingInfoStrings(schedulingInfo),
kubernetesResourceRequirements: jobDb.getKubernetesResourceRequirements(schedulingInfo),
allResourceRequirements: rr,
kubernetesResourceRequirements: rr.OfType(internaltypes.Kubernetes),
priorityClass: priorityClass,
cancelRequested: cancelRequested,
cancelByJobSetRequested: cancelByJobSetRequested,
Expand All @@ -174,10 +177,6 @@ func (jobDb *JobDb) NewJob(
return job, nil
}

func (jobDb *JobDb) getKubernetesResourceRequirements(schedulingInfo *schedulerobjects.JobSchedulingInfo) internaltypes.ResourceList {
return jobDb.getResourceRequirements(schedulingInfo).OfType(internaltypes.Kubernetes)
}

func (jobDb *JobDb) getResourceRequirements(schedulingInfo *schedulerobjects.JobSchedulingInfo) internaltypes.ResourceList {
return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(safeGetRequirements(schedulingInfo))
}
Expand Down
8 changes: 7 additions & 1 deletion internal/scheduler/jobdb/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var testResourceListFactory = makeTestResourceListFactory()
func makeTestResourceListFactory() *internaltypes.ResourceListFactory {
result, _ := internaltypes.NewResourceListFactory(
getTestSupportedResourceTypes(),
nil,
getTestFloatingResourceTypes(),
)
return result
}
Expand All @@ -27,6 +27,12 @@ func getTestSupportedResourceTypes() []schedulerconfiguration.ResourceType {
}
}

func getTestFloatingResourceTypes() []schedulerconfiguration.FloatingResourceConfig {
return []schedulerconfiguration.FloatingResourceConfig{
{Name: "storage-connections", Resolution: resource.MustParse("1")},
}
}

func WithJobDbJobPodRequirements(job *Job, reqs *schedulerobjects.PodRequirements) *Job {
return JobWithJobSchedulingInfo(job, &schedulerobjects.JobSchedulingInfo{
PriorityClassName: job.JobSchedulingInfo().PriorityClassName,
Expand Down
13 changes: 0 additions & 13 deletions internal/scheduler/schedulerobjects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package schedulerobjects

import (
"fmt"
"math"

"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -45,15 +44,3 @@ func (node *Node) AvailableArmadaResource() ResourceList {
}
return tr
}

func (node *Node) MarkResourceUnallocatable(unallocatable ResourceList) {
currentAllocatable := node.UnallocatableResources[math.MaxInt32]
(&currentAllocatable).Add(unallocatable)
node.UnallocatableResources[math.MaxInt32] = currentAllocatable

for priority, allocatable := range node.AllocatableByPriorityAndResource {
allocatable.Sub(unallocatable)
allocatable.LimitToZero()
node.AllocatableByPriorityAndResource[priority] = allocatable
}
}
88 changes: 0 additions & 88 deletions internal/scheduler/schedulerobjects/node_test.go

This file was deleted.

Loading

0 comments on commit fae186a

Please sign in to comment.