Skip to content

Commit

Permalink
do not reset server if we have model replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush committed Aug 11, 2023
1 parent b3de961 commit a4dff31
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 10 deletions.
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type mockStore struct {

var _ store.ModelStore = (*mockStore)(nil)

func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
}

func (m *mockStore) UpdateModel(config *pbs.LoadModelRequest) error {
Expand Down
12 changes: 10 additions & 2 deletions scheduler/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,24 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail)
s.sortServers(latestModel, filteredServers)
ok := false
logger.Debugf("Model %s candidate servers %v", modelName, filteredServers)
resetServerInCaseOfError := true
logger.Debugf("Model %s with desired replicas %d candidate servers %v", modelName, latestModel.DesiredReplicas(), filteredServers)
// For each server filter and sort replicas and attempt schedule if enough replicas
for _, candidateServer := range filteredServers {
logger.Debugf("Candidate server %s", candidateServer.Name)
var candidateReplicas *sorters.CandidateServer

// we need a lock here, we could have many goroutines at sorting
// without the store being reflected and hence sorting on stale values
s.muSortAndUpdate.Lock()
candidateReplicas, debugTrail = s.filterReplicas(latestModel, candidateServer, debugTrail)
if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() {
if len(candidateReplicas.ChosenReplicas) > 0 {
// This server has some candidate replicas, but fewer than desired. This typically happens
// when the model is scaled up beyond the number of server replicas available.
// Do not reset the server in that case, otherwise we would lose the replicas we already have.
resetServerInCaseOfError = false
}
s.muSortAndUpdate.Unlock()
continue
}
Expand All @@ -175,7 +183,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
}
if !ok {
failureErrMsg := fmt.Sprintf("failed to schedule model %s. %v", modelName, debugTrail)
s.store.FailedScheduling(latestModel, failureErrMsg)
s.store.FailedScheduling(latestModel, failureErrMsg, resetServerInCaseOfError)
return fmt.Errorf(failureErrMsg)
}
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type mockStore struct {

var _ store.ModelStore = (*mockStore)(nil)

func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
}

func (f mockStore) UnloadVersionModels(modelKey string, version uint32) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/experiment/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (f fakeModelStore) DrainServerReplica(serverName string, replicaIdx int) ([
panic("implement me")
}

func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
panic("implement me")
}

Expand Down
8 changes: 5 additions & 3 deletions scheduler/pkg/store/memory_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,18 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio
}
}

func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string) {
func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) {
modelVersion.state = ModelStatus{
State: ScheduleFailed,
Reason: reason,
Timestamp: time.Now(),
AvailableReplicas: modelVersion.state.AvailableReplicas,
UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - modelVersion.state.AvailableReplicas,
}
// make sure we reset server
modelVersion.server = ""
// reset the server only when the caller asks for it; callers pass reset=false when the model still has replicas that must not be lost
if reset {
modelVersion.server = ""
}
m.eventHub.PublishModelEvent(
modelFailureEventSource,
coordinator.ModelEventMsg{
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/pipeline/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (f fakeModelStore) RemoveServerReplica(serverName string, replicaIdx int) (
panic("implement me")
}

func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) {
func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ type ModelStore interface {
ServerNotify(request *pb.ServerNotifyRequest) error
RemoveServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models
DrainServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models
FailedScheduling(modelVersion *ModelVersion, reason string)
FailedScheduling(modelVersion *ModelVersion, reason string, reset bool)
GetAllModels() []string
}

0 comments on commit a4dff31

Please sign in to comment.