From 641cbb7dffe2dda70b1b06bb1a98d3d468ac3de7 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 23 Sep 2024 15:50:23 +0200 Subject: [PATCH] Remove scheduler `wait`s to speed up recovery time (#8200) Currently, the scheduler and autoscaler are single threads and use a lock to prevent multiple scheduling and autoscaling decision from happening in parallel; this is not a problem for our use cases, however, the multiple `wait` currently present are slowing down recovery time. From my testing, if I delete and recreate the Kafka control plane and data plane, without this patch it takes 1h to recover when there are 400 triggers or 20 minutes when there are 100 triggers; with the patch it is immediate (only a 2/3 minutes with 400 triggers). - Remove `wait`s from state builder and autoscaler - Add additional debug logs - Use logger provided through the context as opposed to gloabal loggers in each individual component to preserve `knative/pkg` resource aware log keys. Signed-off-by: Pierangelo Di Pilato --- pkg/scheduler/scheduler.go | 8 +- pkg/scheduler/scheduler_test.go | 5 +- pkg/scheduler/state/helpers.go | 9 +- pkg/scheduler/state/state.go | 95 ++++++++--------- pkg/scheduler/state/state_test.go | 4 +- pkg/scheduler/statefulset/autoscaler.go | 104 ++++++++++--------- pkg/scheduler/statefulset/autoscaler_test.go | 19 ++-- pkg/scheduler/statefulset/scheduler.go | 93 ++++++++++------- pkg/scheduler/statefulset/scheduler_test.go | 4 +- 9 files changed, 181 insertions(+), 160 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 88e470d8b42..a9ca7b1d5a7 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro // Scheduler is responsible for placing VPods into real Kubernetes pods type Scheduler interface { // Schedule computes the new set of placements for vpod. - Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) + Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) } // SchedulerFunc type is an adapter to allow the use of // ordinary functions as Schedulers. If f is a function // with the appropriate signature, SchedulerFunc(f) is a // Scheduler that calls f. -type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) // Schedule implements the Scheduler interface. -func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { + return f(ctx, vpod) } // VPod represents virtual replicas placed into real Kubernetes pods diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d8d6c62e1e1..4800edf7342 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -28,12 +29,12 @@ func TestSchedulerFuncSchedule(t *testing.T) { called := 0 - var s Scheduler = SchedulerFunc(func(vpod VPod) ([]duckv1alpha1.Placement, error) { + var s Scheduler = SchedulerFunc(func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { called++ return nil, nil }) - _, err := s.Schedule(nil) + _, err := s.Schedule(context.Background(), nil) require.Nil(t, err) require.Equal(t, 1, called) } diff --git a/pkg/scheduler/state/helpers.go b/pkg/scheduler/state/helpers.go index 5ec66b2156b..ad3a5aaf765 100644 --- a/pkg/scheduler/state/helpers.go +++ b/pkg/scheduler/state/helpers.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/eventing/pkg/scheduler" ) @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool { var zoneName string var err error for _, podID := range feasiblePods { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - return err == nil, nil - }) + zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) + if err != nil { + continue + } zoneMap[zoneName] = struct{}{} } return len(zoneMap) == int(states.NumZones) diff --git a/pkg/scheduler/state/state.go b/pkg/scheduler/state/state.go index aa84ca996f9..44069babe92 100644 --- a/pkg/scheduler/state/state.go +++ b/pkg/scheduler/state/state.go @@ -22,7 +22,6 @@ import ( "errors" "math" "strconv" - "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -30,9 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" "knative.dev/eventing/pkg/scheduler" @@ -42,7 +39,7 @@ type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. - State(reserved map[types.NamespacedName]map[string]int32) (*State, error) + State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) } // state provides information about the current scheduling of all vpods @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool { // stateBuilder reconstruct the state from scratch, by listing vpods type stateBuilder struct { - ctx context.Context - logger *zap.SugaredLogger vpodLister scheduler.VPodLister capacity int32 schedulerPolicy scheduler.SchedulerPolicyType @@ -166,11 +161,9 @@ type stateBuilder struct { } // NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { +func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { return &stateBuilder{ - ctx: ctx, - logger: logging.FromContext(ctx), vpodLister: lister, capacity: podCapacity, schedulerPolicy: schedulerPolicy, @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche } } -func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) { +func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err } - scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{}) + logger := logging.FromContext(ctx).With("subcomponent", "statebuilder") + ctx = logging.WithLogger(ctx, logger) + + scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{}) if err != nil { - s.logger.Infow("failed to get statefulset", zap.Error(err)) + logger.Infow("failed to get statefulset", zap.Error(err)) return nil, err } @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ { - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) - return err == nil, nil - }) - - if pod != nil { - if isPodUnschedulable(pod) { - // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. - continue - } - - node, err := s.nodeLister.Get(pod.Spec.NodeName) - if err != nil { - return nil, err - } + pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) + if err != nil { + logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err)) + continue + } + if isPodUnschedulable(pod) { + // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. + logger.Debugw("Pod is unschedulable", zap.Any("pod", pod)) + continue + } - if isNodeUnschedulable(node) { - // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. - continue - } + node, err := s.nodeLister.Get(pod.Spec.NodeName) + if err != nil { + return nil, err + } - // Pod has no annotation or not annotated as unschedulable and - // not on an unschedulable node, so add to feasible - schedulablePods.Insert(podId) + if isNodeUnschedulable(node) { + // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. + logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node)) + continue } + + // Pod has no annotation or not annotated as unschedulable and + // not on an unschedulable node, so add to feasible + schedulablePods.Insert(podId) } for _, p := range schedulablePods.List() { - free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) // Account for reserved vreplicas vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved) - free, last = s.updateFreeCapacity(free, last, podName, vreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) withPlacement[vpod.GetKey()][podName] = true - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) continue } - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -330,7 +323,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } - free, last = s.updateFreeCapacity(free, last, podName, rvreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas) } } @@ -338,7 +331,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} - s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) + logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/pkg/scheduler/state/state_test.go b/pkg/scheduler/state/state_test.go index 027186bebf5..5a0d09b6f6f 100644 --- a/pkg/scheduler/state/state_test.go +++ b/pkg/scheduler/state/state_test.go @@ -645,8 +645,8 @@ func TestStateBuilder(t *testing.T) { scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) - state, err := stateBuilder.State(tc.reserved) + stateBuilder := NewStateBuilder(sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) + state, err := stateBuilder.State(ctx, tc.reserved) if err != nil { t.Fatal("unexpected error", err) } diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index 296feb16f2a..3245dabc161 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -18,6 +18,7 @@ package statefulset import ( "context" + "fmt" "math" "sync" "sync/atomic" @@ -27,10 +28,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "knative.dev/pkg/reconciler" - "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "knative.dev/eventing/pkg/scheduler" st "knative.dev/eventing/pkg/scheduler/state" @@ -58,9 +57,8 @@ type autoscaler struct { statefulSetCache *scheduler.ScaleCache statefulSetName string vpodLister scheduler.VPodLister - logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan struct{} + trigger chan context.Context evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -68,7 +66,9 @@ type autoscaler struct { // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration - lock sync.Locker + // retryPeriod is how often the autoscaler retry failed autoscale operations + retryPeriod time.Duration + lock sync.Locker // isLeader signals whether a given autoscaler instance is leader or not. // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a @@ -104,17 +104,17 @@ func (a *autoscaler) Demote(b reconciler.Bucket) { } } -func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { - return &autoscaler{ - logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")), +func newAutoscaler(cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { + a := &autoscaler{ statefulSetCache: statefulSetCache, statefulSetName: cfg.StatefulSetName, vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan struct{}, 1), + trigger: make(chan context.Context, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, + retryPeriod: cfg.RetryPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, getReserved: cfg.getReserved, @@ -124,25 +124,38 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces Add(-cfg.RefreshPeriod). Add(-time.Minute), } + + if a.retryPeriod == 0 { + a.retryPeriod = time.Second + } + + return a } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false for { + autoscaleCtx := ctx select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): - a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) + logging.FromContext(ctx).Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = true - case <-a.trigger: - a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) + case autoscaleCtx = <-a.trigger: + logging.FromContext(autoscaleCtx).Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown) + if err := a.syncAutoscale(autoscaleCtx, attemptScaleDown); err != nil { + logging.FromContext(autoscaleCtx).Errorw("Failed to sync autoscale", zap.Error(err)) + go func() { + time.Sleep(a.retryPeriod) + a.Autoscale(ctx) // Use top-level context for background retries + }() + } } } @@ -150,10 +163,10 @@ func (a *autoscaler) Autoscale(ctx context.Context) { select { // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh // period is reset. - case a.trigger <- struct{}{}: + case a.trigger <- ctx: default: // We don't want to block if the channel's buffer is full, it will be triggered eventually. - + logging.FromContext(ctx).Debugw("Skipping autoscale since autoscale is in progress") } } @@ -161,36 +174,34 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e a.lock.Lock() defer a.lock.Unlock() - var lastErr error - wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown) - if err != nil { - logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) - } - lastErr = err - return err == nil, nil - }) - return lastErr + if err := a.doautoscale(ctx, attemptScaleDown); err != nil { + return fmt.Errorf("failed to do autoscale: %w", err) + } + return nil } func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(a.getReserved()) + + logger := logging.FromContext(ctx).With("component", "autoscaler") + ctx = logging.WithLogger(ctx, logger) + + state, err := a.stateAccessor.State(ctx, a.getReserved()) if err != nil { - a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) + logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err } scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{}) if err != nil { // skip a beat - a.logger.Infow("failed to get scale subresource", zap.Error(err)) + logger.Infow("failed to get scale subresource", zap.Error(err)) return err } - a.logger.Debugw("checking adapter capacity", + logger.Debugw("checking adapter capacity", zap.Int32("replicas", scale.Spec.Replicas), zap.Any("state", state)) @@ -234,43 +245,43 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err if newreplicas != scale.Spec.Replicas { scale.Spec.Replicas = newreplicas - a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) + logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) _, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{}) if err != nil { - a.logger.Errorw("updating scale subresource failed", zap.Error(err)) + logger.Errorw("updating scale subresource failed", zap.Error(err)) return err } } else if attemptScaleDown { // since the number of replicas hasn't changed and time has approached to scale down, // take the opportunity to compact the vreplicas - a.mayCompact(state, scaleUpFactor) + return a.mayCompact(logger, state, scaleUpFactor) } return nil } -func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { +func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpFactor int32) error { // This avoids a too aggressive scale down by adding a "grace period" based on the refresh // period nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) if time.Now().Before(nextAttempt) { - a.logger.Debugw("Compact was retried before refresh period", + logger.Debugw("Compact was retried before refresh period", zap.Time("lastCompactAttempt", a.lastCompactAttempt), zap.Time("nextAttempt", nextAttempt), zap.String("refreshPeriod", a.refreshPeriod.String()), ) - return + return nil } - a.logger.Debugw("Trying to compact and scale down", + logger.Debugw("Trying to compact and scale down", zap.Int32("scaleUpFactor", scaleUpFactor), zap.Any("state", s), ) // when there is only one pod there is nothing to move or number of pods is just enough! if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { - return + return nil } if s.SchedulerPolicy == scheduler.MAXFILLUP { @@ -283,7 +294,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } @@ -303,10 +314,11 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } } + return nil } func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { @@ -323,16 +335,14 @@ func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { ordinal := st.OrdinalFromPodName(placements[i].PodName) if ordinal == s.LastOrdinal-j { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - if s.PodLister != nil { - pod, err = s.PodLister.Get(placements[i].PodName) - } - return err == nil, nil - }) + pod, err = s.PodLister.Get(placements[i].PodName) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) + } err = a.evictor(pod, vpod, &placements[i]) if err != nil { - return err + return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) } } } diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 2d216daafa6..d7ff1c02310 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" v1 "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" @@ -451,7 +452,7 @@ func TestAutoscaler(t *testing.T) { scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn, scaleCache) + stateAccessor := state.NewStateBuilder(sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn, scaleCache) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) @@ -474,7 +475,7 @@ func TestAutoscaler(t *testing.T) { return tc.reserved }, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) for _, vpod := range tc.vpods { @@ -512,7 +513,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { vpodClient := tscheduler.NewVPodClient() ls := listers.NewListers(nil) scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister(), scaleCache) + stateAccessor := state.NewStateBuilder(sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister(), scaleCache) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) @@ -535,7 +536,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { return nil }, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) done := make(chan bool) @@ -950,7 +951,7 @@ func TestCompactor(t *testing.T) { lsp := listers.NewListers(podlist) lsn := listers.NewListers(nodelist) scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) + stateAccessor := state.NewStateBuilder(sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) evictions := make(map[types.NamespacedName][]duckv1alpha1.Placement) recordEviction := func(pod *corev1.Pod, vpod scheduler.VPod, from *duckv1alpha1.Placement) error { @@ -966,7 +967,7 @@ func TestCompactor(t *testing.T) { RefreshPeriod: 10 * time.Second, PodCapacity: 10, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) _ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {}) assert.Equal(t, true, autoscaler.isLeader.Load()) @@ -974,7 +975,7 @@ func TestCompactor(t *testing.T) { vpodClient.Append(vpod) } - state, err := stateAccessor.State(nil) + state, err := stateAccessor.State(ctx, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -988,7 +989,9 @@ func TestCompactor(t *testing.T) { scaleUpFactor = 1 // Non-HA scaling } - autoscaler.mayCompact(state, scaleUpFactor) + if err := autoscaler.mayCompact(logging.FromContext(ctx), state, scaleUpFactor); err != nil { + t.Fatal(err) + } if tc.wantEvictions == nil && len(evictions) != 0 { t.Fatalf("unexpected evictions: %v", evictions) diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index 1256e2769d9..6995d6ff452 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -33,11 +33,11 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" @@ -67,6 +67,8 @@ type Config struct { PodCapacity int32 `json:"podCapacity"` // Autoscaler refresh period RefreshPeriod time.Duration `json:"refreshPeriod"` + // Autoscaler retry period + RetryPeriod time.Duration `json:"retryPeriod"` SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` @@ -91,14 +93,14 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { return getReserved() } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) var wg sync.WaitGroup wg.Add(1) @@ -126,8 +128,6 @@ func (p Pending) Total() int32 { // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { - ctx context.Context - logger *zap.SugaredLogger statefulSetName string statefulSetNamespace string statefulSetClient clientappsv1.StatefulSetInterface @@ -171,8 +171,6 @@ func newStatefulSetScheduler(ctx context.Context, autoscaler Autoscaler) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ - ctx: ctx, - logger: logging.FromContext(ctx), statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), @@ -193,7 +191,9 @@ func newStatefulSetScheduler(ctx context.Context, sif.Apps().V1().StatefulSets().Informer(). AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), - Handler: controller.HandleAll(scheduler.updateStatefulset), + Handler: controller.HandleAll(func(i interface{}) { + scheduler.updateStatefulset(ctx, i) + }), }) sif.Start(ctx.Done()) @@ -207,13 +207,13 @@ func newStatefulSetScheduler(ctx context.Context, return scheduler } -func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { +func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() s.reservedMu.Lock() defer s.reservedMu.Unlock() - placements, err := s.scheduleVPod(vpod) + placements, err := s.scheduleVPod(ctx, vpod) if placements == nil { return placements, err } @@ -228,11 +228,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, err } -func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { - logger := s.logger.With("key", vpod.GetKey(), zap.String("component", "scheduler")) +func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { + logger := logging.FromContext(ctx).With("key", vpod.GetKey(), zap.String("component", "scheduler")) + ctx = logging.WithLogger(ctx, logger) + // Get the current placements state // Quite an expensive operation but safe and simple. - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Debug("error while refreshing scheduler state (will retry)", zap.Error(err)) return nil, err @@ -270,13 +272,15 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Handle overcommitted pods. - if state.FreeCap[ordinal] < 0 { + if state.Free(ordinal) < 0 { // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 overcommit := -state.FreeCap[ordinal] + logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p)) + if p.VReplicas >= overcommit { state.SetFree(ordinal, 0) state.Pending[vpod.GetKey()] += overcommit @@ -313,7 +317,9 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 if state.SchedulerPolicy != "" { // Need less => scale down if tr > vpod.GetVReplicas() { - logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) @@ -323,15 +329,19 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Need more => scale up - logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } else { //Predicates and priorities must be used for scheduling // Need less => scale down if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements = s.removeReplicasWithPolicy(vpod, tr-vpod.GetVReplicas(), placements) + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements = s.removeReplicasWithPolicy(ctx, vpod, tr-vpod.GetVReplicas(), placements) // Do not trigger the autoscaler to avoid unnecessary churn @@ -343,8 +353,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Need more => scale up // rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation // can fall here when vreps scaled up or after eviction - logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements, left = s.rebalanceReplicasWithPolicy(vpod, vpod.GetVReplicas(), placements) + logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements, left = s.rebalanceReplicasWithPolicy(ctx, vpod, vpod.GetVReplicas(), placements) } } @@ -355,10 +367,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Trigger the autoscaler if s.autoscaler != nil { logger.Infow("Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.autoscaler.Autoscale(s.ctx) + s.autoscaler.Autoscale(ctx) } - if state.SchedPolicy != nil { + if state.SchedulerPolicy == "" && state.SchedPolicy != nil { logger.Info("reverting to previous placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job @@ -380,25 +392,25 @@ func toJSONable(pending map[types.NamespacedName]int32) map[string]int32 { return r } -func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { s.makeZeroPlacements(vpod, placements) - placements, diff = s.addReplicasWithPolicy(vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list + placements, diff = s.addReplicasWithPolicy(ctx, vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list return placements, diff } -func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - logger := s.logger.Named("remove replicas with policy") +func (s *StatefulSetScheduler) removeReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { + logger := logging.FromContext(ctx).Named("remove replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //deschedule one vreplica at a time - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.DeschedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.DeschedPolicy) feasiblePods = s.removePodsNotInPlacement(vpod, feasiblePods) if len(feasiblePods) == 1 { //nothing to score, remove vrep from that pod placementPodID := feasiblePods[0] @@ -409,7 +421,7 @@ func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, dif continue } - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.DeschedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.DeschedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -455,13 +467,13 @@ func (s *StatefulSetScheduler) removeSelectionFromPlacements(placementPodID int3 return newPlacements } -func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - logger := s.logger.Named("add replicas with policy") +func (s *StatefulSetScheduler) addReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + logger := logging.FromContext(ctx).Named("add replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find most suitable pod placement satisying predicates with high score) // Get the current placements state - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements, diff @@ -474,7 +486,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i break //end the iteration for all vreps since there are not pods } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.SchedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.SchedPolicy) if len(feasiblePods) == 0 { //no pods available to schedule this vreplica logger.Info("no feasible pods available to schedule this vreplica") s.reservePlacements(vpod, placements) @@ -492,7 +504,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i continue } */ - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.SchedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.SchedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -567,7 +579,7 @@ func (s *StatefulSetScheduler) removePodsNotInPlacement(vpod scheduler.VPod, fea // prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. // The scores from each plugin are added together to make the score for that pod. func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PodScoreList, error) { - logger := s.logger.Named("prioritize all feasible pods") + logger := logging.FromContext(ctx).Named("prioritize all feasible pods") // If no priority configs are provided, then all pods will have a score of one result := make(st.PodScoreList, 0, len(feasiblePods)) @@ -630,7 +642,7 @@ func (s *StatefulSetScheduler) selectPod(podScoreList st.PodScoreList) (int32, e // If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. // Meanwhile, the failure message and status are set for the given pod. func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, podID int32, policy *scheduler.SchedulerPolicy) st.PluginToStatus { - logger := s.logger.Named("run all filter plugins") + logger := logging.FromContext(ctx).Named("run all filter plugins") statuses := make(st.PluginToStatus) for _, plugin := range policy.Predicates { @@ -663,7 +675,7 @@ func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl st.Filter // RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PluginToPodScores, *st.Status) { - logger := s.logger.Named("run all score plugins") + logger := logging.FromContext(ctx).Named("run all score plugins") pluginToPodScores := make(st.PluginToPodScores, len(policy.Priorities)) for _, plugin := range policy.Priorities { @@ -776,10 +788,11 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { +func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { - s.logger.Fatalw("expected a Statefulset object", zap.Any("object", obj)) + logging.FromContext(ctx).Warnw("expected a Statefulset object", zap.Any("object", obj)) + return } s.lock.Lock() @@ -789,7 +802,7 @@ func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { s.replicas = 1 } else if s.replicas != *statefulset.Spec.Replicas { s.replicas = *statefulset.Spec.Replicas - s.logger.Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) + logging.FromContext(ctx).Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) } } diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index eed53cffd12..f6250b11fa2 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -804,7 +804,7 @@ func TestStatefulsetScheduler(t *testing.T) { lsp := listers.NewListers(podlist) lsn := listers.NewListers(nodelist) scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) + sa := state.NewStateBuilder(sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) cfg := &Config{ StatefulSetNamespace: testNs, StatefulSetName: sfsName, @@ -823,7 +823,7 @@ func TestStatefulsetScheduler(t *testing.T) { } vpod := vpodClient.Create(vpodNamespace, vpodName, tc.vreplicas, tc.placements) - placements, err := s.Schedule(vpod) + placements, err := s.Schedule(ctx, vpod) if tc.err == nil && err != nil { t.Fatal("unexpected error", err)