From 7636a144c2c64a22d13cf9abc2f8f7cb3af7eeed Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Mon, 2 Sep 2024 15:47:06 +0100 Subject: [PATCH] fix(model-gateway): Create consumer if model can serve traffic (#5865) * Add helper to get if model exists in kafka consumer * Use exists helper in serve function * simplify use of helper * add helper for Exists in Manager * Adjust creating consumer logic * changing log message to debug * Add basic testing * Linting fix * Tidy up note * Add note about min replicas * Add logging warning --- scheduler/pkg/kafka/gateway/client.go | 16 +++++-- scheduler/pkg/kafka/gateway/infer.go | 12 +++-- scheduler/pkg/kafka/gateway/manager.go | 12 +++++ scheduler/pkg/kafka/gateway/manager_test.go | 53 +++++++++++++++++++++ 4 files changed, 86 insertions(+), 7 deletions(-) diff --git a/scheduler/pkg/kafka/gateway/client.go b/scheduler/pkg/kafka/gateway/client.go index a1602e2300..114c5ff38e 100644 --- a/scheduler/pkg/kafka/gateway/client.go +++ b/scheduler/pkg/kafka/gateway/client.go @@ -153,14 +153,24 @@ func (kc *KafkaSchedulerClient) SubscribeModelEvents() error { logger.Infof("Received event name %s version %d state %s", event.ModelName, latestVersionStatus.Version, latestVersionStatus.State.State.String()) - switch latestVersionStatus.State.State { - case scheduler.ModelStatus_ModelAvailable: + // if there are available replicas then we add the consumer for the model + // note that this will also get triggered if the model is already added but there is a status change (e.g. due to scale up) + // and in the case then it is a no-op + // note in the future we might want to check that available replicas > min replicas + if latestVersionStatus.GetState().GetAvailableReplicas() > 0 { + if latestVersionStatus.GetState().GetState() != scheduler.ModelStatus_ModelAvailable { + logger.Warnf("Model %s state is: %s", event.ModelName, latestVersionStatus.GetState().GetState().String()) + } + if kc.consumerManager.Exists(event.ModelName) { + logger.Debugf("Model consumer %s already exists", event.ModelName) + continue + } logger.Infof("Adding model %s", event.ModelName) err := kc.consumerManager.AddModel(event.ModelName) if err != nil { kc.logger.WithError(err).Errorf("Failed to add model %s", event.ModelName) } - default: + } else { logger.Infof("Removing model %s", event.ModelName) err := kc.consumerManager.RemoveModel(event.ModelName) if err != nil { diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go index 6555a03261..c181c0d608 100644 --- a/scheduler/pkg/kafka/gateway/infer.go +++ b/scheduler/pkg/kafka/gateway/infer.go @@ -310,6 +310,13 @@ func (kc *InferKafkaHandler) RemoveModel(modelName string) error { return nil } +func (kc *InferKafkaHandler) Exists(modelName string) bool { + kc.mu.RLock() + defer kc.mu.RUnlock() + _, ok := kc.loadedModels[modelName] + return ok +} + func (kc *InferKafkaHandler) Serve() { logger := kc.logger.WithField("func", "Serve").WithField("consumerName", kc.consumerName) run := true @@ -349,13 +356,10 @@ func (kc *InferKafkaHandler) Serve() { continue } - kc.mu.Lock() - if _, ok := kc.loadedModels[modelName]; !ok { - kc.mu.Unlock() + if !kc.Exists(modelName) { logger.Infof("Failed to find model %s in loaded models", modelName) continue } - kc.mu.Unlock() // Add tracing span ctx := context.Background() diff --git a/scheduler/pkg/kafka/gateway/manager.go b/scheduler/pkg/kafka/gateway/manager.go index 05a6d90e43..635e31355b 100644 --- a/scheduler/pkg/kafka/gateway/manager.go +++ b/scheduler/pkg/kafka/gateway/manager.go @@ -197,6 +197,18 @@ func (cm *ConsumerManager) GetNumModels() int { return tot } +func (cm *ConsumerManager) Exists(modelName string) bool { + cm.mu.Lock() + defer cm.mu.Unlock() + + ic, err := cm.getInferKafkaConsumer(modelName, false) + if err != nil { + return false + } + + return ic != nil && ic.Exists(modelName) +} + func (cm *ConsumerManager) Stop() { cm.mu.Lock() defer cm.mu.Unlock() diff --git a/scheduler/pkg/kafka/gateway/manager_test.go b/scheduler/pkg/kafka/gateway/manager_test.go index 20c7be106f..91397f84b5 100644 --- a/scheduler/pkg/kafka/gateway/manager_test.go +++ b/scheduler/pkg/kafka/gateway/manager_test.go @@ -85,6 +85,59 @@ func TestAddRemoveModel(t *testing.T) { } } +func TestExists(t *testing.T) { + g := NewGomegaWithT(t) + + type test struct { + name string + modelsConsumed []string + model string + exists bool + } + tests := []test{ + { + name: "exists", + modelsConsumed: []string{"foo", "bar"}, + model: "foo", + exists: true, + }, + { + name: "doesnt Exist", + modelsConsumed: []string{"foo"}, + model: "bar", + exists: false, + }, + { + name: "empty", + modelsConsumed: []string{}, + model: "bar", + exists: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + logger := log.New() + t.Log("Start test", test.name) + kafkaServerConfig := InferenceServerConfig{ + Host: "0.0.0.0", + HttpPort: 1234, + GrpcPort: 1235, + } + tp, err := seldontracer.NewTraceProvider("test", nil, logger) + g.Expect(err).To(BeNil()) + c := &ManagerConfig{SeldonKafkaConfig: &config.KafkaConfig{}, Namespace: "default", InferenceServerConfig: &kafkaServerConfig, TraceProvider: tp, NumWorkers: 0} + cm, err := NewConsumerManager(logger, c, 5) + g.Expect(err).To(BeNil()) + for _, model := range test.modelsConsumed { + err := cm.AddModel(model) + g.Expect(err).To(BeNil()) + } + g.Expect(cm.Exists(test.model)).To(Equal(test.exists)) + }) + } +} + func TestConsistentModelToConsumer(t *testing.T) { g := NewGomegaWithT(t)