From 698a50e501a1d00ff84616c37d0fcdecc098abc9 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 13 May 2024 14:39:58 -0400 Subject: [PATCH 01/11] feat: add constructor for full graph Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 146 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 3 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 96c7c877bd0..cdfbade85c8 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -17,14 +17,154 @@ limitations under the License. package graph import ( + "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" duckv1 "knative.dev/pkg/apis/duck/v1" ) +func ConstructGraph(ctx context.Context, filterFunc func(obj interface{}) bool) (*Graph, error) { + eventingClient := eventingclient.Get(ctx) + + g := NewGraph() + + brokers, err := eventingClient.EventingV1().Brokers("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, broker := range brokers.Items { + if filterFunc(broker) { + g.AddBroker(broker) + } + } + + channels, err := eventingClient.MessagingV1().Channels("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, channel := range channels.Items { + if filterFunc(channel) { + g.AddChannel(channel) + } + } + + apiServerSources, err := eventingClient.SourcesV1().ApiServerSources("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, apiServerSource := range apiServerSources.Items { + if filterFunc(apiServerSource) { + g.AddSource(duckv1.Source{ + ObjectMeta: apiServerSource.ObjectMeta, + TypeMeta: apiServerSource.TypeMeta, + Spec: apiServerSource.Spec.SourceSpec, + Status: apiServerSource.Status.SourceStatus, + }) + } + } + + containerSources, err := eventingClient.SourcesV1().ContainerSources("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, containerSource := range containerSources.Items { + if filterFunc(containerSource) { + g.AddSource(duckv1.Source{ + ObjectMeta: containerSource.ObjectMeta, + TypeMeta: containerSource.TypeMeta, + Spec: containerSource.Spec.SourceSpec, + Status: containerSource.Status.SourceStatus, + }) + } + } + + pingSources, err := eventingClient.SourcesV1().PingSources("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, pingSource := range pingSources.Items { + if filterFunc(pingSource) { + g.AddSource(duckv1.Source{ + ObjectMeta: pingSource.ObjectMeta, + TypeMeta: pingSource.TypeMeta, + Spec: pingSource.Spec.SourceSpec, + Status: pingSource.Status.SourceStatus, + }) + } + } + + sinkBindings, err := eventingClient.SourcesV1().SinkBindings("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, sinkBinding := range sinkBindings.Items { + if filterFunc(sinkBinding) { + g.AddSource(duckv1.Source{ + ObjectMeta: sinkBinding.ObjectMeta, + TypeMeta: sinkBinding.TypeMeta, + Spec: sinkBinding.Spec.SourceSpec, + Status: sinkBinding.Status.SourceStatus, + }) + } + } + + triggers, err := eventingClient.EventingV1().Triggers("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, trigger := range triggers.Items { + if filterFunc(trigger) { + err := g.AddTrigger(trigger) + if err != nil { + return nil, err + } + } + } + + subscriptions, err := eventingClient.MessagingV1().Subscriptions("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, subscription := range subscriptions.Items { + if filterFunc(subscription) { + err := g.AddSubscription(subscription) + if err != nil { + return nil, err + } + } + } + + eventTypes, err := eventingClient.EventingV1beta3().EventTypes("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, eventType := range eventTypes.Items { + if filterFunc(eventType) { + err := g.AddEventType(eventType) + if err != nil { + return nil, err + } + } + } + + return g, nil +} + func (g *Graph) AddBroker(broker eventingv1.Broker) { ref := &duckv1.KReference{ Name: broker.Name, @@ -74,7 +214,7 @@ func (g *Graph) AddChannel(channel messagingv1.Channel) { v.AddEdge(to, dest, NoTransform{}, true) } -func (g *Graph) AddEventType(et *eventingv1beta3.EventType) error { +func (g *Graph) AddEventType(et eventingv1beta3.EventType) error { ref := &duckv1.KReference{ Name: et.Name, Namespace: et.Namespace, @@ -89,7 +229,7 @@ func (g *Graph) AddEventType(et *eventingv1beta3.EventType) error { return fmt.Errorf("trigger/subscription must have a primary outward edge, but had none") } - outEdge.To().AddEdge(outEdge.From(), dest, EventTypeTransform{EventType: et}, false) + outEdge.To().AddEdge(outEdge.From(), dest, EventTypeTransform{EventType: &et}, false) return nil } @@ -97,7 +237,7 @@ func (g *Graph) AddEventType(et *eventingv1beta3.EventType) error { from := g.getOrCreateVertex(dest) to := g.getOrCreateVertex(&duckv1.Destination{Ref: et.Spec.Reference}) - from.AddEdge(to, dest, EventTypeTransform{EventType: et}, false) + from.AddEdge(to, dest, EventTypeTransform{EventType: &et}, false) return nil } From 582ca21dcbf3e8574640aa9436b0cd852fc61d0b Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 13 May 2024 14:51:01 -0400 Subject: [PATCH 02/11] fix: tests build again Signed-off-by: Calum Murray --- pkg/graph/constructor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/graph/constructor_test.go b/pkg/graph/constructor_test.go index 944045ddc34..cc9a076dae5 100644 --- a/pkg/graph/constructor_test.go +++ b/pkg/graph/constructor_test.go @@ -1788,12 +1788,12 @@ func TestAddChannel(t *testing.T) { func TestAddEventType(t *testing.T) { tests := []struct { name string - et *eventingv1beta3.EventType + et eventingv1beta3.EventType expected map[comparableDestination]*Vertex }{ { name: "ET references source", - et: &eventingv1beta3.EventType{ + et: eventingv1beta3.EventType{ ObjectMeta: metav1.ObjectMeta{ Name: "my-EventType", Namespace: "default", From 67fefb7fc6d7ad98aa50234e51eb73563bde065e Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 16 Jul 2024 16:10:02 -0400 Subject: [PATCH 03/11] fix: handle fetching sources properly Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 185 +++++++++++++++++++++++++++------------ 1 file changed, 128 insertions(+), 57 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index cdfbade85c8..940af9a4d21 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -18,15 +18,20 @@ package graph import ( "context" + "encoding/json" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/injection/clients/dynamicclient" ) func ConstructGraph(ctx context.Context, filterFunc func(obj interface{}) bool) (*Graph, error) { @@ -56,67 +61,14 @@ func ConstructGraph(ctx context.Context, filterFunc func(obj interface{}) bool) } } - apiServerSources, err := eventingClient.SourcesV1().ApiServerSources("").List(ctx, metav1.ListOptions{}) + sources, err := getSources(ctx) if err != nil { return nil, err } - for _, apiServerSource := range apiServerSources.Items { - if filterFunc(apiServerSource) { - g.AddSource(duckv1.Source{ - ObjectMeta: apiServerSource.ObjectMeta, - TypeMeta: apiServerSource.TypeMeta, - Spec: apiServerSource.Spec.SourceSpec, - Status: apiServerSource.Status.SourceStatus, - }) - } - } - - containerSources, err := eventingClient.SourcesV1().ContainerSources("").List(ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, containerSource := range containerSources.Items { - if filterFunc(containerSource) { - g.AddSource(duckv1.Source{ - ObjectMeta: containerSource.ObjectMeta, - TypeMeta: containerSource.TypeMeta, - Spec: containerSource.Spec.SourceSpec, - Status: containerSource.Status.SourceStatus, - }) - } - } - - pingSources, err := eventingClient.SourcesV1().PingSources("").List(ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, pingSource := range pingSources.Items { - if filterFunc(pingSource) { - g.AddSource(duckv1.Source{ - ObjectMeta: pingSource.ObjectMeta, - TypeMeta: pingSource.TypeMeta, - Spec: pingSource.Spec.SourceSpec, - Status: pingSource.Status.SourceStatus, - }) - } - } - - sinkBindings, err := eventingClient.SourcesV1().SinkBindings("").List(ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, sinkBinding := range sinkBindings.Items { - if filterFunc(sinkBinding) { - g.AddSource(duckv1.Source{ - ObjectMeta: sinkBinding.ObjectMeta, - TypeMeta: sinkBinding.TypeMeta, - Spec: sinkBinding.Spec.SourceSpec, - Status: sinkBinding.Status.SourceStatus, - }) + for _, source := range sources { + if filterFunc(source) { + g.AddSource(source) } } @@ -349,3 +301,122 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { return nil } + +func getSources(ctx context.Context) ([]duckv1.Source, error) { + client := dynamicclient.Get(ctx) + sourceCRDs, err := client.Resource( + schema.GroupVersionResource{ + Group: "apiextentions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }, + ).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"duck.knative.dev/source": "true"}.String()}) + if err != nil { + return nil, fmt.Errorf("unable to list source CRDs: %w", err) + } + + duckSources := []duckv1.Source{} + + for i := range sourceCRDs.Items { + sourceCrd := sourceCRDs.Items[i] + sourceGVR, err := gvrFromUnstructured(&sourceCrd) + if err != nil { + continue + } + + sourcesList, err := client.Resource(sourceGVR).List(ctx, metav1.ListOptions{}) + if err != nil { + continue + } + + for i := range sourcesList.Items { + unstructuredSource := sourcesList.Items[i] + duckSource, err := duckSourceFromUnstructured(&unstructuredSource) + if err == nil { + duckSources = append(duckSources, duckSource) + } + } + } + + return duckSources, nil +} + +func duckSourceFromUnstructured(u *unstructured.Unstructured) (duckv1.Source, error) { + duckSource := duckv1.Source{} + marshalled, err := u.MarshalJSON() + if err != nil { + return duckSource, err + } + + err = json.Unmarshal(marshalled, &duckSource) + return duckSource, err +} +func gvrFromUnstructured(u *unstructured.Unstructured) (schema.GroupVersionResource, error) { + group, err := groupFromUnstructured(u) + if err != nil { + return schema.GroupVersionResource{}, err + } + + version, err := versionFromUnstructured(u) + if err != nil { + return schema.GroupVersionResource{}, err + } + + resource, err := resourceFromUnstructured(u) + if err != nil { + return schema.GroupVersionResource{}, err + } + + return schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: resource, + }, nil +} + +func groupFromUnstructured(u *unstructured.Unstructured) (string, error) { + content := u.UnstructuredContent() + group, found, err := unstructured.NestedString(content, "spec", "group") + if !found || err != nil { + return "", fmt.Errorf("can't find source kind from source CRD: %w", err) + } + + return group, nil +} + +func versionFromUnstructured(u *unstructured.Unstructured) (string, error) { + content := u.UnstructuredContent() + var version string + versions, found, err := unstructured.NestedSlice(content, "spec", "versions") + if !found || err != nil || len(versions) == 0 { + version, found, err = unstructured.NestedString(content, "spec", "version") + if !found || err != nil { + return "", fmt.Errorf("can't find source version from source CRD: %w", err) + } + } else { + for _, v := range versions { + if vmap, ok := v.(map[string]interface{}); ok { + if vmap["served"] == true { + version = vmap["name"].(string) + break + } + } + } + } + + if version == "" { + return "", fmt.Errorf("can't find source version from source CRD: %w", err) + } + + return version, nil +} + +func resourceFromUnstructured(u *unstructured.Unstructured) (string, error) { + content := u.UnstructuredContent() + resource, found, err := unstructured.NestedString(content, "spec", "names", "plural") + if !found || err != nil { + return "", fmt.Errorf("can't find source resource from source CRD: %w", err) + } + + return resource, nil +} From ab9870d9155ff2bc033d59b1ac5fca166c3b7eb1 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 16 Jul 2024 16:23:19 -0400 Subject: [PATCH 04/11] fix: don't return early for auth errors Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 63 +++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index f4037b94b22..eb9e0d45200 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -40,24 +41,28 @@ func ConstructGraph(ctx context.Context, filterFunc func(obj interface{}) bool) g := NewGraph() brokers, err := eventingClient.EventingV1().Brokers("").List(ctx, metav1.ListOptions{}) - if err != nil { + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { return nil, err } - for _, broker := range brokers.Items { - if filterFunc(broker) { - g.AddBroker(broker) + if err == nil { + for _, broker := range brokers.Items { + if filterFunc(broker) { + g.AddBroker(broker) + } } } channels, err := eventingClient.MessagingV1().Channels("").List(ctx, metav1.ListOptions{}) - if err != nil { + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { return nil, err } - for _, channel := range channels.Items { - if filterFunc(channel) { - g.AddChannel(channel) + if err == nil { + for _, channel := range channels.Items { + if filterFunc(channel) { + g.AddChannel(channel) + } } } @@ -73,43 +78,49 @@ func ConstructGraph(ctx context.Context, filterFunc func(obj interface{}) bool) } triggers, err := eventingClient.EventingV1().Triggers("").List(ctx, metav1.ListOptions{}) - if err != nil { + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { return nil, err } - for _, trigger := range triggers.Items { - if filterFunc(trigger) { - err := g.AddTrigger(trigger) - if err != nil { - return nil, err + if err == nil { + for _, trigger := range triggers.Items { + if filterFunc(trigger) { + err := g.AddTrigger(trigger) + if err != nil { + return nil, err + } } } } subscriptions, err := eventingClient.MessagingV1().Subscriptions("").List(ctx, metav1.ListOptions{}) - if err != nil { + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { return nil, err } - for _, subscription := range subscriptions.Items { - if filterFunc(subscription) { - err := g.AddSubscription(subscription) - if err != nil { - return nil, err + if err == nil { + for _, subscription := range subscriptions.Items { + if filterFunc(subscription) { + err := g.AddSubscription(subscription) + if err != nil { + return nil, err + } } } } eventTypes, err := eventingClient.EventingV1beta3().EventTypes("").List(ctx, metav1.ListOptions{}) - if err != nil { + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { return nil, err } - for _, eventType := range eventTypes.Items { - if filterFunc(eventType) { - err := g.AddEventType(eventType) - if err != nil { - return nil, err + if err == nil { + for _, eventType := range eventTypes.Items { + if filterFunc(eventType) { + err := g.AddEventType(eventType) + if err != nil { + return nil, err + } } } } From 43cd8e86372abb82de2c29c7d12f366ea034d2ed Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 22 Jul 2024 23:51:21 -0400 Subject: [PATCH 05/11] feat: added config for graph constructor Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 171 ++++++++++++++++++++++----------------- 1 file changed, 98 insertions(+), 73 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index eb9e0d45200..31ab572df8c 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -26,100 +26,117 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + rest "k8s.io/client-go/rest" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" - eventingclient "knative.dev/eventing/pkg/client/injection/client" + eventingclient "knative.dev/eventing/pkg/client/clientset/versioned" duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/injection/clients/dynamicclient" ) -func ConstructGraph(ctx context.Context, filterFunc func(obj interface{}) bool) (*Graph, error) { - eventingClient := eventingclient.Get(ctx) - - g := NewGraph() +type ConstructorConfig struct { + RestConfig rest.Config + Namespaces []string + ShouldAddBroker func(b eventingv1.Broker) bool + ShouldAddChannel func(c messagingv1.Channel) bool + ShouldAddSource func(s duckv1.Source) bool + ShouldAddTrigger func(t eventingv1.Trigger) bool + ShouldAddSubscription func(s messagingv1.Subscription) bool + ShouldAddEventType func(et eventingv1beta3.EventType) bool +} - brokers, err := eventingClient.EventingV1().Brokers("").List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { +func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc func(obj interface{}) bool) (*Graph, error) { + eventingClient, err := eventingclient.NewForConfig(&config.RestConfig) + if err != nil { return nil, err } - if err == nil { - for _, broker := range brokers.Items { - if filterFunc(broker) { - g.AddBroker(broker) + g := NewGraph() + + for _, ns := range config.Namespaces { + brokers, err := eventingClient.EventingV1().Brokers(ns).List(ctx, metav1.ListOptions{}) + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + return nil, err + } + + if err == nil { + for _, broker := range brokers.Items { + if config.ShouldAddBroker(broker) { + g.AddBroker(broker) + } } } - } - channels, err := eventingClient.MessagingV1().Channels("").List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err - } + channels, err := eventingClient.MessagingV1().Channels(ns).List(ctx, metav1.ListOptions{}) + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + return nil, err + } - if err == nil { - for _, channel := range channels.Items { - if filterFunc(channel) { - g.AddChannel(channel) + if err == nil { + for _, channel := range channels.Items { + if config.ShouldAddChannel(channel) { + g.AddChannel(channel) + } } } - } - sources, err := getSources(ctx) - if err != nil { - return nil, err - } + sources, err := getSources(ctx, config) + if err != nil { + return nil, err + } - for _, source := range sources { - if filterFunc(source) { - g.AddSource(source) + for _, source := range sources { + if config.ShouldAddSource(source) { + g.AddSource(source) + } } - } - triggers, err := eventingClient.EventingV1().Triggers("").List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err - } + triggers, err := eventingClient.EventingV1().Triggers(ns).List(ctx, metav1.ListOptions{}) + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + return nil, err + } - if err == nil { - for _, trigger := range triggers.Items { - if filterFunc(trigger) { - err := g.AddTrigger(trigger) - if err != nil { - return nil, err + if err == nil { + for _, trigger := range triggers.Items { + if config.ShouldAddTrigger(trigger) { + err := g.AddTrigger(trigger) + if err != nil { + return nil, err + } } } } - } - subscriptions, err := eventingClient.MessagingV1().Subscriptions("").List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err - } + subscriptions, err := eventingClient.MessagingV1().Subscriptions(ns).List(ctx, metav1.ListOptions{}) + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + return nil, err + } - if err == nil { - for _, subscription := range subscriptions.Items { - if filterFunc(subscription) { - err := g.AddSubscription(subscription) - if err != nil { - return nil, err + if err == nil { + for _, subscription := range subscriptions.Items { + if config.ShouldAddSubscription(subscription) { + err := g.AddSubscription(subscription) + if err != nil { + return nil, err + } } } } - } - eventTypes, err := eventingClient.EventingV1beta3().EventTypes("").List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err - } + eventTypes, err := eventingClient.EventingV1beta3().EventTypes(ns).List(ctx, metav1.ListOptions{}) + if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + return nil, err + } - if err == nil { - for _, eventType := range eventTypes.Items { - if filterFunc(eventType) { - err := g.AddEventType(eventType) - if err != nil { - return nil, err + if err == nil { + for _, eventType := range eventTypes.Items { + if config.ShouldAddEventType(eventType) { + err := g.AddEventType(eventType) + if err != nil { + return nil, err + } } } } @@ -258,6 +275,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { return nil } + func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { channelRef := &duckv1.KReference{ Name: subscription.Spec.Channel.Name, @@ -300,8 +318,12 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { } -func getSources(ctx context.Context) ([]duckv1.Source, error) { - client := dynamicclient.Get(ctx) +func getSources(ctx context.Context, config ConstructorConfig) ([]duckv1.Source, error) { + client, err := dynamic.NewForConfig(&config.RestConfig) + if err != nil { + return nil, err + } + sourceCRDs, err := client.Resource( schema.GroupVersionResource{ Group: "apiextentions.k8s.io", @@ -322,16 +344,18 @@ func getSources(ctx context.Context) ([]duckv1.Source, error) { continue } - sourcesList, err := client.Resource(sourceGVR).List(ctx, metav1.ListOptions{}) - if err != nil { - continue - } + for _, ns := range config.Namespaces { + sourcesList, err := client.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{}) + if err != nil { + continue + } - for i := range sourcesList.Items { - unstructuredSource := sourcesList.Items[i] - duckSource, err := duckSourceFromUnstructured(&unstructuredSource) - if err == nil { - duckSources = append(duckSources, duckSource) + for i := range sourcesList.Items { + unstructuredSource := sourcesList.Items[i] + duckSource, err := duckSourceFromUnstructured(&unstructuredSource) + if err == nil { + duckSources = append(duckSources, duckSource) + } } } } @@ -349,6 +373,7 @@ func duckSourceFromUnstructured(u *unstructured.Unstructured) (duckv1.Source, er err = json.Unmarshal(marshalled, &duckSource) return duckSource, err } + func gvrFromUnstructured(u *unstructured.Unstructured) (schema.GroupVersionResource, error) { group, err := groupFromUnstructured(u) if err != nil { From 0173c1adc63e1bae8cbe1a1658d3012973f38761 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 23 Jul 2024 09:52:31 -0400 Subject: [PATCH 06/11] feat: add logging when unauthorized/forbidden to list resources Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 31ab572df8c..831d5e0c01d 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" + "go.uber.org/zap" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -47,7 +48,7 @@ type ConstructorConfig struct { ShouldAddEventType func(et eventingv1beta3.EventType) bool } -func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc func(obj interface{}) bool) (*Graph, error) { +func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Logger) (*Graph, error) { eventingClient, err := eventingclient.NewForConfig(&config.RestConfig) if err != nil { return nil, err @@ -61,6 +62,10 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc fu return nil, err } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + logger.Warn("failed to list brokers while constructing lineage graph", zap.Error(err)) + } + if err == nil { for _, broker := range brokers.Items { if config.ShouldAddBroker(broker) { @@ -74,6 +79,10 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc fu return nil, err } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + logger.Warn("failed to list channels while constructing lineage graph", zap.Error(err)) + } + if err == nil { for _, channel := range channels.Items { if config.ShouldAddChannel(channel) { @@ -98,6 +107,10 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc fu return nil, err } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + logger.Warn("failed to list triggers while constructing lineage graph", zap.Error(err)) + } + if err == nil { for _, trigger := range triggers.Items { if config.ShouldAddTrigger(trigger) { @@ -114,6 +127,10 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc fu return nil, err } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + logger.Warn("failed to list subscriptions while constructing lineage graph", zap.Error(err)) + } + if err == nil { for _, subscription := range subscriptions.Items { if config.ShouldAddSubscription(subscription) { @@ -130,6 +147,10 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, filterFunc fu return nil, err } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + logger.Warn("failed to list eventtypes while constructing lineage graph", zap.Error(err)) + } + if err == nil { for _, eventType := range eventTypes.Items { if config.ShouldAddEventType(eventType) { From bdc959b56cf141884b0aeb8667a329df7318ee79 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 25 Jul 2024 17:56:26 +0300 Subject: [PATCH 07/11] Fix vertices fn creating nil pointers for 3 entries, it was creating an array of 6 Signed-off-by: Ali Ok --- pkg/graph/types.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/graph/types.go b/pkg/graph/types.go index 838019ac614..b0a1bdd7a52 100644 --- a/pkg/graph/types.go +++ b/pkg/graph/types.go @@ -75,8 +75,10 @@ func NewGraph() *Graph { func (g *Graph) Vertices() Vertices { vertices := make([]*Vertex, len(g.vertices)) + i := 0 for _, v := range g.vertices { - vertices = append(vertices, v) + vertices[i] = v + i++ } return vertices } From 3fb2c40bf14967164e946a76df8259d0357ccceb Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 25 Jul 2024 14:43:24 -0400 Subject: [PATCH 08/11] fix review comments from @aliok Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 182 ++++++++++++++++++++++++++++++--------- pkg/graph/types.go | 8 ++ 2 files changed, 151 insertions(+), 39 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 831d5e0c01d..374ba58edd8 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -22,6 +22,7 @@ import ( "fmt" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -41,11 +42,17 @@ type ConstructorConfig struct { RestConfig rest.Config Namespaces []string ShouldAddBroker func(b eventingv1.Broker) bool + FetchBrokers bool ShouldAddChannel func(c messagingv1.Channel) bool + FetchChannels bool ShouldAddSource func(s duckv1.Source) bool + FetchSources bool ShouldAddTrigger func(t eventingv1.Trigger) bool + FetchTriggers bool ShouldAddSubscription func(s messagingv1.Subscription) bool + FetchSubscriptions bool ShouldAddEventType func(et eventingv1beta3.EventType) bool + FetchEventTypes bool } func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Logger) (*Graph, error) { @@ -56,10 +63,48 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Lo g := NewGraph() + err = g.fetchBrokers(ctx, config, eventingClient, logger) + if err != nil { + return nil, err + } + + err = g.fetchChannels(ctx, config, eventingClient, logger) + if err != nil { + return nil, err + } + + err = g.fetchSources(ctx, config, eventingClient, logger) + if err != nil { + return nil, err + } + + err = g.fetchTriggers(ctx, config, eventingClient, logger) + if err != nil { + return nil, err + } + + err = g.fetchSubscriptions(ctx, config, eventingClient, logger) + if err != nil { + return nil, err + } + + err = g.fetchEventTypes(ctx, config, eventingClient, logger) + if err != nil { + return nil, err + } + + return g, nil +} + +func (g *Graph) fetchBrokers(ctx context.Context, config ConstructorConfig, eventingClient *eventingclient.Clientset, logger zap.Logger) error { + if !config.FetchBrokers { + return nil + } + for _, ns := range config.Namespaces { brokers, err := eventingClient.EventingV1().Brokers(ns).List(ctx, metav1.ListOptions{}) if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err + return err } if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { @@ -68,15 +113,26 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Lo if err == nil { for _, broker := range brokers.Items { - if config.ShouldAddBroker(broker) { + if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) { g.AddBroker(broker) } } } + } + + return nil +} + +func (g *Graph) fetchChannels(ctx context.Context, config ConstructorConfig, eventingClient *eventingclient.Clientset, logger zap.Logger) error { + if !config.FetchChannels { + return nil + } + + for _, ns := range config.Namespaces { channels, err := eventingClient.MessagingV1().Channels(ns).List(ctx, metav1.ListOptions{}) if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err + return err } if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { @@ -85,26 +141,44 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Lo if err == nil { for _, channel := range channels.Items { - if config.ShouldAddChannel(channel) { + if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) { g.AddChannel(channel) } } } + } - sources, err := getSources(ctx, config) - if err != nil { - return nil, err - } + return nil +} - for _, source := range sources { - if config.ShouldAddSource(source) { - g.AddSource(source) - } +func (g *Graph) fetchSources(ctx context.Context, config ConstructorConfig, _ *eventingclient.Clientset, logger zap.Logger) error { + if !config.FetchSources { + return nil + } + + sources, err := getSources(ctx, config, logger) + if err != nil { + return err + } + + for _, source := range sources { + if config.ShouldAddSource == nil || config.ShouldAddSource(source) { + g.AddSource(source) } + } + + return nil +} +func (g *Graph) fetchTriggers(ctx context.Context, config ConstructorConfig, eventingClient *eventingclient.Clientset, logger zap.Logger) error { + if !config.FetchTriggers { + return nil + } + + for _, ns := range config.Namespaces { triggers, err := eventingClient.EventingV1().Triggers(ns).List(ctx, metav1.ListOptions{}) if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err + return err } if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { @@ -113,18 +187,28 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Lo if err == nil { for _, trigger := range triggers.Items { - if config.ShouldAddTrigger(trigger) { + if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) { err := g.AddTrigger(trigger) if err != nil { - return nil, err + return err } } } } + } + + return nil +} + +func (g *Graph) fetchSubscriptions(ctx context.Context, config ConstructorConfig, eventingClient *eventingclient.Clientset, logger zap.Logger) error { + if !config.FetchSubscriptions { + return nil + } + for _, ns := range config.Namespaces { subscriptions, err := eventingClient.MessagingV1().Subscriptions(ns).List(ctx, metav1.ListOptions{}) if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err + return err } if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { @@ -133,18 +217,28 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Lo if err == nil { for _, subscription := range subscriptions.Items { - if config.ShouldAddSubscription(subscription) { + if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) { err := g.AddSubscription(subscription) if err != nil { - return nil, err + return err } } } } + } + return nil +} + +func (g *Graph) fetchEventTypes(ctx context.Context, config ConstructorConfig, eventingClient *eventingclient.Clientset, logger zap.Logger) error { + if !config.FetchEventTypes { + return nil + } + + for _, ns := range config.Namespaces { eventTypes, err := eventingClient.EventingV1beta3().EventTypes(ns).List(ctx, metav1.ListOptions{}) if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { - return nil, err + return err } if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { @@ -156,14 +250,14 @@ func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Lo if config.ShouldAddEventType(eventType) { err := g.AddEventType(eventType) if err != nil { - return nil, err + return err } } } } } - return g, nil + return nil } func (g *Graph) AddBroker(broker eventingv1.Broker) { @@ -176,7 +270,7 @@ func (g *Graph) AddBroker(broker eventingv1.Broker) { dest := &duckv1.Destination{Ref: ref} // check if this vertex already exists - v := g.getOrCreateVertex(dest) + v := g.getOrCreateVertex(dest, broker) if broker.Spec.Delivery == nil || broker.Spec.Delivery.DeadLetterSink == nil { // no DLS, we are done @@ -184,7 +278,7 @@ func (g *Graph) AddBroker(broker eventingv1.Broker) { } // broker has a DLS, we need to add an edge to that - to := g.getOrCreateVertex(broker.Spec.Delivery.DeadLetterSink) + to := g.getOrCreateVertex(broker.Spec.Delivery.DeadLetterSink, nil) v.AddEdge(to, dest, NoTransform{}, true) } @@ -202,7 +296,7 @@ func (g *Graph) AddChannel(channel messagingv1.Channel) { } dest := &duckv1.Destination{Ref: ref} - v := g.getOrCreateVertex(dest) + v := g.getOrCreateVertex(dest, channel) if channel.Spec.Delivery == nil || channel.Spec.Delivery.DeadLetterSink == nil { // no DLS, we are done @@ -210,7 +304,7 @@ func (g *Graph) AddChannel(channel messagingv1.Channel) { } // channel has a DLS, we need to add an edge to that - to := g.getOrCreateVertex(channel.Spec.Delivery.DeadLetterSink) + to := g.getOrCreateVertex(channel.Spec.Delivery.DeadLetterSink, nil) v.AddEdge(to, dest, NoTransform{}, true) } @@ -235,8 +329,8 @@ func (g *Graph) AddEventType(et eventingv1beta3.EventType) error { return nil } - from := g.getOrCreateVertex(dest) - to := g.getOrCreateVertex(&duckv1.Destination{Ref: et.Spec.Reference}) + from := g.getOrCreateVertex(dest, et) + to := g.getOrCreateVertex(&duckv1.Destination{Ref: et.Spec.Reference}, nil) from.AddEdge(to, dest, EventTypeTransform{EventType: &et}, false) @@ -252,9 +346,9 @@ func (g *Graph) AddSource(source duckv1.Source) { } dest := &duckv1.Destination{Ref: ref} - v := g.getOrCreateVertex(dest) + v := g.getOrCreateVertex(dest, source) - to := g.getOrCreateVertex(&source.Spec.Sink) + to := g.getOrCreateVertex(&source.Spec.Sink, nil) v.AddEdge(to, dest, CloudEventOverridesTransform{Overrides: source.Spec.CloudEventOverrides}, true) } @@ -280,7 +374,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { } triggerDest := &duckv1.Destination{Ref: triggerRef} - to := g.getOrCreateVertex(&trigger.Spec.Subscriber) + to := g.getOrCreateVertex(&trigger.Spec.Subscriber, nil) //TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later broker.AddEdge(to, triggerDest, getTransformForTrigger(trigger), false) @@ -289,7 +383,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { return nil } - dls := g.getOrCreateVertex(trigger.Spec.Delivery.DeadLetterSink) + dls := g.getOrCreateVertex(trigger.Spec.Delivery.DeadLetterSink, nil) broker.AddEdge(dls, triggerDest, NoTransform{}, true) @@ -319,12 +413,12 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { } subscriptionDest := &duckv1.Destination{Ref: subscriptionRef} - to := g.getOrCreateVertex(subscription.Spec.Subscriber) + to := g.getOrCreateVertex(subscription.Spec.Subscriber, nil) channel.AddEdge(to, subscriptionDest, NoTransform{}, false) // If the subscription has a reply field set, there should be another Edge struct. if subscription.Spec.Reply != nil { - reply := g.getOrCreateVertex(subscription.Spec.Reply) + reply := g.getOrCreateVertex(subscription.Spec.Reply, nil) to.AddEdge(reply, subscriptionDest, NoTransform{}, false) } @@ -332,14 +426,14 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { if subscription.Spec.Delivery == nil || subscription.Spec.Delivery.DeadLetterSink == nil { return nil } - dls := g.getOrCreateVertex(subscription.Spec.Delivery.DeadLetterSink) + dls := g.getOrCreateVertex(subscription.Spec.Delivery.DeadLetterSink, nil) channel.AddEdge(dls, subscriptionDest, NoTransform{}, true) return nil } -func getSources(ctx context.Context, config ConstructorConfig) ([]duckv1.Source, error) { +func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger) ([]duckv1.Source, error) { client, err := dynamic.NewForConfig(&config.RestConfig) if err != nil { return nil, err @@ -353,7 +447,14 @@ func getSources(ctx context.Context, config ConstructorConfig) ([]duckv1.Source, }, ).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"duck.knative.dev/source": "true"}.String()}) if err != nil { - return nil, fmt.Errorf("unable to list source CRDs: %w", err) + if errors.IsNotFound(err) || errors.IsUnauthorized(err) || errors.IsForbidden(err) { + logger.Warn("failed to list source CRDs", zap.Error(err)) + // no need to keep processing here, but also this isn't an error that should stop us from + // continuing to build the graph + return nil, nil + } else { + return nil, fmt.Errorf("unable to list source CRDs: %w", err) + } } duckSources := []duckv1.Source{} @@ -368,6 +469,8 @@ func getSources(ctx context.Context, config ConstructorConfig) ([]duckv1.Source, for _, ns := range config.Namespaces { sourcesList, err := client.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{}) if err != nil { + // just log and continue, we may succeed for other sources + logger.Warn("Failed to list sources", zap.Error(err)) continue } @@ -473,12 +576,13 @@ func getTransformForTrigger(trigger eventingv1.Trigger) Transform { return NoTransform{} } -func (g *Graph) getOrCreateVertex(dest *duckv1.Destination) *Vertex { +func (g *Graph) getOrCreateVertex(dest *duckv1.Destination, resource interface{}) *Vertex { v, ok := g.vertices[makeComparableDestination(dest)] if !ok { v = &Vertex{ - self: dest, - parent: g, + self: dest, + parent: g, + resource: resource, } g.vertices[makeComparableDestination(dest)] = v } diff --git a/pkg/graph/types.go b/pkg/graph/types.go index b0a1bdd7a52..d6cb743e46a 100644 --- a/pkg/graph/types.go +++ b/pkg/graph/types.go @@ -33,6 +33,7 @@ type Vertex struct { inEdges []*Edge outEdges []*Edge visited bool + resource interface{} } type Vertices []*Vertex @@ -134,6 +135,13 @@ func (v *Vertex) Visited() bool { return v.visited } +func (v *Vertex) Resource() (obj interface{}, ok bool) { + if v.resource == nil { + return nil, false + } + return v.resource, true +} + func (v *Vertex) NewWithSameRef() *Vertex { return &Vertex{ self: v.self, From 64dd3f669eaf9987d6db83daa702b8425965fcc7 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Fri, 26 Jul 2024 12:33:54 +0300 Subject: [PATCH 09/11] Add stringer impl to types Signed-off-by: Ali Ok --- pkg/graph/types.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/pkg/graph/types.go b/pkg/graph/types.go index d6cb743e46a..632727b700a 100644 --- a/pkg/graph/types.go +++ b/pkg/graph/types.go @@ -17,6 +17,8 @@ limitations under the License. package graph import ( + "fmt" + eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -103,6 +105,21 @@ func (g *Graph) Sources() Vertices { return sources } +func (g *Graph) String() string { + s := "" + for _, v := range g.vertices { + s += fmt.Sprintf("%s\n", v) + if len(v.outEdges) != 0 { + s += "Out Edges:\n" + for _, e := range v.outEdges { + s += fmt.Sprintf("\t%s\n", e) + } + } + } + + return s +} + func (v *Vertex) InDegree() int { return len(v.inEdges) } @@ -164,6 +181,10 @@ func (v *Vertex) AddEdge(to *Vertex, edgeRef *duckv1.Destination, transform Tran v.parent.edges[makeComparableDestination(edgeRef)] = append(v.parent.edges[makeComparableDestination(edgeRef)], edge) } +func (v *Vertex) String() string { + return DestString(v.self) +} + func (g *Graph) GetPrimaryOutEdgeWithRef(edgeRef *duckv1.KReference) *Edge { if edges, ok := g.edges[makeComparableDestination(&duckv1.Destination{Ref: edgeRef})]; ok { for _, e := range edges { @@ -196,6 +217,23 @@ func (e *Edge) Reference() *duckv1.Destination { return e.self } +func (e *Edge) String() string { + return fmt.Sprintf("[%s] --> [%s]", DestString(e.from.self), DestString(e.to.self)) +} + +func DestString(self *duckv1.Destination) string { + if self.Ref != nil && self.URI != nil { + return fmt.Sprintf("%s (%s)", self.Ref.String(), self.URI.String()) + } + if self.Ref != nil { + return self.Ref.String() + } + if self.URI != nil { + return self.URI.String() + } + return "nil" +} + func makeComparableDestination(dest *duckv1.Destination) comparableDestination { res := comparableDestination{} if dest.Ref != nil { From b68d8398dc02cc3862d0d4cd38c10e3eb5f2bea6 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Fri, 26 Jul 2024 12:41:30 +0300 Subject: [PATCH 10/11] No panic if source filter is not specified Signed-off-by: Ali Ok --- pkg/graph/constructor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 374ba58edd8..0668bbdb143 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -247,7 +247,7 @@ func (g *Graph) fetchEventTypes(ctx context.Context, config ConstructorConfig, e if err == nil { for _, eventType := range eventTypes.Items { - if config.ShouldAddEventType(eventType) { + if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) { err := g.AddEventType(eventType) if err != nil { return err From d60a069f46da847f563622880a830e22848df46c Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Mon, 29 Jul 2024 15:04:09 +0300 Subject: [PATCH 11/11] Replace some hardcoded strings Signed-off-by: Ali Ok --- pkg/graph/constructor.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 0668bbdb143..2cd4e6d43f4 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -264,7 +264,7 @@ func (g *Graph) AddBroker(broker eventingv1.Broker) { ref := &duckv1.KReference{ Name: broker.Name, Namespace: broker.Namespace, - APIVersion: "eventing.knative.dev/v1", + APIVersion: eventingv1.SchemeGroupVersion.String(), Kind: "Broker", } dest := &duckv1.Destination{Ref: ref} @@ -291,7 +291,7 @@ func (g *Graph) AddChannel(channel messagingv1.Channel) { ref := &duckv1.KReference{ Name: channel.Name, Namespace: channel.Namespace, - APIVersion: "messaging.knative.dev/v1", + APIVersion: messagingv1.SchemeGroupVersion.String(), Kind: channel.Kind, } dest := &duckv1.Destination{Ref: ref} @@ -313,7 +313,7 @@ func (g *Graph) AddEventType(et eventingv1beta3.EventType) error { ref := &duckv1.KReference{ Name: et.Name, Namespace: et.Namespace, - APIVersion: "eventing.knative.dev/v1beta3", + APIVersion: eventingv1beta3.SchemeGroupVersion.String(), Kind: "EventType", } dest := &duckv1.Destination{Ref: ref} @@ -357,7 +357,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { brokerRef := &duckv1.KReference{ Name: trigger.Spec.Broker, Namespace: trigger.Namespace, - APIVersion: "eventing.knative.dev/v1", + APIVersion: eventingv1.SchemeGroupVersion.String(), Kind: "Broker", } brokerDest := &duckv1.Destination{Ref: brokerRef} @@ -369,7 +369,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { triggerRef := &duckv1.KReference{ Name: trigger.Name, Namespace: trigger.Namespace, - APIVersion: "eventing.knative.dev/v1", + APIVersion: eventingv1.SchemeGroupVersion.String(), Kind: "Trigger", } triggerDest := &duckv1.Destination{Ref: triggerRef}