Skip to content

Commit

Permalink
feat(scheduler): exposing the deleted resource ttl as a CLI param (#5994)
Browse files Browse the repository at this point in the history

* exposing the deleted resource ttl as a CLI param

* switching to seconds and adding a little test for the cleanup functions

* merge v2

* Revert "merge v2"

This reverts commit 4cf8644.
  • Loading branch information
driev authored Nov 4, 2024
1 parent 2c79e77 commit a17e12d
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 81 deletions.
10 changes: 7 additions & 3 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ var (
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool //scheduler server
allowPlaintxt bool // scheduler server
autoscalingDisabled bool
kafkaConfigPath string
schedulerReadyTimeoutSeconds uint
deletedResourceTTLSeconds uint
)

const (
Expand Down Expand Up @@ -115,6 +116,9 @@ func init() {

// Timeout for scheduler to be ready
flag.UintVar(&schedulerReadyTimeoutSeconds, "scheduler-ready-timeout-seconds", 300, "Timeout for scheduler to be ready")

// This TTL is set in badger DB
flag.UintVar(&deletedResourceTTLSeconds, "deleted-resource-ttl-seconds", 86400, "TTL for deleted experiments and pipelines (in seconds)")
}

func getNamespace() string {
Expand Down Expand Up @@ -211,11 +215,11 @@ func main() {
// Do here after other services created so eventHub events will be handled on pipeline/experiment load
// If we start earlier events will be sent but not received by services that start listening "late" to eventHub
if dbPath != "" {
err := ps.InitialiseOrRestoreDB(dbPath)
err := ps.InitialiseOrRestoreDB(dbPath, deletedResourceTTLSeconds)
if err != nil {
log.WithError(err).Fatalf("Failed to initialise pipeline db at %s", dbPath)
}
err = es.InitialiseOrRestoreDB(dbPath)
err = es.InitialiseOrRestoreDB(dbPath, deletedResourceTTLSeconds)
if err != nil {
log.WithError(err).Fatalf("Failed to initialise experiment db at %s", dbPath)
}
Expand Down
44 changes: 29 additions & 15 deletions scheduler/pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func TestLoadModel(t *testing.T) {
},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"},
model: &pb.Model{
Meta: &pb.MetaData{Name: "model1"},
ModelSpec: &pb.ModelSpec{
Uri: "gs://model",
Requirements: []string{"sklearn"},
Expand Down Expand Up @@ -384,39 +385,54 @@ func TestUnloadModel(t *testing.T) {
{
name: "Simple",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}},
code: codes.OK,
modelState: store.ModelTerminated,
},
{
name: "Multiple",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn", "xgboost"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn", "xgboost"}},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn", "xgboost"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}},
code: codes.OK,
modelState: store.ModelTerminated,
},
{
name: "TwoReplicas",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}},
{ServerName: "server1", ReplicaIdx: 1, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
{
ServerName: "server1", ReplicaIdx: 1, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 2}},
code: codes.OK,
modelState: store.ModelTerminated,
},
{
name: "NotExist",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
},
model: nil,
code: codes.FailedPrecondition},
code: codes.FailedPrecondition,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -518,7 +534,6 @@ func TestLoadPipeline(t *testing.T) {
}
})
}

}

func TestUnloadPipeline(t *testing.T) {
Expand Down Expand Up @@ -562,7 +577,7 @@ func TestUnloadPipeline(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path)
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path, 10)
if test.loadReq != nil {
err := test.server.pipelineHandler.AddPipeline(test.loadReq.Pipeline)
g.Expect(err).To(BeNil())
Expand All @@ -575,7 +590,6 @@ func TestUnloadPipeline(t *testing.T) {
}
})
}

}

