Skip to content

Commit

Permalink
Add a reason for preemption (#3881)
Browse files Browse the repository at this point in the history
* [POC] Add a cause field to JobPreemptedEvent

This will now be populated with a cause for the job to be preempted

Allowing you to distinguish:
 - If this preemption was caused by an API request
 - If this preemption was caused by fair share preemption - and the preempting job id
 - If this preemption was caused by urgency-based preemption - and the preempting job id(s)
   - We cannot always narrow this down to a single job, so it will be a comma-separated list of job ids

Signed-off-by: JamesMurkin <[email protected]>

* Proto changes

Signed-off-by: JamesMurkin <[email protected]>

* Remove changes to loadtest.go

Signed-off-by: JamesMurkin <[email protected]>

* Fix proto

Signed-off-by: JamesMurkin <[email protected]>

* Fix proto

Signed-off-by: JamesMurkin <[email protected]>

* Fix proto

Signed-off-by: JamesMurkin <[email protected]>

* Generate proto

Signed-off-by: JamesMurkin <[email protected]>

* Merge

Signed-off-by: JamesMurkin <[email protected]>

* Fix common

Signed-off-by: JamesMurkin <[email protected]>

* Add tests

Signed-off-by: JamesMurkin <[email protected]>

* Add unit tests

Signed-off-by: JamesMurkin <[email protected]>

* Improve tests

Signed-off-by: JamesMurkin <[email protected]>

* Fix simulator

Signed-off-by: JamesMurkin <[email protected]>

* Undo unrelated change

Signed-off-by: JamesMurkin <[email protected]>

* Set PreemptingJobId

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Nov 7, 2024
1 parent 0f87b66 commit 4416515
Show file tree
Hide file tree
Showing 26 changed files with 1,178 additions and 598 deletions.
9 changes: 5 additions & 4 deletions cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ func preemptCmd() *cobra.Command {
func preemptJobCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "job <queue> <job-set> <job-id>",
Use: "job <queue> <job-set> <job-id> <preempt-reason>",
Short: "Preempt an armada job.",
Long: `Preempt a job by providing it's queue, jobset and jobId.`,
Args: cobra.ExactArgs(3),
Long: `Preempt a job by providing it's queue, jobset, jobId and a preemption reason.`,
Args: cobra.ExactArgs(4),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
queue := args[0]
jobSetId := args[1]
jobId := args[2]
return a.Preempt(queue, jobSetId, jobId)
reason := args[3]
return a.Preempt(queue, jobSetId, jobId, reason)
},
}
return cmd
Expand Down
3 changes: 2 additions & 1 deletion internal/armadactl/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// Preempt a job.
func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr error) {
func (a *App) Preempt(queue string, jobSetId string, jobId string, reason string) (outerErr error) {
apiConnectionDetails := a.Params.ApiConnectionDetails

fmt.Fprintf(a.Out, "Requesting preemption of job matching queue: %s, job set: %s, and job Id: %s\n", queue, jobSetId, jobId)
Expand All @@ -25,6 +25,7 @@ func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr err
JobIds: []string{jobId},
JobSetId: jobSetId,
Queue: queue,
Reason: reason,
})
if err != nil {
return errors.Wrapf(err, "error preempting job matching queue: %s, job set: %s, and job id: %s", queue, jobSetId, jobId)
Expand Down
2 changes: 2 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
DebugMsg = "sample debug message"
LeaseReturnedMsg = "lease returned error message"
UnschedulableMsg = "test pod is unschedulable"
PreemptionReason = "job preempted"
PartitionMarkerPartitionId = 456

ExecutorCordonReason = "bad executor"
Expand Down Expand Up @@ -367,6 +368,7 @@ var JobRunPreempted = &armadaevents.EventSequence_Event{
JobRunPreempted: &armadaevents.JobRunPreempted{
PreemptedJobId: JobId,
PreemptedRunId: RunId,
Reason: PreemptionReason,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/lookoutingesterv2/instructions/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (c *InstructionConverter) handleJobRunPreempted(ts time.Time, event *armada
RunId: event.PreemptedRunId,
JobRunState: pointer.Int32(lookout.JobRunPreemptedOrdinal),
Finished: &ts,
Error: tryCompressError(event.PreemptedJobId, "preempted", c.compressor),
Error: tryCompressError(event.PreemptedJobId, event.Reason, c.compressor),
}
update.JobRunsToUpdate = append(update.JobRunsToUpdate, &jobRun)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ var expectedPreemptedRun = model.UpdateJobRunInstruction{
RunId: testfixtures.RunId,
Finished: &testfixtures.BaseTime,
JobRunState: pointer.Int32(lookout.JobRunPreemptedOrdinal),
Error: []byte("preempted"),
Error: []byte(testfixtures.PreemptionReason),
}

var expectedCancelledRun = model.UpdateJobRunInstruction{
Expand Down
7 changes: 7 additions & 0 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *context.GangSche
// order to find the best fit for this gang); clear out any remnants of
// previous attempts.
jctx.UnschedulableReason = ""
jctx.PreemptingJobId = ""

node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx)
if err != nil {
Expand Down Expand Up @@ -420,6 +421,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *context.JobS
if node, err := nodeDb.selectNodeForPodWithItAtPriority(it, jctx, priority, true); err != nil {
return nil, err
} else {
jctx.PodSchedulingContext.SchedulingMethod = context.Rescheduled
return node, nil
}
}
Expand All @@ -440,6 +442,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *context.JobS
}
if node != nil {
pctx.WellKnownNodeTypeName = awayNodeType.WellKnownNodeTypeName
pctx.SchedulingMethod = context.ScheduledAsAwayJob
pctx.ScheduledAway = true
return node, nil
}
Expand Down Expand Up @@ -499,6 +502,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
} else if node != nil {
pctx.SchedulingMethod = context.ScheduledWithoutPreemption
return node, nil
}

Expand All @@ -522,6 +526,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
} else if node != nil {
pctx.SchedulingMethod = context.ScheduledWithFairSharePreemption
return node, nil
}

