Skip to content

Commit

Permalink
fix(model-gateway): Create consumer if model can serve traffic (#5865)
Browse files Browse the repository at this point in the history
* Add helper to check whether a model exists in the Kafka consumer

* Use exists helper in serve function

* simplify use of helper

* add helper for Exists in Manager

* Adjust creating consumer logic

* changing log message to debug

* Add basic testing

* Linting fix

* Tidy up note

* Add note about min replicas

* Add logging warning
  • Loading branch information
sakoush authored Sep 2, 2024
1 parent 57f0dcd commit 7636a14
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 7 deletions.
16 changes: 13 additions & 3 deletions scheduler/pkg/kafka/gateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,24 @@ func (kc *KafkaSchedulerClient) SubscribeModelEvents() error {

logger.Infof("Received event name %s version %d state %s", event.ModelName, latestVersionStatus.Version, latestVersionStatus.State.State.String())

switch latestVersionStatus.State.State {
case scheduler.ModelStatus_ModelAvailable:
// if there are available replicas then we add the consumer for the model
// note that this will also get triggered if the model is already added but there is a status change (e.g. due to scale up)
// and in that case it is a no-op
// note in the future we might want to check that available replicas > min replicas
if latestVersionStatus.GetState().GetAvailableReplicas() > 0 {
if latestVersionStatus.GetState().GetState() != scheduler.ModelStatus_ModelAvailable {
logger.Warnf("Model %s state is: %s", event.ModelName, latestVersionStatus.GetState().GetState().String())
}
if kc.consumerManager.Exists(event.ModelName) {
logger.Debugf("Model consumer %s already exists", event.ModelName)
continue
}
logger.Infof("Adding model %s", event.ModelName)
err := kc.consumerManager.AddModel(event.ModelName)
if err != nil {
kc.logger.WithError(err).Errorf("Failed to add model %s", event.ModelName)
}
default:
} else {
logger.Infof("Removing model %s", event.ModelName)
err := kc.consumerManager.RemoveModel(event.ModelName)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions scheduler/pkg/kafka/gateway/infer.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ func (kc *InferKafkaHandler) RemoveModel(modelName string) error {
return nil
}

// Exists reports whether modelName is present as a key in the handler's
// loadedModels map. The lookup is performed while holding the read lock on
// kc.mu; only key presence is checked, the stored value is ignored.
func (kc *InferKafkaHandler) Exists(modelName string) bool {
	kc.mu.RLock()
	_, found := kc.loadedModels[modelName]
	kc.mu.RUnlock()
	return found
}

func (kc *InferKafkaHandler) Serve() {
logger := kc.logger.WithField("func", "Serve").WithField("consumerName", kc.consumerName)
run := true
Expand Down Expand Up @@ -349,13 +356,10 @@ func (kc *InferKafkaHandler) Serve() {
continue
}

kc.mu.Lock()
if _, ok := kc.loadedModels[modelName]; !ok {
kc.mu.Unlock()
if !kc.Exists(modelName) {
logger.Infof("Failed to find model %s in loaded models", modelName)
continue
}
kc.mu.Unlock()

// Add tracing span
ctx := context.Background()
Expand Down
12 changes: 12 additions & 0 deletions scheduler/pkg/kafka/gateway/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,18 @@ func (cm *ConsumerManager) GetNumModels() int {
return tot
}

// Exists reports whether a consumer is currently registered for modelName and
// that consumer itself reports the model as existing.
//
// The manager's mutex is held for the whole call. The consumer is looked up
// via getInferKafkaConsumer with false as the second argument (presumably the
// "create if missing" flag — confirm against getInferKafkaConsumer). A lookup
// error or a nil consumer both yield false.
func (cm *ConsumerManager) Exists(modelName string) bool {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	consumer, err := cm.getInferKafkaConsumer(modelName, false)
	if err != nil || consumer == nil {
		return false
	}
	return consumer.Exists(modelName)
}

func (cm *ConsumerManager) Stop() {
cm.mu.Lock()
defer cm.mu.Unlock()
Expand Down
53 changes: 53 additions & 0 deletions scheduler/pkg/kafka/gateway/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,59 @@ func TestAddRemoveModel(t *testing.T) {
}
}

// TestExists is a table-driven test for ConsumerManager.Exists. For each case
// it builds a fresh ConsumerManager, adds every model in modelsConsumed via
// AddModel, and then checks that Exists(model) equals the expected value.
func TestExists(t *testing.T) {
	g := NewGomegaWithT(t)

	cases := []struct {
		name           string
		modelsConsumed []string
		model          string
		exists         bool
	}{
		{
			name:           "exists",
			modelsConsumed: []string{"foo", "bar"},
			model:          "foo",
			exists:         true,
		},
		{
			name:           "doesnt Exist",
			modelsConsumed: []string{"foo"},
			model:          "bar",
			exists:         false,
		},
		{
			name:           "empty",
			modelsConsumed: []string{},
			model:          "bar",
			exists:         false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			logger := log.New()
			t.Log("Start test", tc.name)

			serverCfg := &InferenceServerConfig{
				Host:     "0.0.0.0",
				HttpPort: 1234,
				GrpcPort: 1235,
			}
			tp, err := seldontracer.NewTraceProvider("test", nil, logger)
			g.Expect(err).To(BeNil())

			managerCfg := &ManagerConfig{
				SeldonKafkaConfig:     &config.KafkaConfig{},
				Namespace:             "default",
				InferenceServerConfig: serverCfg,
				TraceProvider:         tp,
				NumWorkers:            0,
			}
			cm, err := NewConsumerManager(logger, managerCfg, 5)
			g.Expect(err).To(BeNil())

			// Register every model the case says is being consumed.
			for _, consumed := range tc.modelsConsumed {
				addErr := cm.AddModel(consumed)
				g.Expect(addErr).To(BeNil())
			}

			g.Expect(cm.Exists(tc.model)).To(Equal(tc.exists))
		})
	}
}

func TestConsistentModelToConsumer(t *testing.T) {
g := NewGomegaWithT(t)

Expand Down

0 comments on commit 7636a14

Please sign in to comment.