From 23a0fb1a81d37b1433c954318968d09b8921a05c Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Fri, 6 Sep 2024 08:21:05 +0100 Subject: [PATCH] fix(envoy): upgrade models in experiment (#5874) * remove envoy route for experiment before adding/updating the new one * add test for new model version * update test to check for versions * remove unnecessarily check for routes * add iris2 as model example --- samples/models/iris2.yaml | 8 +++ scheduler/pkg/envoy/processor/incremental.go | 11 +++- .../pkg/envoy/processor/incremental_test.go | 53 +++++++++++++++---- 3 files changed, 60 insertions(+), 12 deletions(-) create mode 100644 samples/models/iris2.yaml diff --git a/samples/models/iris2.yaml b/samples/models/iris2.yaml new file mode 100644 index 0000000000..f88a8e6b5c --- /dev/null +++ b/samples/models/iris2.yaml @@ -0,0 +1,8 @@ +apiVersion: mlops.seldon.io/v1alpha1 +kind: Model +metadata: + name: iris2 +spec: + storageUri: "gs://seldon-models/scv2/samples/rolling/iris/v2" + requirements: + - sklearn diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index 74ca972c6f..ce90179824 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -435,6 +435,13 @@ func (p *IncrementalProcessor) addExperiment(exp *experiment.Experiment) error { p.mu.Lock() defer p.mu.Unlock() routeName := fmt.Sprintf("%s.experiment", exp.Name) + + // first clear any existing routes + if err := p.removeRouteForServerInEnvoyCache(routeName); err != nil { + logger.WithError(err).Errorf("Failed to remove traffic for experiment %s", routeName) + return err + } + if err := p.addTrafficForExperiment(routeName, exp); err != nil { logger.WithError(err).Errorf("Failed to add traffic for experiment %s", routeName) return err @@ -640,8 +647,8 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) { logger := p.logger.WithField("func", "callVersionCleanupIfNeeded") if routes, ok := p.xdsCache.Routes[modelName]; ok { - activeRoutes := len(routes.Clusters) - if activeRoutes == 1 && p.versionCleaner != nil { + logger.Debugf("routes for model %s %v", modelName, routes) + if p.versionCleaner != nil { logger.Debugf("Calling cleanup for model %s", modelName) p.versionCleaner.RunCleanup(modelName) } diff --git a/scheduler/pkg/envoy/processor/incremental_test.go b/scheduler/pkg/envoy/processor/incremental_test.go index 257392b252..630039d750 100644 --- a/scheduler/pkg/envoy/processor/incremental_test.go +++ b/scheduler/pkg/envoy/processor/incremental_test.go @@ -437,14 +437,15 @@ func createTestPipeline(pipelineName string, modelNames []string, version uint32 func TestEnvoySettings(t *testing.T) { g := NewGomegaWithT(t) type test struct { - name string - ops []func(proc *IncrementalProcessor, g *WithT) - numExpectedClusters int - numExpectedRoutes int - numExpectedPipelines int - experimentActive bool - experimentExists bool - experimentDeleted bool + name string + ops []func(proc *IncrementalProcessor, g *WithT) + numExpectedClusters int + numExpectedRoutes int + numExpectedPipelines int + experimentActive bool + experimentExists bool + experimentDeleted bool + expectedVersionsInRoutes map[string]uint32 } getStrPtr := func(t string) *string { return &t } @@ -504,6 +505,29 @@ func TestEnvoySettings(t *testing.T) { numExpectedRoutes: 3, experimentActive: true, experimentExists: true, + expectedVersionsInRoutes: map[string]uint32{ + "model1": 1, + "model2": 1, + }, + }, + { + name: "experiment - new model version", + ops: []func(inc *IncrementalProcessor, g *WithT){ + createTestServer("server", 2), + createTestModel("model1", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}), + createTestModel("model2", "server", 1, []int{1}, 1, []store.ModelReplicaState{store.Available}), + createTestExperiment("exp", []string{"model1", "model2"}, nil, nil), + // update model2 to version 2, will trigger change in routes / experiment + createTestModel("model2", "server", 1, []int{1}, 2, []store.ModelReplicaState{store.Available}), + }, + numExpectedClusters: 4, + numExpectedRoutes: 3, + experimentActive: true, + experimentExists: true, + expectedVersionsInRoutes: map[string]uint32{ + "model1": 1, + "model2": 2, + }, }, { name: "experiment with deleted model", @@ -514,7 +538,7 @@ func TestEnvoySettings(t *testing.T) { createTestExperiment("exp", []string{"model1", "model2"}, getStrPtr("model1"), nil), removeTestModel("model2", 1, "server", 1), }, - numExpectedClusters: 4, + numExpectedClusters: 2, // model2 should be removed from the clusters (server 1) numExpectedRoutes: 2, // model2 should be removed from the routes experimentActive: false, experimentExists: true, @@ -555,7 +579,7 @@ func TestEnvoySettings(t *testing.T) { createTestExperiment("exp", []string{"model1"}, getStrPtr("model1"), getStrPtr("model2")), removeTestModel("model2", 1, "server", 1), }, - numExpectedClusters: 4, + numExpectedClusters: 2, // model2 should be removed from the clusters (server 1) numExpectedRoutes: 2, // model2 should be removed from the routes experimentActive: false, experimentExists: true, @@ -653,6 +677,15 @@ func TestEnvoySettings(t *testing.T) { g.Expect(err).NotTo(BeNil()) g.Expect(exp).To(BeNil()) } + for modelName, version := range test.expectedVersionsInRoutes { + for _, route := range inc.xdsCache.Routes { + for _, cluster := range route.Clusters { + if cluster.ModelName == modelName { + g.Expect(cluster.ModelVersion).To(Equal(version)) + } + } + } + } }) }