Add reset for gauge metrics
theAntiYeti committed Aug 22, 2023
1 parent 3718548 commit d1f3746
Showing 3 changed files with 23 additions and 21 deletions.
internal/scheduler/scheduler.go (37 changes: 17 additions, 20 deletions)
@@ -167,16 +167,20 @@ func (s *Scheduler) Run(ctx context.Context) error {
 
         shouldSchedule := s.clock.Now().Sub(s.previousSchedulingRoundEnd) > s.schedulePeriod
 
-        if err := s.cycle(ctx, fullUpdate, leaderToken, shouldSchedule); err != nil {
+        result, err := s.cycle(ctx, fullUpdate, leaderToken, shouldSchedule)
+        if err != nil {
            logging.WithStacktrace(log, err).Error("scheduling cycle failure")
            leaderToken = InvalidLeaderToken()
        }
 
        cycleTime := s.clock.Since(start)
 
+       s.metrics.ResetGaugeMetrics()
+
        if shouldSchedule && leaderToken.leader {
            // Only the leader token does real scheduling rounds.
            s.metrics.ReportScheduleCycleTime(cycleTime)
+           s.metrics.ReportSchedulerResult(result)
            log.Infof("scheduling cycle completed in %s", cycleTime)
        } else {
            s.metrics.ReportReconcileCycleTime(cycleTime)
@@ -194,18 +198,20 @@ func (s *Scheduler) Run(ctx context.Context) error {
 // cycle is a single iteration of the main scheduling loop.
 // If updateAll is true, we generate events from all jobs in the jobDb.
 // Otherwise, we only generate events from jobs updated since the last cycle.
-func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) error {
+func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (*SchedulerResult, error) {
+   var overallSchedulerResult *SchedulerResult
+
    log := ctxlogrus.Extract(ctx)
    log = log.WithField("function", "cycle")
    // Update job state.
    updatedJobs, err := s.syncState(ctx)
    if err != nil {
-       return err
+       return nil, err
    }
 
    // Only the leader may make decisions; exit if not leader.
    if !s.leaderController.ValidateToken(leaderToken) {
-       return nil
+       return nil, nil
    }
 
    // If we've been asked to generate messages for all jobs, do so.
@@ -219,35 +225,26 @@ func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (*SchedulerResult, error) {
    // Generate any events that came out of synchronising the db state.
    events, err := s.generateUpdateMessages(ctx, updatedJobs, txn)
    if err != nil {
-       return err
+       return nil, err
    }
 
    // Expire any jobs running on clusters that haven't heartbeated within the configured deadline.
    expirationEvents, err := s.expireJobsIfNecessary(ctx, txn)
    if err != nil {
-       return err
+       return nil, err
    }
    events = append(events, expirationEvents...)
 
    // Schedule jobs.
    if shouldSchedule {
-       overallSchedulerResult, err := s.schedulingAlgo.Schedule(ctx, txn, s.jobDb)
+       overallSchedulerResult, err = s.schedulingAlgo.Schedule(ctx, txn, s.jobDb)
        if err != nil {
-           return err
-       }
-
-       // This check feels redundant. It feels like we shouldn't have got here without
-       // a leader token.
-       if leaderToken.leader {
-           // Report various metrics computed from the scheduling cycle.
-           // TODO: preemptible jobs, possibly other metrics
-           // TODO: Return this information and deal with metrics after the cycle?
-           s.metrics.ReportSchedulerResult(overallSchedulerResult)
+           return nil, err
        }
 
        resultEvents, err := s.eventsFromSchedulerResult(txn, overallSchedulerResult)
        if err != nil {
-           return err
+           return nil, err
        }
        events = append(events, resultEvents...)
        s.previousSchedulingRoundEnd = s.clock.Now()
@@ -259,11 +256,11 @@ func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (*SchedulerResult, error) {
    }
    start := s.clock.Now()
    if err := s.publisher.PublishMessages(ctx, events, isLeader); err != nil {
-       return err
+       return nil, err
    }
    log.Infof("published %d events to pulsar in %s", len(events), s.clock.Since(start))
    txn.Commit()
-   return nil
+   return overallSchedulerResult, nil
 }
 
 // syncState updates jobs in jobDb to match state in postgres and returns all updated jobs.
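Taken together, the scheduler.go changes move metric reporting out of cycle and up into Run: cycle now hands back a *SchedulerResult, and Run reports it behind the single shouldSchedule && leaderToken.leader check, right after resetting the gauges. That is also why the block the old code itself flagged as redundant ("This check feels redundant...") could be deleted. A minimal sketch of the pattern, with SchedulerResult, Metrics, and the cycle body stubbed out (these simplified types are illustrative stand-ins, not Armada's definitions):

package main

import (
	"fmt"
	"time"
)

// SchedulerResult stands in for the real scheduling outcome; fields are illustrative.
type SchedulerResult struct {
	ScheduledJobs int
}

// Metrics stands in for SchedulerMetrics.
type Metrics struct{}

func (m *Metrics) ResetGaugeMetrics()                       { fmt.Println("gauges reset") }
func (m *Metrics) ReportScheduleCycleTime(d time.Duration)  { fmt.Printf("cycle took %s\n", d) }
func (m *Metrics) ReportSchedulerResult(r *SchedulerResult) { fmt.Printf("scheduled %d jobs\n", r.ScheduledJobs) }

// cycle returns its result instead of reporting metrics itself; on error it
// returns (nil, err), and on the non-leader path it returns (nil, nil).
func cycle(isLeader bool) (*SchedulerResult, error) {
	if !isLeader {
		return nil, nil // not the leader: no result, no error
	}
	return &SchedulerResult{ScheduledJobs: 3}, nil
}

func runOnce(m *Metrics, isLeader bool) {
	start := time.Now()
	result, err := cycle(isLeader)
	if err != nil {
		fmt.Println("scheduling cycle failure:", err)
		isLeader = false // mirrors leaderToken = InvalidLeaderToken()
	}
	cycleTime := time.Since(start)

	// Reset gauges every iteration so stale per-queue series disappear.
	m.ResetGaugeMetrics()

	// Reporting now happens at the call site, guarded by one leader check.
	if isLeader && result != nil {
		m.ReportScheduleCycleTime(cycleTime)
		m.ReportSchedulerResult(result)
	}
}

func main() {
	m := &Metrics{}
	runOnce(m, true)  // leader: reports cycle time and result
	runOnce(m, false) // follower: only resets gauges
}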
internal/scheduler/scheduler_metrics.go (5 changes: 5 additions, 0 deletions)
@@ -144,6 +144,11 @@ func NewSchedulerMetrics(config configuration.SchedulerMetricsConfig) *SchedulerMetrics {
    }
 }
 
+func (metrics *SchedulerMetrics) ResetGaugeMetrics() {
+   metrics.fairSharePerQueue.Reset()
+   metrics.actualSharePerQueue.Reset()
+}
+
 func (metrics *SchedulerMetrics) ReportScheduleCycleTime(cycleTime time.Duration) {
    metrics.scheduleCycleTime.Observe(float64(cycleTime.Milliseconds()))
 }
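ResetGaugeMetrics clears every labelled series in the two share gauges, so a queue that disappears between cycles stops exporting its last value. Assuming fairSharePerQueue and actualSharePerQueue are *prometheus.GaugeVec fields (their definitions aren't in this diff), Reset() is the stock client_golang call that does this. A self-contained sketch, with an illustrative metric name and label:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A per-queue gauge like fairSharePerQueue; name and label are illustrative.
	fairShare := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "fair_share_per_queue", Help: "Fair share of each queue."},
		[]string{"queue"},
	)
	reg := prometheus.NewRegistry()
	reg.MustRegister(fairShare)

	// Cycle 1: two queues exist.
	fairShare.WithLabelValues("queue-a").Set(0.6)
	fairShare.WithLabelValues("queue-b").Set(0.4)
	mfs, _ := reg.Gather()
	fmt.Println(len(mfs[0].GetMetric()), "series") // 2 series

	// Cycle 2: queue-b is gone. Without Reset, its series would keep
	// exporting 0.4 forever. Reset deletes every labelled series.
	fairShare.Reset()
	fairShare.WithLabelValues("queue-a").Set(1.0)
	mfs, _ = reg.Gather()
	fmt.Println(len(mfs[0].GetMetric()), "series") // 1 series
}

Calling Reset before repopulating in the same cycle (as Run now does) means an exporter scrape only ever sees the current set of queues.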
internal/scheduler/scheduler_test.go (2 changes: 1 addition, 1 deletion)
@@ -528,7 +528,7 @@ func TestScheduler_TestCycle(t *testing.T) {
 
        // run a scheduler cycle
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-       err = sched.cycle(ctx, false, sched.leaderController.GetToken(), true)
+       _, err = sched.cycle(ctx, false, sched.leaderController.GetToken(), true)
        if tc.fetchError || tc.publishError || tc.scheduleError {
            assert.Error(t, err)
        } else {
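The test change only threads the new return value through and discards it with a blank identifier. Since cycle now returns a (*SchedulerResult, error) pair, tests could also assert on the result directly. A hypothetical, self-contained sketch of that pattern using stub types rather than the real Scheduler:

package scheduler

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/assert"
)

// schedResult and cycleStub are stand-ins for SchedulerResult and
// Scheduler.cycle, just to exercise the new two-value signature.
type schedResult struct{ scheduledJobs int }

func cycleStub(fail bool) (*schedResult, error) {
	if fail {
		return nil, errors.New("scheduling failed")
	}
	return &schedResult{scheduledJobs: 1}, nil
}

func TestCycleSignature(t *testing.T) {
	// Callers that only care about the error discard the result, as the
	// updated TestScheduler_TestCycle does with `_, err = sched.cycle(...)`.
	_, err := cycleStub(false)
	assert.NoError(t, err)

	// Callers that need the result can now assert on it directly.
	result, err := cycleStub(false)
	assert.NoError(t, err)
	assert.Equal(t, 1, result.scheduledJobs)

	// On failure the pair is (nil, err).
	result, err = cycleStub(true)
	assert.Error(t, err)
	assert.Nil(t, result)
}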