func TestPipelineStatus(t *testing.T) {
Expand Down
19 changes: 11 additions & 8 deletions scheduler/pkg/store/experiment/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ the Change License after the Change Date as each is defined in accordance with t
package experiment

import (
"time"

"github.com/dgraph-io/badger/v3"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
Expand All @@ -25,19 +27,21 @@ const (
)

type ExperimentDBManager struct {
db *badger.DB
logger logrus.FieldLogger
db *badger.DB
logger logrus.FieldLogger
deletedResourceTTL time.Duration
}

func newExperimentDbManager(path string, logger logrus.FieldLogger) (*ExperimentDBManager, error) {
func newExperimentDbManager(path string, logger logrus.FieldLogger, deletedResourceTTL uint) (*ExperimentDBManager, error) {
db, err := utils.Open(path, logger, "experimentDb")
if err != nil {
return nil, err
}

edb := &ExperimentDBManager{
db: db,
logger: logger,
db: db,
logger: logger,
deletedResourceTTL: time.Duration(deletedResourceTTL * uint(time.Second)),
}

version, err := edb.getVersion()
Expand Down Expand Up @@ -73,9 +77,8 @@ func (edb *ExperimentDBManager) save(experiment *Experiment) error {
return err
})
} else {
ttl := utils.DeletedResourceTTL
return edb.db.Update(func(txn *badger.Txn) error {
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(ttl)
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(edb.deletedResourceTTL)
err = txn.SetEntry(e)
return err
})
Expand Down Expand Up @@ -119,7 +122,7 @@ func (edb *ExperimentDBManager) restore(
}
experiment := CreateExperimentFromSnapshot(&snapshot)
if experiment.Deleted {
experiment.DeletedAt = utils.GetDeletedAt(item)
experiment.DeletedAt = utils.GetDeletedAt(item, edb.deletedResourceTTL)
err = stopExperimentCb(experiment)
} else {
// otherwise attempt to start the experiment
Expand Down
18 changes: 9 additions & 9 deletions scheduler/pkg/store/experiment/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestSaveWithTTL(t *testing.T) {

path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
err = db.save(experiment)
g.Expect(err).To(BeNil())
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestSaveAndRestore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
Expand All @@ -317,7 +317,7 @@ func TestSaveAndRestore(t *testing.T) {
g.Expect(err).To(BeNil())

es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
err = es.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())
for idx, p := range test.experiments {
if !test.errors[idx] {
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
g.Expect(test.experiment.Deleted).To(BeTrue(), "this is a test for deleted experiments")
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
if !test.withTTL {
err = saveWithOutTTL(&test.experiment, edb.db)
Expand All @@ -392,7 +392,7 @@ func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
g.Expect(err).To(BeNil())

es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
err = es.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())

if !test.withTTL {
Expand Down Expand Up @@ -538,7 +538,7 @@ func TestGetExperimentFromDB(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
Expand Down Expand Up @@ -678,7 +678,7 @@ func TestDeleteExperimentFromDB(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
Expand Down Expand Up @@ -844,7 +844,7 @@ func TestMigrateFromV1ToV2(t *testing.T) {
_ = db.Close()

// migrate
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())

g.Expect(err).To(BeNil())
Expand All @@ -856,7 +856,7 @@ func TestMigrateFromV1ToV2(t *testing.T) {

// check that we have no experiments in the db format
es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
err = es.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())
g.Expect(len(es.experiments)).To(Equal(0))
})
Expand Down
6 changes: 3 additions & 3 deletions scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ func (es *ExperimentStore) addExperimentInMap(experiment *Experiment) error {
}
}

func (es *ExperimentStore) InitialiseOrRestoreDB(path string) error {
func (es *ExperimentStore) InitialiseOrRestoreDB(path string, deletedResourceTTL uint) error {
logger := es.logger.WithField("func", "initialiseDB")
experimentDbPath := getExperimentDbFolder(path)
logger.Infof("Initialise DB at %s", experimentDbPath)
err := os.MkdirAll(experimentDbPath, os.ModePerm)
if err != nil {
return err
}
db, err := newExperimentDbManager(experimentDbPath, es.logger)
db, err := newExperimentDbManager(experimentDbPath, es.logger, deletedResourceTTL)
if err != nil {
return err
}
Expand Down Expand Up @@ -484,7 +484,7 @@ func (es *ExperimentStore) cleanupDeletedExperiments() {
es.logger.Warnf("could not update DB TTL for experiment: %s", experiment.Name)
}
}
} else if experiment.DeletedAt.Add(utils.DeletedResourceTTL).Before(time.Now()) {
} else if experiment.DeletedAt.Add(es.db.deletedResourceTTL).Before(time.Now()) {
delete(es.experiments, experiment.Name)
es.logger.Info("cleaning up deleted experiment: %s", experiment.Name)
}
Expand Down
12 changes: 8 additions & 4 deletions scheduler/pkg/store/experiment/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestStartExperiment(t *testing.T) {
g.Expect(err).To(BeNil())
server := NewExperimentServer(logger, eventHub, fakeModelStore{}, fakePipelineStore{})
// init db
_ = server.InitialiseOrRestoreDB(path)
_ = server.InitialiseOrRestoreDB(path, 10)
for _, ea := range test.experiments {
err := server.StartExperiment(ea.experiment)
if ea.fail {
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestStopExperiment(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())

// init db
err := test.store.InitialiseOrRestoreDB(path)
err := test.store.InitialiseOrRestoreDB(path, 1)
g.Expect(err).To(BeNil())
for _, p := range test.store.experiments {
err := test.store.db.save(p)
Expand All @@ -288,6 +288,10 @@ func TestStopExperiment(t *testing.T) {
// check db
experimentFromDB, _ := test.store.db.get(test.experimentName)
g.Expect(experimentFromDB.Deleted).To(BeTrue())

time.Sleep(1 * time.Second)
test.store.cleanupDeletedExperiments()
g.Expect(test.store.experiments[test.experimentName]).To(BeNil())
}
})
}
Expand Down Expand Up @@ -413,7 +417,7 @@ func TestRestoreExperiments(t *testing.T) {
experiments: make(map[string]*Experiment),
}
// init db
err := store.InitialiseOrRestoreDB(path)
err := store.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := store.db.save(p)
Expand All @@ -422,7 +426,7 @@ func TestRestoreExperiments(t *testing.T) {
_ = store.db.Stop()

// restore from db now that we have state on disk
_ = store.InitialiseOrRestoreDB(path)
_ = store.InitialiseOrRestoreDB(path, 10)

for _, p := range test.experiments {
experimentFromDB, _ := store.db.get(p.Name)
Expand Down
Loading

0 comments on commit a17e12d

Please sign in to comment.