Skip to content

Commit

Permalink
Simulator Improvements (#4005)
Browse files Browse the repository at this point in the history
* improvements

Signed-off-by: Chris Martin <[email protected]>

* schedule by pool

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* fix

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* remove test

Signed-off-by: Chris Martin <[email protected]>

* remove test

Signed-off-by: Chris Martin <[email protected]>

* fix tests

Signed-off-by: Chris Martin <[email protected]>

* remove ability to run multiple simulations in process

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* Fix simulator tests

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* fixed test clusters

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* go mod tidy

Signed-off-by: Chris Martin <[email protected]>

* update proto

Signed-off-by: Chris Martin <[email protected]>

* profiling behind cmd line flag

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
  • Loading branch information
d80tb7 authored Oct 15, 2024
1 parent 0c4bfc3 commit 167811d
Show file tree
Hide file tree
Showing 21 changed files with 1,119 additions and 1,692 deletions.
204 changes: 84 additions & 120 deletions cmd/simulator/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package cmd

import (
"math"
"fmt"
"os"
"runtime/pprof"
"time"

"github.com/pkg/errors"
"github.com/armadaproject/armada/internal/scheduler/simulator/sink"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/exp/maps"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/simulator"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)
Expand All @@ -21,42 +22,42 @@ func RootCmd() *cobra.Command {
Short: "Simulate running jobs on Armada.",
RunE: runSimulations,
}
// cmd.Flags().BoolP("verbose", "v", false, "Log detailed output to console.")
cmd.Flags().String("clusters", "", "Glob pattern specifying cluster configurations to simulate.")
cmd.Flags().String("workloads", "", "Glob pattern specifying workloads to simulate.")
cmd.Flags().String("configs", "", "Glob pattern specifying scheduler configurations to simulate. Uses a default config if not provided.")
cmd.Flags().String("clusters", "", "Path specifying cluster configurations to simulate.")
cmd.Flags().String("workloads", "", "Path specifying workloads to simulate.")
cmd.Flags().String("config", "", "Path to scheduler configurations to simulate. Uses a default config if not provided.")
cmd.Flags().Bool("showSchedulerLogs", false, "Show scheduler logs.")
cmd.Flags().Int("logInterval", 0, "Log summary statistics every this many events. Disabled if 0.")
cmd.Flags().String("eventsOutputFilePath", "", "Path of file to write events to.")
cmd.Flags().String("outputDir", "", "Path to directory where output files will be written. Defaults to timestamped directory.")
cmd.Flags().Bool("overwriteOutputDir", false, "Overwrite output director if it already exists. If false then an error will be thrown if the directory already exists")
cmd.Flags().Bool("enableFastForward", false, "Skips schedule events when we're in a steady state")
cmd.Flags().Int("hardTerminationMinutes", math.MaxInt, "Limit the time simulated")
cmd.Flags().Int("hardTerminationMinutes", -1, "Limit the time simulated. -1 for no limit.")
cmd.Flags().Int("schedulerCyclePeriodSeconds", 10, "How often we should trigger schedule events")
cmd.Flags().Bool("profile", false, "If true then the simulator will be profiled and a profiling file written to the output directory")
return cmd
}