Expand All @@ -535,6 +540,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
} else if node != nil {
pctx.SchedulingMethod = context.ScheduledWithUrgencyBasedPreemption
return node, nil
}

Expand Down Expand Up @@ -760,6 +766,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *c
if priority > maxPriority {
maxPriority = priority
}
job.JobSchedulingContext.PreemptingJobId = jctx.JobId
}

selectedNode = nodeCopy
Expand Down
29 changes: 17 additions & 12 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (s *Scheduler) eventsFromSchedulerResult(result *scheduling.SchedulerResult
// EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult.
func EventsFromSchedulerResult(result *scheduling.SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) {
eventSequences := make([]*armadaevents.EventSequence, 0, len(result.PreemptedJobs)+len(result.ScheduledJobs))
eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, scheduling.PreemptedJobsFromSchedulerResult(result), time)
eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, result.PreemptedJobs, time)
if err != nil {
return nil, err
}
Expand All @@ -481,29 +481,30 @@ func EventsFromSchedulerResult(result *scheduling.SchedulerResult, time time.Tim
return eventSequences, nil
}

func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error) {
for _, job := range jobs {
run := job.LatestRun()
func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jctxs []*schedulercontext.JobSchedulingContext, time time.Time) ([]*armadaevents.EventSequence, error) {
for _, jctx := range jctxs {
run := jctx.Job.LatestRun()
if run == nil {
return nil, errors.Errorf("attempting to generate preempted eventSequences for job %s with no associated runs", job.Id())
return nil, errors.Errorf("attempting to generate preempted eventSequences for job %s with no associated runs", jctx.JobId)
}
eventSequences = append(eventSequences, &armadaevents.EventSequence{
Queue: job.Queue(),
JobSetName: job.Jobset(),
Events: createEventsForPreemptedJob(job.Id(), run.Id(), time),
Queue: jctx.Job.Queue(),
JobSetName: jctx.Job.Jobset(),
Events: createEventsForPreemptedJob(jctx.JobId, run.Id(), jctx.PreemptionDescription, time),
})
}
return eventSequences, nil
}

func createEventsForPreemptedJob(jobId string, runId string, time time.Time) []*armadaevents.EventSequence_Event {
func createEventsForPreemptedJob(jobId string, runId string, reason string, time time.Time) []*armadaevents.EventSequence_Event {
return []*armadaevents.EventSequence_Event{
{
Created: protoutil.ToTimestamp(time),
Event: &armadaevents.EventSequence_Event_JobRunPreempted{
JobRunPreempted: &armadaevents.JobRunPreempted{
PreemptedRunId: runId,
PreemptedJobId: jobId,
Reason: reason,
},
},
},
Expand All @@ -517,7 +518,9 @@ func createEventsForPreemptedJob(jobId string, runId string, time time.Time) []*
{
Terminal: true,
Reason: &armadaevents.Error_JobRunPreemptedError{
JobRunPreemptedError: &armadaevents.JobRunPreemptedError{},
JobRunPreemptedError: &armadaevents.JobRunPreemptedError{
Reason: reason,
},
},
},
},
Expand All @@ -533,7 +536,9 @@ func createEventsForPreemptedJob(jobId string, runId string, time time.Time) []*
{
Terminal: true,
Reason: &armadaevents.Error_JobRunPreemptedError{
JobRunPreemptedError: &armadaevents.JobRunPreemptedError{},
JobRunPreemptedError: &armadaevents.JobRunPreemptedError{
Reason: reason,
},
},
},
},
Expand Down Expand Up @@ -776,7 +781,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo
}
} else if lastRun.PreemptRequested() && job.PriorityClass().Preemptible {
job = job.WithQueued(false).WithFailed(true).WithUpdatedRun(lastRun.WithoutTerminal().WithFailed(true))
events = append(events, createEventsForPreemptedJob(job.Id(), lastRun.Id(), s.clock.Now())...)
events = append(events, createEventsForPreemptedJob(job.Id(), lastRun.Id(), "Preempted - preemption requested via API", s.clock.Now())...)
}
}

