Skip to content

Commit

Permalink
Improvements to simulation following testing (#260) (#4023)
Browse files Browse the repository at this point in the history
* fix test

* lint

Co-authored-by: Christopher Martin <[email protected]>
  • Loading branch information
d80tb7 and svc-oeg-aws2github authored Oct 24, 2024
1 parent a8cdc22 commit 741f897
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 73 deletions.
2 changes: 1 addition & 1 deletion cmd/simulator/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func runSimulations(cmd *cobra.Command, args []string) error {
}

if pathExists(outputDirPath) && overwriteDirIfExists {
err := os.Remove(outputDirPath)
err := os.RemoveAll(outputDirPath)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
return err
}
}
if time.Now().Unix()-lastLogTime.Unix() >= 15 {
if time.Now().Unix()-lastLogTime.Unix() >= 5 {
ctx.Infof("Simulator time %s", s.time)
lastLogTime = s.time
}
Expand Down Expand Up @@ -518,6 +518,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
return err
}
}
sctx.UpdateFairShares()
constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, s.schedulingConfig, nil)
sch := scheduling.NewPreemptingQueueScheduler(
sctx,
Expand All @@ -544,7 +545,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
return err
}

err = s.sink.OnCycleEnd(result)
err = s.sink.OnCycleEnd(s.time, result)
if err != nil {
return err
}
Expand Down
64 changes: 0 additions & 64 deletions internal/scheduler/simulator/sink/fair_share_writer.go

This file was deleted.

101 changes: 101 additions & 0 deletions internal/scheduler/simulator/sink/queue_stats_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package sink

import (
"os"
"time"

parquetWriter "github.com/xitongsys/parquet-go/writer"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/scheduler/scheduling"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

// QueueStatsRow is one parquet record of per-queue scheduling statistics,
// emitted once per (pool, queue) pair at the end of each scheduling cycle.
type QueueStatsRow struct {
	// Ts is the simulated time of the cycle, as a Unix timestamp (seconds).
	Ts    int64  `parquet:"name=ts, type=INT64"`
	Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
	Pool  string `parquet:"name=pool, type=BYTE_ARRAY, convertedtype=UTF8"`
	// Shares are fractions in [0, 1] of the pool-wide allocation.
	FairShare         float64 `parquet:"name=fair_share, type=DOUBLE"`
	AdjustedFairShare float64 `parquet:"name=adjusted_fair_share, type=DOUBLE"`
	ActualShare       float64 `parquet:"name=actual_share, type=DOUBLE"`
	CpuShare          float64 `parquet:"name=cpu_share, type=DOUBLE"`
	MemoryShare       float64 `parquet:"name=memory_share, type=DOUBLE"`
	GpuShare          float64 `parquet:"name=gpu_share, type=DOUBLE"`
	// NOTE(review): these Go fields are `int` but tagged as parquet INT64/INT32;
	// parquet-go is strict about physical types — confirm it accepts `int` here,
	// otherwise these should be int64/int32 respectively.
	AllocatedCPU int `parquet:"name=allocated_cpu, type=INT64"`
	// AllocatedMemory is recorded in MB (converted from bytes by Update).
	AllocatedMemory int `parquet:"name=allocated_memory, type=INT64"`
	AllocatedGPU    int `parquet:"name=allocated_gpu, type=INT64"`
	NumScheduled    int `parquet:"name=num_scheduled, type=INT32"`
	NumPreempted    int `parquet:"name=num_preempted, type=INT32"`
	NumEvicted      int `parquet:"name=num_evicted, type=INT32"`
}

// QueueStatsWriter sinks per-queue scheduling statistics (QueueStatsRow
// records) to a parquet file, one file per simulation run.
type QueueStatsWriter struct {
	writer *parquetWriter.ParquetWriter
}

// NewQueueStatsWriter creates a QueueStatsWriter that writes queue-level
// statistics to queue_stats.parquet inside the given directory.
// The caller must Close the returned writer to flush the file.
func NewQueueStatsWriter(path string) (*QueueStatsWriter, error) {
	// Use filepath.Join rather than string concatenation so the path is
	// built correctly regardless of trailing separators or platform.
	fileWriter, err := os.Create(filepath.Join(path, "queue_stats.parquet"))
	if err != nil {
		return nil, err
	}
	// One writer goroutine is enough: the simulator emits rows sequentially.
	pw, err := parquetWriter.NewParquetWriterFromWriter(fileWriter, new(QueueStatsRow), 1)
	if err != nil {
		return nil, err
	}
	return &QueueStatsWriter{
		writer: pw,
	}, nil
}

// Update writes one QueueStatsRow per (pool, queue) pair present in the
// scheduler result, stamped with the supplied simulated time. Returns the
// first write error encountered, if any.
func (j *QueueStatsWriter) Update(ts time.Time, result *scheduling.SchedulerResult) error {
	// Note: the parameter was renamed from `time` to `ts` so it no longer
	// shadows the time package inside this function.

	// Tally preemptions per queue up front so each row lookup is O(1).
	preemptedJobsByQueue := map[string]int{}
	for _, job := range result.PreemptedJobs {
		preemptedJobsByQueue[job.Job.Queue()]++
	}

	for _, sctx := range result.SchedulingContexts {
		for _, qctx := range sctx.QueueSchedulingContexts {
			row := QueueStatsRow{
				Ts:                ts.Unix(),
				Queue:             qctx.Queue,
				Pool:              sctx.Pool,
				FairShare:         qctx.FairShare,
				AdjustedFairShare: qctx.AdjustedFairShare,
				ActualShare:       sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx),
				CpuShare:          calculateResourceShare(sctx, qctx, "cpu"),
				MemoryShare:       calculateResourceShare(sctx, qctx, "memory"),
				GpuShare:          calculateResourceShare(sctx, qctx, "nvidia.com/gpu"),
				AllocatedCPU:      allocatedResources(qctx, "cpu"),
				AllocatedMemory:   allocatedResources(qctx, "memory") / (1024 * 1024), // in MB
				AllocatedGPU:      allocatedResources(qctx, "nvidia.com/gpu"),
				NumScheduled:      len(qctx.SuccessfulJobSchedulingContexts),
				NumPreempted:      preemptedJobsByQueue[qctx.Queue],
				NumEvicted:        len(qctx.EvictedJobsById),
			}
			if err := j.writer.Write(row); err != nil {
				return err
			}
		}
	}
	return nil
}

