Merge branch 'master' into armada-docs-DCO
itsaviral2609 authored Sep 8, 2023
2 parents 18f694c + 293c58c commit b0dc4ef
Showing 30 changed files with 588 additions and 877 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
@@ -13,7 +13,7 @@ indent_size = 2
indent_style = space
indent_size = 2

[{Makefile,go.mod,go.sum,*.go,.gitmodules}]
[{go.mod,go.sum,*.go,.gitmodules}]
indent_style = tab
indent_size = 4

2 changes: 0 additions & 2 deletions .github/workflows/not-python-client.yml
@@ -11,7 +11,6 @@ on:
- '.github/workflows/python-client.yml'
- 'docs/python_armada_client.md'
- 'scripts/build-python-client.sh'
- 'makefile'
- '.github/workflows/python-tests/*'

pull_request:
@@ -24,7 +23,6 @@ on:
- '.github/workflows/python-client.yml'
- 'docs/python_armada_client.md'
- 'scripts/build-python-client.sh'
- 'makefile'
- '.github/workflows/python-tests/*'

jobs:
2 changes: 1 addition & 1 deletion .mergify.yml
@@ -9,6 +9,6 @@ pull_request_rules:
- "#approved-reviews-by>=2"
- and:
- "#approved-reviews-by>=1"
- "author~=^(JamesMurkin|severinson|d80tb7|carlocamurri|dejanzele|Sharpz7|ClifHouck|robertdavidsmith|theAntiYeti|richscott|suprjinx|zuqq)"
- "author~=^(JamesMurkin|severinson|d80tb7|carlocamurri|dejanzele|Sharpz7|ClifHouck|robertdavidsmith|theAntiYeti|richscott|suprjinx|zuqq|msumner91|mustafai)"
title:
Two checks are required.
4 changes: 4 additions & 0 deletions config/armada/config.yaml
@@ -67,6 +67,10 @@ scheduling:
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumSchedulingRate: 100.0
maximumSchedulingBurst: 1000
maximumPerQueueSchedulingRate: 50.0
maximumPerQueueSchedulingBurst: 1000
maxJobSchedulingContextsPerExecutor: 10000
lease:
expireAfter: 15m
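These four new keys configure the token-bucket scheduling rate limit documented in internal/armada/configuration/types.go further down this diff. A minimal sketch of the arithmetic they imply, assuming the values above; the variable names are illustrative only:

package main

import "fmt"

func main() {
	// Values from config/armada/config.yaml above.
	ratePerSecond := 100.0 // maximumSchedulingRate: tokens added to the bucket per second
	burst := 1000.0        // maximumSchedulingBurst: bucket capacity

	// Starting from an empty bucket, the tokens available after t idle seconds
	// are min(burst, ratePerSecond*t); one token is consumed per scheduled job.
	for _, t := range []float64{1, 5, 10, 60} {
		tokens := ratePerSecond * t
		if tokens > burst {
			tokens = burst
		}
		fmt.Printf("after %2.0fs idle: at most %4.0f jobs can be scheduled\n", t, tokens)
	}
	// Steady-state throughput is therefore about 100 jobs/s, and no single
	// scheduling round can schedule more than 1000 jobs.
}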
5 changes: 4 additions & 1 deletion config/scheduler/config.yaml
@@ -97,7 +97,10 @@ scheduling:
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumJobsToSchedule: 5000
maximumSchedulingRate: 100.0
maximumSchedulingBurst: 1000
maximumPerQueueSchedulingRate: 50.0
maximumPerQueueSchedulingBurst: 1000
maxUnacknowledgedJobsPerExecutor: 2500
maxJobSchedulingContextsPerExecutor: 10000
defaultJobLimits:
2 changes: 1 addition & 1 deletion go.mod
@@ -93,6 +93,7 @@ require (
github.com/prometheus/common v0.37.0
github.com/sanity-io/litter v1.5.5
github.com/segmentio/fasthash v1.0.3
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9
)

@@ -195,7 +196,6 @@ require (
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/protobuf v1.30.0 // indirect
5 changes: 4 additions & 1 deletion internal/armada/configuration/constants.go
@@ -5,8 +5,11 @@ const (
// All jobs in a gang are guaranteed to be scheduled onto the same cluster at the same time.
GangIdAnnotation = "armadaproject.io/gangId"
// GangCardinalityAnnotation All jobs in a gang must specify the total number of jobs in the gang via this annotation.
// The cardinality should be expressed as an integer, e.g., "3".
// The cardinality should be expressed as a positive integer, e.g., "3".
GangCardinalityAnnotation = "armadaproject.io/gangCardinality"
// GangMinimumCardinalityAnnotation All jobs in a gang must specify the minimum size for the gang to be schedulable via this annotation.
// The cardinality should be expressed as a positive integer, e.g., "3".
GangMinimumCardinalityAnnotation = "armadaproject.io/gangMinimumCardinality"
// The jobs that make up a gang may be constrained to be scheduled across a set of uniform nodes.
// Specifically, if provided, all gang jobs are scheduled onto nodes for which the value of the provided label is equal.
// Used to ensure, e.g., that all gang jobs are scheduled onto the same cluster or rack.
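For illustration, a hedged sketch of the annotations a client might attach to each job of a three-job gang that is allowed to start once two of its jobs are schedulable; the gang id value is made up:

package main

import "fmt"

func main() {
	// Sketch only: the keys are the constants defined above; the values are illustrative.
	annotations := map[string]string{
		"armadaproject.io/gangId":                 "my-gang-7c9e", // GangIdAnnotation
		"armadaproject.io/gangCardinality":        "3",            // total jobs in the gang
		"armadaproject.io/gangMinimumCardinality": "2",            // minimum jobs for the gang to be schedulable
	}
	fmt.Println(annotations)
}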
31 changes: 27 additions & 4 deletions internal/armada/configuration/types.go
@@ -124,10 +124,33 @@ type SchedulingConfig struct {
MaximumResourceFractionToSchedule map[string]float64
// Overrides MaximalClusterFractionToSchedule if set for the current pool.
MaximumResourceFractionToScheduleByPool map[string]map[string]float64
// Max number of jobs to schedule in each invocation of the scheduler.
MaximumJobsToSchedule uint
// Max number of gangs to schedule in each invocation of the scheduler.
MaximumGangsToSchedule uint
// The rate at which Armada schedules jobs is rate-limited using a token bucket approach.
// Specifically, there is a token bucket that persists between scheduling rounds.
// The bucket fills up at a rate of MaximumSchedulingRate tokens per second and has capacity MaximumSchedulingBurst.
// A token is removed from the bucket when scheduling a job, and scheduling stops while the bucket is empty.
//
// Hence, MaximumSchedulingRate controls the maximum number of jobs scheduled per second in steady-state,
// i.e., once the burst capacity has been exhausted.
//
// Rate-limiting is based on the number of tokens available at the start of each scheduling round,
// i.e., tokens accumulated while scheduling become available at the start of the next scheduling round.
//
// For more information about the rate-limiter, see:
// https://pkg.go.dev/golang.org/x/time/rate#Limiter
MaximumSchedulingRate float64 `validate:"gt=0"`
// MaximumSchedulingBurst controls the burst capacity of the rate-limiter.
//
// There are two important implications:
// - Armada will never schedule more than MaximumSchedulingBurst jobs per scheduling round.
// - Gang jobs with cardinality greater than MaximumSchedulingBurst can never be scheduled.
MaximumSchedulingBurst int `validate:"gt=0"`
// In addition to the global rate-limiter, there is a separate rate-limiter for each queue.
// These work the same as the global rate-limiter, except they apply only to jobs scheduled from a specific queue.
//
// Per-queue version of MaximumSchedulingRate.
MaximumPerQueueSchedulingRate float64 `validate:"gt=0"`
// Per-queue version of MaximumSchedulingBurst.
MaximumPerQueueSchedulingBurst int `validate:"gt=0"`
// Armada stores contexts associated with recent job scheduling attempts.
// This setting limits the number of such contexts to store.
// Contexts associated with the most recent scheduling attempt for each queue and cluster are always stored.
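The comments above describe the golang.org/x/time/rate limiter promoted to a direct dependency in go.mod. A minimal, self-contained sketch of the described behaviour, not the exact call sites Armada uses; the numbers are illustrative:

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// The bucket fills at 100 tokens/s and holds at most 1000 tokens,
	// mirroring MaximumSchedulingRate / MaximumSchedulingBurst.
	limiter := rate.NewLimiter(rate.Limit(100.0), 1000)

	// The tokens available at the start of a scheduling round bound how many
	// jobs that round may schedule.
	available := int(limiter.Tokens())
	fmt.Printf("up to %d jobs may be scheduled this round\n", available)

	// One token is consumed per scheduled job; once Allow returns false,
	// scheduling stops until the bucket refills.
	scheduled := 0
	for scheduled < available && limiter.Allow() {
		scheduled++
	}
	fmt.Printf("scheduled %d jobs\n", scheduled)
}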
37 changes: 28 additions & 9 deletions internal/armada/server/lease.go
@@ -18,6 +18,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/utils/clock"
@@ -59,6 +60,10 @@ type AggregatedQueueServer struct {
schedulingInfoRepository repository.SchedulingInfoRepository
decompressorPool *pool.ObjectPool
clock clock.Clock
// Global job scheduling rate-limiter.
limiter *rate.Limiter
// Per-queue job scheduling rate-limiters.
limiterByQueue map[string]*rate.Limiter
// For storing reports of scheduling attempts.
SchedulingContextRepository *scheduler.SchedulingContextRepository
// Stores the most recent NodeDb for each executor.
@@ -92,18 +97,22 @@ func NewAggregatedQueueServer(
TimeBetweenEvictionRuns: 0,
NumTestsPerEvictionRun: 10,
}

decompressorPool := pool.NewObjectPool(context.Background(), pool.NewPooledObjectFactorySimple(
func(context.Context) (interface{}, error) {
return compress.NewZlibDecompressor(), nil
}), &poolConfig)
return &AggregatedQueueServer{
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventStore: eventStore,
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventStore: eventStore,
limiter: rate.NewLimiter(
rate.Limit(schedulingConfig.MaximumSchedulingRate),
schedulingConfig.MaximumSchedulingBurst,
),
limiterByQueue: make(map[string]*rate.Limiter),
schedulingInfoRepository: schedulingInfoRepository,
decompressorPool: decompressorPool,
executorRepository: executorRepository,
@@ -380,7 +389,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL

// Group gangs.
for _, job := range jobs {
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
gangId, _, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
@@ -491,6 +500,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
q.schedulingConfig.Preemption.PriorityClasses,
q.schedulingConfig.Preemption.DefaultPriorityClass,
fairnessCostProvider,
q.limiter,
totalResources,
)
for queue, priorityFactor := range priorityFactorByQueue {
@@ -502,7 +512,16 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
if priorityFactor > 0 {
weight = 1 / priorityFactor
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
queueLimiter, ok := q.limiterByQueue[queue]
if !ok {
// Create per-queue limiters lazily.
queueLimiter = rate.NewLimiter(
rate.Limit(q.schedulingConfig.MaximumPerQueueSchedulingRate),
q.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
q.limiterByQueue[queue] = queueLimiter
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue], queueLimiter); err != nil {
return nil, err
}
}
49 changes: 24 additions & 25 deletions internal/common/validation/job.go
@@ -12,7 +12,7 @@ import (
)

func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) error {
if err := validateGangs(jobs); err != nil {
if _, err := validateGangs(jobs); err != nil {
return err
}
for _, job := range jobs {
@@ -23,67 +23,66 @@ func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) err
return nil
}

func validateGangs(jobs []*api.Job) error {
gangDetailsByGangId := make(map[string]struct {
actualCardinality int
expectedCardinality int
expectedPriorityClassName string
expectedNodeUniformityLabel string
})
type gangDetails = struct {
expectedCardinality int
expectedMinimumCardinality int
expectedPriorityClassName string
expectedNodeUniformityLabel string
}

func validateGangs(jobs []*api.Job) (map[string]gangDetails, error) {
gangDetailsByGangId := make(map[string]gangDetails)
for i, job := range jobs {
annotations := job.Annotations
gangId, gangCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations)
gangId, gangCardinality, gangMinimumCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations)
nodeUniformityLabel := annotations[configuration.GangNodeUniformityLabelAnnotation]
if err != nil {
return errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId)
return nil, errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId)
}
if !isGangJob {
continue
}
if gangId == "" {
return errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id)
return nil, errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id)
}
podSpec := util.PodSpecFromJob(job)
if details, ok := gangDetailsByGangId[gangId]; ok {
if details.expectedCardinality != gangCardinality {
return errors.Errorf(
return nil, errors.Errorf(
"inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
i, job.Id, gangId, details.expectedCardinality, gangCardinality,
)
}
if details.expectedMinimumCardinality != gangMinimumCardinality {
return nil, errors.Errorf(
"inconsistent gang minimum cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
i, job.Id, gangId, details.expectedMinimumCardinality, gangMinimumCardinality,
)
}
if podSpec != nil && details.expectedPriorityClassName != podSpec.PriorityClassName {
return errors.Errorf(
return nil, errors.Errorf(
"inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s",
i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName,
)
}
if nodeUniformityLabel != details.expectedNodeUniformityLabel {
return errors.Errorf(
return nil, errors.Errorf(
"inconsistent nodeUniformityLabel for %d-th job with id %s in gang %s: expected %s but got %s",
i, job.Id, gangId, details.expectedNodeUniformityLabel, nodeUniformityLabel,
)
}
details.actualCardinality++
gangDetailsByGangId[gangId] = details
} else {
details.actualCardinality = 1
details.expectedCardinality = gangCardinality
details.expectedMinimumCardinality = gangMinimumCardinality
if podSpec != nil {
details.expectedPriorityClassName = podSpec.PriorityClassName
}
details.expectedNodeUniformityLabel = nodeUniformityLabel
gangDetailsByGangId[gangId] = details
}
}
for gangId, details := range gangDetailsByGangId {
if details.expectedCardinality != details.actualCardinality {
return errors.Errorf(
"unexpected number of jobs for gang %s: expected %d jobs but got %d",
gangId, details.expectedCardinality, details.actualCardinality,
)
}
}
return nil
return gangDetailsByGangId, nil
}

func ValidateApiJob(job *api.Job, config configuration.SchedulingConfig) error {
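For illustration, a hedged sketch of the consistency rule validateGangs enforces across the jobs of one gang; the gang id and values are made up, and construction of full api.Job objects is omitted:

package main

import "fmt"

func main() {
	// Jobs claiming membership of the same gang must agree on cardinality,
	// minimum cardinality, priority class and node-uniformity label.
	jobA := map[string]string{
		"armadaproject.io/gangId":                 "gang-1",
		"armadaproject.io/gangCardinality":        "3",
		"armadaproject.io/gangMinimumCardinality": "2",
	}
	jobB := map[string]string{
		"armadaproject.io/gangId":                 "gang-1",
		"armadaproject.io/gangCardinality":        "4", // disagrees with jobA
		"armadaproject.io/gangMinimumCardinality": "2",
	}
	if jobA["armadaproject.io/gangCardinality"] != jobB["armadaproject.io/gangCardinality"] {
		// validateGangs would return an "inconsistent gang cardinality" error here.
		fmt.Println("inconsistent gang cardinality for gang-1")
	}
}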