Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
severinson committed Sep 8, 2023
1 parent 8d7d05c commit e6af79d
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spf13/viper"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/yaml"

Expand Down Expand Up @@ -69,6 +70,11 @@ type Simulator struct {
eventLog EventLog
// Simulated events are emitted on this channel in order.
c chan *armadaevents.EventSequence

// Global job scheduling rate-limiter.
limiter *rate.Limiter
// Per-queue job scheduling rate-limiters.
limiterByQueue map[string]*rate.Limiter
}

func NewSimulator(testCase *TestCase, schedulingConfig configuration.SchedulingConfig) (*Simulator, error) {
Expand Down Expand Up @@ -143,6 +149,11 @@ func NewSimulator(testCase *TestCase, schedulingConfig configuration.SchedulingC
allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]),
totalResourcesByPool: totalResourcesByPool,
c: make(chan *armadaevents.EventSequence),
limiter: rate.NewLimiter(
rate.Limit(schedulingConfig.MaximumSchedulingRate),
schedulingConfig.MaximumSchedulingBurst,
),
limiterByQueue: make(map[string]*rate.Limiter),
}

// Mark all jobTemplates as active.
Expand Down Expand Up @@ -415,15 +426,24 @@ func (s *Simulator) handleScheduleEvent() error {
s.schedulingConfig.Preemption.PriorityClasses,
s.schedulingConfig.Preemption.DefaultPriorityClass,
fairnessCostProvider,
nil,
s.limiter,
totalResources,
)
sctx.Started = s.time
for _, queue := range s.testCase.Queues {
limiter, ok := s.limiterByQueue[queue.Name]
if !ok {
limiter = rate.NewLimiter(
rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate),
s.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
s.limiterByQueue[queue.Name] = limiter
}
err := sctx.AddQueueSchedulingContext(
queue.Name,
queue.Weight,
s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name],
nil,
limiter,
)
if err != nil {
return err
Expand Down

0 comments on commit e6af79d

Please sign in to comment.