Expand Down
5 changes: 5 additions & 0 deletions internal/scheduler/scheduling/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type JobSchedulingContext struct {
// This is the node the pod is assigned to.
// This is only set for evicted jobs and is set alongside adding an additionalNodeSelector for the node
AssignedNodeId string
// Id of job that preempted this pod
PreemptingJobId string
// Description of the cause of preemption
PreemptionDescription string
}

func (jctx *JobSchedulingContext) IsHomeJob(currentPool string) bool {
Expand Down Expand Up @@ -101,6 +105,7 @@ func (jctx *JobSchedulingContext) Fail(unschedulableReason string) {
jctx.UnschedulableReason = unschedulableReason
if pctx := jctx.PodSchedulingContext; pctx != nil {
pctx.NodeId = ""
pctx.SchedulingMethod = None
}
}

Expand Down
13 changes: 13 additions & 0 deletions internal/scheduler/scheduling/context/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import (
"time"
)

// SchedulingType identifies the method by which a pod was scheduled.
// It is stored in PodSchedulingContext.SchedulingMethod.
type SchedulingType int

// Possible SchedulingType values. The order matters: values are assigned via
// iota, so None is the zero value of the type.
const (
// None means no scheduling method is recorded (the default).
None SchedulingType = iota
// Rescheduled - presumably the pod was placed back where it previously ran; confirm against nodedb.
Rescheduled
// ScheduledWithoutPreemption means a node was found without preempting other jobs.
ScheduledWithoutPreemption
// ScheduledWithFairSharePreemption means a node was found using fair share preemption.
ScheduledWithFairSharePreemption
// ScheduledWithUrgencyBasedPreemption means a node was found using urgency-based preemption.
ScheduledWithUrgencyBasedPreemption
// ScheduledAsAwayJob means the pod was scheduled as an away job.
ScheduledAsAwayJob
)

// PodSchedulingContext is returned by SelectAndBindNodeToPod and
// contains detailed information on the scheduling decision made for this pod.
type PodSchedulingContext struct {
Expand All @@ -27,6 +38,8 @@ type PodSchedulingContext struct {
NumExcludedNodesByReason map[string]int
// If this pod was scheduled as an away job
ScheduledAway bool
// The method of scheduling that was used to schedule this job
SchedulingMethod SchedulingType
}

func (pctx *PodSchedulingContext) IsSuccessful() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")

PopulatePreemptionDescriptions(preemptedJobs, scheduledJobs)
schedulercontext.PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.
Expand Down
60 changes: 60 additions & 0 deletions internal/scheduler/scheduling/preemption_description.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package scheduling

import (
"fmt"
"strings"

armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
"github.com/armadaproject/armada/internal/server/configuration"
)

// Human-readable preemption descriptions written to
// JobSchedulingContext.PreemptionDescription by PopulatePreemptionDescriptions.
// Names ending in "Template" are fmt format strings taking one %s argument;
// the others contain no format verbs and are used verbatim.
const (
// Used when no preempting job can be identified and the job is not part of a gang.
unknownPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly node resource changed causing this job to be unschedulable"
// Used when no preempting job can be identified and the job carries the gang id annotation.
unknownGangPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly another job in the gang was preempted or the node resource changed causing this job to be unschedulable"
// %s is the id of the job recorded as PreemptingJobId on the preempted job.
fairSharePreemptionTemplate = "Preempted by scheduler using fair share preemption - preempting job %s"
// %s is the id of the single job scheduled with urgency-based preemption onto the same node.
urgencyPreemptionTemplate = "Preempted by scheduler using urgency preemption - preempting job %s"
// %s is a comma separated list of candidate job ids when more than one job matches.
urgencyPreemptionMultiJobTemplate = "Preempted by scheduler using urgency preemption - preemption caused by one of the following jobs %s"
)

// PopulatePreemptionDescriptions sets PreemptionDescription on every job
// context in preemptedJobs, mutating the elements in place.
//
// A preempted job whose PreemptingJobId is set is described with the fair
// share template and that job id. Otherwise the description is derived from
// the entries of scheduledJobs that were scheduled with urgency-based
// preemption onto the preempted job's assigned node:
//   - no such job: a generic "failed to reschedule" cause, with gang-specific
//     wording if the preempted job carries the gang id annotation;
//   - exactly one: that job is named as the preempting job;
//   - several: all candidates are listed, comma separated, because the exact
//     preempting job cannot be determined from the information available here.
//
// Entries of scheduledJobs with a nil PodSchedulingContext are ignored.
func PopulatePreemptionDescriptions(preemptedJobs []*context.JobSchedulingContext, scheduledJobs []*context.JobSchedulingContext) {
	// Index the jobs scheduled via urgency-based preemption by the node they were placed on.
	jobsScheduledWithUrgencyBasedPreemptionByNode := map[string][]*context.JobSchedulingContext{}
	for _, schedJctx := range scheduledJobs {
		pctx := schedJctx.PodSchedulingContext
		if pctx == nil || pctx.SchedulingMethod != context.ScheduledWithUrgencyBasedPreemption {
			continue
		}
		// append works on the nil slice returned for a missing key, so the
		// entry does not need to be initialised first.
		jobsScheduledWithUrgencyBasedPreemptionByNode[pctx.NodeId] = append(jobsScheduledWithUrgencyBasedPreemptionByNode[pctx.NodeId], schedJctx)
	}

	for _, preemptedJctx := range preemptedJobs {
		// An explicitly recorded preempting job takes precedence over any
		// node-based inference.
		if preemptedJctx.PreemptingJobId != "" {
			preemptedJctx.PreemptionDescription = fmt.Sprintf(fairSharePreemptionTemplate, preemptedJctx.PreemptingJobId)
			continue
		}

		potentialPreemptingJobs := jobsScheduledWithUrgencyBasedPreemptionByNode[preemptedJctx.GetAssignedNodeId()]
		switch len(potentialPreemptingJobs) {
		case 0:
			// The cause strings contain no format verbs, so they are assigned
			// directly rather than passed through fmt.Sprintf.
			if _, isGang := preemptedJctx.Job.Annotations()[configuration.GangIdAnnotation]; isGang {
				preemptedJctx.PreemptionDescription = unknownGangPreemptionCause
			} else {
				preemptedJctx.PreemptionDescription = unknownPreemptionCause
			}
		case 1:
			preemptedJctx.PreemptionDescription = fmt.Sprintf(urgencyPreemptionTemplate, potentialPreemptingJobs[0].JobId)
		default:
			jobIds := armadaslices.Map(potentialPreemptingJobs, func(job *context.JobSchedulingContext) string {
				return job.JobId
			})
			preemptedJctx.PreemptionDescription = fmt.Sprintf(urgencyPreemptionMultiJobTemplate, strings.Join(jobIds, ","))
		}
	}
}
Loading

0 comments on commit 4416515

Please sign in to comment.