// Close finalizes the parquet file, flushing any buffered rows.
// A failure to finalize is logged as a warning rather than returned,
// since there is nothing useful the caller can do at shutdown.
func (j *QueueStatsWriter) Close(ctx *armadacontext.Context) {
	if err := j.writer.WriteStop(); err != nil {
		ctx.Warnf("Could not cleanly close queue_stats parquet file: %s", err)
	}
}

// calculateResourceShare returns the fraction of the pool-wide allocation of
// the named resource held by the given queue. Returns 0 when nothing of the
// resource is allocated in the pool, avoiding a NaN (0/0) or Inf result
// being written into the stats file.
func calculateResourceShare(sctx *context.SchedulingContext, qctx *context.QueueSchedulingContext, resource string) float64 {
	total := sctx.Allocated.Resources[resource]
	totalFloat := total.AsApproximateFloat64()
	if totalFloat == 0 {
		return 0
	}
	allocated := qctx.Allocated.Resources[resource]
	return allocated.AsApproximateFloat64() / totalFloat
}

// allocatedResources returns the queue's allocation of the named resource,
// truncated to an integer quantity (base units, e.g. bytes for memory).
func allocatedResources(qctx *context.QueueSchedulingContext, resource string) int {
	// Copy into a local first: the map value is not addressable, and
	// AsApproximateFloat64 has a pointer receiver.
	quantity := qctx.Allocated.Resources[resource]
	return int(quantity.AsApproximateFloat64())
}
14 changes: 8 additions & 6 deletions internal/scheduler/simulator/sink/sink.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
package sink

import (
"time"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/scheduler/scheduling"
"github.com/armadaproject/armada/internal/scheduler/simulator/model"
)

type Sink interface {
OnNewStateTransitions(transitions []*model.StateTransition) error
OnCycleEnd(result *scheduling.SchedulerResult) error
OnCycleEnd(time time.Time, result *scheduling.SchedulerResult) error
Close(ctx *armadacontext.Context)
}

type ParquetSink struct {
jobWriter *JobWriter
fairShareWriter *FairShareWriter
fairShareWriter *QueueStatsWriter
}

func NewParquetSink(outputDir string) (*ParquetSink, error) {
jobWriter, err := NewJobWriter(outputDir)
if err != nil {
return nil, err
}
fairShareWriter, err := NewFairShareWriter(outputDir)
fairShareWriter, err := NewQueueStatsWriter(outputDir)
if err != nil {
return nil, err
}
Expand All @@ -42,8 +44,8 @@ func (s *ParquetSink) OnNewStateTransitions(transitions []*model.StateTransition
return nil
}

func (s *ParquetSink) OnCycleEnd(result *scheduling.SchedulerResult) error {
err := s.fairShareWriter.Update(result)
func (s *ParquetSink) OnCycleEnd(time time.Time, result *scheduling.SchedulerResult) error {
err := s.fairShareWriter.Update(time, result)
if err != nil {
return err
}
Expand All @@ -61,7 +63,7 @@ func (s NullSink) OnNewStateTransitions(_ []*model.StateTransition) error {
return nil
}

func (s NullSink) OnCycleEnd(_ *scheduling.SchedulerResult) error {
func (s NullSink) OnCycleEnd(_ time.Time, _ *scheduling.SchedulerResult) error {
return nil
}

Expand Down

0 comments on commit 741f897

Please sign in to comment.