Skip to content

Commit

Permalink
fix(envoy): upgrade models in experiment (#5874)
Browse files Browse the repository at this point in the history
* remove envoy route for experiment before adding/updating the new one

* add test for new model version

* update test to check for versions

* remove unnecessary check for routes

* add iris2 as model example
  • Loading branch information
sakoush authored Sep 6, 2024
1 parent 5383368 commit 23a0fb1
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 12 deletions.
8 changes: 8 additions & 0 deletions samples/models/iris2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Sample Model resource named "iris2" (API group mlops.seldon.io/v1alpha1).
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: iris2
spec:
  # Location of the model artifact; the path points at the "v2" iris sample.
  storageUri: "gs://seldon-models/scv2/samples/rolling/iris/v2"
  # NOTE(review): presumably used to match the model to a server that
  # advertises the sklearn capability — confirm against the Model CRD docs.
  requirements:
  - sklearn
11 changes: 9 additions & 2 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,13 @@ func (p *IncrementalProcessor) addExperiment(exp *experiment.Experiment) error {
p.mu.Lock()
defer p.mu.Unlock()
routeName := fmt.Sprintf("%s.experiment", exp.Name)

// first clear any existing routes
if err := p.removeRouteForServerInEnvoyCache(routeName); err != nil {
logger.WithError(err).Errorf("Failed to remove traffic for experiment %s", routeName)
return err
}

if err := p.addTrafficForExperiment(routeName, exp); err != nil {
logger.WithError(err).Errorf("Failed to add traffic for experiment %s", routeName)
return err
Expand Down Expand Up @@ -640,8 +647,8 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) {
logger := p.logger.WithField("func", "callVersionCleanupIfNeeded")
if routes, ok := p.xdsCache.Routes[modelName]; ok {
activeRoutes := len(routes.Clusters)
if activeRoutes == 1 && p.versionCleaner != nil {
logger.Debugf("routes for model %s %v", modelName, routes)
if p.versionCleaner != nil {
logger.Debugf("Calling cleanup for model %s", modelName)
p.versionCleaner.RunCleanup(modelName)
}
Expand Down
53 changes: 43 additions & 10 deletions scheduler/pkg/envoy/processor/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,15 @@ func createTestPipeline(pipelineName string, modelNames []string, version uint32
func TestEnvoySettings(t *testing.T) {
g := NewGomegaWithT(t)
type test struct {
name string
ops []func(proc *IncrementalProcessor, g *WithT)
numExpectedClusters int
numExpectedRoutes int
numExpectedPipelines int
experimentActive bool
experimentExists bool
experimentDeleted bool
name string
ops []func(proc *IncrementalProcessor, g *WithT)
numExpectedClusters int
numExpectedRoutes int
numExpectedPipelines int
experimentActive bool
experimentExists bool
experimentDeleted bool
expectedVersionsInRoutes map[string]uint32
}

getStrPtr := func(t string) *string { return &t }
Expand Down Expand Up @@ -504,6 +505,29 @@ func TestEnvoySettings(t *testing.T) {
numExpectedRoutes: 3,
experimentActive: true,
experimentExists: true,
expectedVersionsInRoutes: map[string]uint32{
"model1": 1,
"model2": 1,
},
},
{
name: "experiment - new model version",
ops: []func(inc *IncrementalProcessor, g *WithT){
createTestServer("server", 2),
createTestModel("model1", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}),
createTestModel("model2", "server", 1, []int{1}, 1, []store.ModelReplicaState{store.Available}),
createTestExperiment("exp", []string{"model1", "model2"}, nil, nil),
// update model2 to version 2, will trigger change in routes / experiment
createTestModel("model2", "server", 1, []int{1}, 2, []store.ModelReplicaState{store.Available}),
},
numExpectedClusters: 4,
numExpectedRoutes: 3,
experimentActive: true,
experimentExists: true,
expectedVersionsInRoutes: map[string]uint32{
"model1": 1,
"model2": 2,
},
},
{
name: "experiment with deleted model",
Expand All @@ -514,7 +538,7 @@ func TestEnvoySettings(t *testing.T) {
createTestExperiment("exp", []string{"model1", "model2"}, getStrPtr("model1"), nil),
removeTestModel("model2", 1, "server", 1),
},
numExpectedClusters: 4,
numExpectedClusters: 2, // model2 should be removed from the clusters (server 1)
numExpectedRoutes: 2, // model2 should be removed from the routes
experimentActive: false,
experimentExists: true,
Expand Down Expand Up @@ -555,7 +579,7 @@ func TestEnvoySettings(t *testing.T) {
createTestExperiment("exp", []string{"model1"}, getStrPtr("model1"), getStrPtr("model2")),
removeTestModel("model2", 1, "server", 1),
},
numExpectedClusters: 4,
numExpectedClusters: 2, // model2 should be removed from the clusters (server 1)
numExpectedRoutes: 2, // model2 should be removed from the routes
experimentActive: false,
experimentExists: true,
Expand Down Expand Up @@ -653,6 +677,15 @@ func TestEnvoySettings(t *testing.T) {
g.Expect(err).NotTo(BeNil())
g.Expect(exp).To(BeNil())
}
for modelName, version := range test.expectedVersionsInRoutes {
for _, route := range inc.xdsCache.Routes {
for _, cluster := range route.Clusters {
if cluster.ModelName == modelName {
g.Expect(cluster.ModelVersion).To(Equal(version))
}
}
}
}

})
}
Expand Down

0 comments on commit 23a0fb1

Please sign in to comment.