diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 1b7a99902b..64cc211085 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -449,7 +449,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr return cg.MarkScheduleConsumerFailed("Schedule", err) } - placements, err := statefulSetScheduler.Schedule(cg) + placements, err := statefulSetScheduler.Schedule(ctx, cg) if err != nil { return cg.MarkScheduleConsumerFailed("Schedule", err) } diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index 536d0574aa..f1c37fd574 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -61,10 +61,10 @@ import ( kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake" ) -type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) -func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return f(ctx, vpod) } const ( @@ -102,7 +102,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -189,7 +189,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -307,7 +307,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -402,7 +402,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -528,7 +528,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -702,7 +702,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -877,7 +877,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1034,7 +1034,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, }, nil @@ -1121,7 +1121,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1208,7 +1208,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1303,7 +1303,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 2}, @@ -1426,7 +1426,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 2}, @@ -1533,7 +1533,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1630,7 +1630,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, io.EOF }), }, @@ -1762,7 +1762,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1926,7 +1926,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -1995,7 +1995,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -2121,7 +2121,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -2167,7 +2167,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition, @@ -2214,7 +2214,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound, @@ -2262,7 +2262,7 @@ func TestFinalizeKind(t *testing.T) { WantErr: true, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed,