diff --git a/pkg/source/mtadapter/adapter.go b/pkg/source/mtadapter/adapter.go index 1b5a185dbe..17d993bf05 100644 --- a/pkg/source/mtadapter/adapter.go +++ b/pkg/source/mtadapter/adapter.go @@ -198,7 +198,9 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error { URL: sinkURI, } - adapter := a.adapterCtor(ctx, &config, sink, reporter) + // Need kubeclient in the context to make the adapter + adapterCtx := context.WithValue(ctx, kubeclient.Key{}, a.kubeClient) + adapter := a.adapterCtor(adapterCtx, &config, sink, reporter) // TODO: define Limit interface. if sta, ok := adapter.(*stadapter.Adapter); ok {