func runSimulations(cmd *cobra.Command, args []string) error {
// Get command-line arguments.
clusterPattern, err := cmd.Flags().GetString("clusters")
clusterFile, err := cmd.Flags().GetString("clusters")
if err != nil {
return err
}
workloadPattern, err := cmd.Flags().GetString("workloads")
workloadFile, err := cmd.Flags().GetString("workloads")
if err != nil {
return err
}
configPattern, err := cmd.Flags().GetString("configs")
configFile, err := cmd.Flags().GetString("config")
if err != nil {
return err
}
showSchedulerLogs, err := cmd.Flags().GetBool("showSchedulerLogs")
if err != nil {
return err
}
logInterval, err := cmd.Flags().GetInt("logInterval")
outputDirPath, err := cmd.Flags().GetString("outputDir")
if err != nil {
return err
}
filePath, err := cmd.Flags().GetString("eventsOutputFilePath")
overwriteDirIfExists, err := cmd.Flags().GetBool("overwriteOutputDir")
if err != nil {
return err
}
Expand All @@ -73,140 +74,103 @@ func runSimulations(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
shouldProfile, err := cmd.Flags().GetBool("profile")
if err != nil {
return err
}

if outputDirPath == "" {
outputDirPath = fmt.Sprintf("armada_simulator_%s", time.Now().Format("2006_01_02_15_04_05"))
}

if pathExists(outputDirPath) && overwriteDirIfExists {
err := os.Remove(outputDirPath)
if err != nil {
return err
}
} else if pathExists(outputDirPath) {
return fmt.Errorf("output directory %s already exists and overwriteOutputDir not set", outputDirPath)
}

err = os.MkdirAll(outputDirPath, 0o777)
if err != nil {
return err
}

// Load test specs. and config.
clusterSpecs, err := simulator.ClusterSpecsFromPattern(clusterPattern)
clusterSpec, err := simulator.ClusterSpecFromFilePath(clusterFile)
if err != nil {
return err
}
workloadSpecs, err := simulator.WorkloadsFromPattern(workloadPattern)
workloadSpec, err := simulator.WorkloadSpecFromFilePath(workloadFile)
if err != nil {
return err
}
var schedulingConfigsByFilePath map[string]configuration.SchedulingConfig
if configPattern == "" {
// Use default test config if no pattern is provided.
schedulingConfigsByFilePath = map[string]configuration.SchedulingConfig{
"default": testfixtures.TestSchedulingConfig(),
}
} else {
schedulingConfigsByFilePath, err = simulator.SchedulingConfigsByFilePathFromPattern(configPattern)

schedulingConfig := testfixtures.TestSchedulingConfig()
if configFile != "" {
schedulingConfig, err = simulator.SchedulingConfigFromFilePath(configFile)
if err != nil {
return err
}
}
if len(clusterSpecs)*len(workloadSpecs)*len(schedulingConfigsByFilePath) > 1 && filePath != "" {
return errors.Errorf("cannot save multiple simulations to file")
}

ctx := armadacontext.Background()
outputSink, err := sink.NewParquetSink(outputDirPath)
if err != nil {
return err
}
defer outputSink.Close(ctx)

ctx.Info("Armada simulator")
ctx.Infof("ClusterSpecs: %v", slices.Map(clusterSpecs, func(clusperSpec *simulator.ClusterSpec) string { return clusperSpec.Name }))
ctx.Infof("WorkloadSpecs: %v", slices.Map(workloadSpecs, func(workloadSpec *simulator.WorkloadSpec) string { return workloadSpec.Name }))
ctx.Infof("SchedulingConfigs: %v", maps.Keys(schedulingConfigsByFilePath))
ctx.Infof("ClusterSpec: %v", clusterSpec.Name)
ctx.Infof("WorkloadSpecs: %v", workloadSpec.Name)
ctx.Infof("SchedulingConfig: %v", configFile)
ctx.Infof("OutputDir: %v", outputDirPath)

var fileWriter *simulator.Writer
file, err := os.Create(filePath)
s, err := simulator.NewSimulator(
clusterSpec, workloadSpec, schedulingConfig, enableFastForward, hardTerminationMinutes, schedulerCyclePeriodSeconds, outputSink)
if err != nil {
return err
}
defer func() {
if err = file.Close(); err != nil {
ctx.Errorf("failed to close file: %s", err)
return

if shouldProfile {
profilingFile := outputDirPath + "/profile"
log.Infof("Will write profiling information to %s", profilingFile)
f, err := os.Create(profilingFile)
if err != nil {
log.Fatal(err)
}
}()

// Setup a simulator for each combination of (clusterSpec, workloadSpec, schedulingConfig).
simulators := make([]*simulator.Simulator, 0)
metricsCollectors := make([]*simulator.MetricsCollector, 0)
stateTransitionChannels := make([]<-chan simulator.StateTransition, 0)
schedulingConfigPaths := make([]string, 0)
for _, clusterSpec := range clusterSpecs {
for _, workloadSpec := range workloadSpecs {
for schedulingConfigPath, schedulingConfig := range schedulingConfigsByFilePath {
if s, err := simulator.NewSimulator(clusterSpec, workloadSpec, schedulingConfig, enableFastForward, hardTerminationMinutes, schedulerCyclePeriodSeconds); err != nil {
return err
} else {
if !showSchedulerLogs {
s.SuppressSchedulerLogs = true
} else {
ctx.Info("Showing scheduler logs")
}
simulators = append(simulators, s)
mc := simulator.NewMetricsCollector(s.StateTransitions())
mc.LogSummaryInterval = logInterval
metricsCollectors = append(metricsCollectors, mc)

if filePath != "" {
fw, err := simulator.NewWriter(file, s.StateTransitions())
if err != nil {
return errors.WithStack(err)
}
fileWriter = fw
}
stateTransitionChannels = append(stateTransitionChannels, s.StateTransitions())
schedulingConfigPaths = append(schedulingConfigPaths, schedulingConfigPath)
}
}
err = pprof.StartCPUProfile(f)
if err != nil {
log.Fatal(err)
}
defer pprof.StopCPUProfile()
}

if !showSchedulerLogs {
s.SuppressSchedulerLogs = true
} else {
ctx.Info("Showing scheduler logs")
}

// Run simulators.
g, ctx := armadacontext.ErrGroup(ctx)
for _, s := range simulators {
s := s
g.Go(func() error {
return s.Run(ctx)
})
}

// Log events to stdout.
for _, c := range stateTransitionChannels {
c := c
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case stateTransition, ok := <-c:
if !ok {
return nil
}
ctx.Debug(*stateTransition.EventSequence.Events[0].Created, simulator.EventSequenceSummary(stateTransition.EventSequence))
}
}
})
}

// Run file writer
g.Go(func() error {
return fileWriter.Run(ctx)
return s.Run(ctx)
})

// Run metric collectors.
for _, mc := range metricsCollectors {
mc := mc
g.Go(func() error {
return mc.Run(ctx)
})
}

// Wait for simulations to complete.
if err := g.Wait(); err != nil {
return err
}

// Log overall statistics.
for i, mc := range metricsCollectors {
s := simulators[i]
schedulingConfigPath := schedulingConfigPaths[i]
ctx.Infof("Simulation result")
ctx.Infof("ClusterSpec: %s", s.ClusterSpec.Name)
ctx.Infof("WorkloadSpec: %s", s.WorkloadSpec.Name)
ctx.Infof("SchedulingConfig: %s", schedulingConfigPath)
ctx.Info(mc.String())
}

return nil
}

// pathExists reports whether os.Stat succeeds for path.
//
// Any Stat failure is reported as false: this covers the "does not exist"
// case as well as every other error Stat may return, so a false result
// does not by itself prove that nothing lives at path.
func pathExists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}
	return true
}
Loading

0 comments on commit 167811d

Please sign in to comment.