Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventPolicy reconciliation for Sequence #8106

Merged
merged 32 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fb45ef6
feat: initial commit
Leo6Leo Jul 19, 2024
3298698
Merge branch 'main' into Create-EventPolicies-for-Sequence
Leo6Leo Jul 22, 2024
043141a
feat: add the test for eventpolicy in sequence reconciler
Leo6Leo Jul 23, 2024
2c99a12
fix: fix the typo and remove the unused helper function
Leo6Leo Jul 23, 2024
1696769
fix: trying to fix the git diff issue
Leo6Leo Jul 24, 2024
0adb907
fix: trying to fix the git diff issue
Leo6Leo Jul 24, 2024
e82a1db
fix: fix the nit minor comments
Leo6Leo Jul 24, 2024
5e73f53
fix: update the reconcilation mechanism
Leo6Leo Jul 24, 2024
ca8b3ae
Merge branch 'main' into Create-EventPolicies-for-Sequence
Leo6Leo Jul 25, 2024
78a8b0a
fix: fix the goimports and remove unused helper functions and input p…
Leo6Leo Jul 25, 2024
fe72c3c
fix: add more unit tests to test out remove steps from the sequence
Leo6Leo Jul 26, 2024
f088cf8
Update pkg/reconciler/sequence/sequence.go
Leo6Leo Jul 26, 2024
22520ed
Update pkg/reconciler/sequence/sequence.go
Leo6Leo Jul 26, 2024
f048559
Apply suggestions from code review
Leo6Leo Jul 26, 2024
092f87e
fix: fix the nit review comments from pierdipi and rahul
Leo6Leo Jul 26, 2024
8126f8c
fix: using auth.GetEventPoliciesForResource when trying to list all S…
Leo6Leo Jul 26, 2024
2f23635
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Leo6Leo Jul 29, 2024
268c595
feat: add the sorting to avoid flaky test when there are multiple eve…
Leo6Leo Jul 31, 2024
38017b8
feat: remove the nil condition for channel name when creating the seq…
Leo6Leo Jul 31, 2024
fc8be57
feat: add more unit tests
Leo6Leo Jul 31, 2024
b7bfc92
fix: lint & goimports
Leo6Leo Jul 31, 2024
56b071b
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Leo6Leo Jul 31, 2024
dce2679
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Leo6Leo Aug 1, 2024
18373ca
Merge branch 'main' into Create-EventPolicies-for-Sequence
Leo6Leo Aug 7, 2024
5da98e0
fix: fix the review comments
Leo6Leo Aug 7, 2024
36a1cb3
fix: fix Christoph's review comments
Leo6Leo Aug 7, 2024
6042cbb
feat: adding a test for sequence with existing intermediate eventpoli…
Leo6Leo Aug 7, 2024
9d1e73b
fix: the deepDerivative failed to compare the eventpolicy's From.Spec…
Leo6Leo Aug 7, 2024
e6cddbe
fix: change back to use DeepDerivative
Leo6Leo Aug 8, 2024
cf4d257
fix: fix the test case to make the eventpolicy has a valid spec
Leo6Leo Aug 8, 2024
ad4df83
fix: fix the flaky issue by soring the policies
Leo6Leo Aug 8, 2024
6c8fa1a
fix: change input channel's ownerref to sequence's eventpolicy
Leo6Leo Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions pkg/reconciler/sequence/resources/eventpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/kmeta"
)

const (
SequenceChannelEventPolicyLabelPrefix = "flows.knative.dev/"
sequenceKind = "Sequence"
)

func MakeEventPolicyForSequenceChannel(s *flowsv1.Sequence, channel *eventingduckv1.Channelable, subscription *messagingv1.Subscription) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: channel.Namespace,
Name: SequenceEventPolicyName(s.Name, channel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: flowsv1.SchemeGroupVersion.String(),
Kind: sequenceKind,
Name: s.Name,
creydr marked this conversation as resolved.
Show resolved Hide resolved
},
},
Labels: LabelsForSequenceChannelsEventPolicy(s.Name),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: channel.APIVersion,
Kind: channel.Kind,
Name: channel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Ref: &eventingv1alpha1.EventPolicyFromReference{
APIVersion: subscription.APIVersion,
Kind: subscription.Kind,
Name: subscription.Name,
Namespace: subscription.Namespace,
},
},
},
},
}
}

func LabelsForSequenceChannelsEventPolicy(sequenceName string) map[string]string {
return map[string]string{
SequenceChannelEventPolicyLabelPrefix + "sequence-group": flowsv1.SchemeGroupVersion.Group,
SequenceChannelEventPolicyLabelPrefix + "sequence-version": flowsv1.SchemeGroupVersion.Version,
SequenceChannelEventPolicyLabelPrefix + "sequence-kind": sequenceKind,
SequenceChannelEventPolicyLabelPrefix + "sequence-name": sequenceName,
Copy link
Member

@pierDipi pierDipi Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we using the kind, version or group label? I can only see the name being useful, and therefore the GVK can be removed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the flow correctly, I think the GK is used when the controller is trying to watch which eventpolicy get changed.

sequenceGK := flowsv1.SchemeGroupVersion.WithKind("Sequence").GroupKind()
// Enqueue the Sequence, if we have an EventPolicy which was referencing
// or got updated and now is referencing the Sequence
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
sequenceInformer.Informer().GetIndexer(),
sequenceGK,
impl.EnqueueKey,
))

}
}

func SequenceEventPolicyName(sequenceName, channelName string) string {
// if channel name is empty, it means the event policy is for the output channel
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
if channelName == "" {
return kmeta.ChildName(sequenceName, "-ep") // no need to add the channel name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when is the channel name empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we are making the eventpolicy for the sequence.

As you can see from the test here:
https://github.com/knative/eventing/pull/8106/files#diff-42ed9804c3511a386317c39a39cea9571e16d694153edb7d79c800ae4589ffaaR2302

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but I didn't get it, is empty only for testing? when is the channel name empty in the real production case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now by trying to answer your question. It is only for the testing purposes. I couldn't come up with any use case in the real production use. I will try to see what I can do and provide update in this thread.

} else {
return kmeta.ChildName(sequenceName, "-ep-"+channelName)
}

}

// MakeEventPolicyForSequenceInputChannel creates an EventPolicy for the input channel of a Sequence
func MakeEventPolicyForSequenceInputChannel(s *flowsv1.Sequence, inputChannel *eventingduckv1.Channelable, sequencePolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: inputChannel.Namespace,
Name: SequenceEventPolicyName(s.Name, inputChannel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: flowsv1.SchemeGroupVersion.String(),
Kind: sequenceKind,
Name: s.Name,
creydr marked this conversation as resolved.
Show resolved Hide resolved
},
},
Labels: LabelsForSequenceChannelsEventPolicy(s.Name),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: inputChannel.APIVersion,
Kind: inputChannel.Kind,
Name: inputChannel.Name,
},
},
},
From: sequencePolicy.Spec.From,
},
}
}
105 changes: 104 additions & 1 deletion pkg/reconciler/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
"knative.dev/pkg/kmeta"

duckapis "knative.dev/pkg/apis/duck"
Expand Down Expand Up @@ -129,6 +131,11 @@
return err
}

// Handle EventPolicies
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
if err := r.reconcileEventPolicies(ctx, s, channels, subs, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicies: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &s.Status.AppliedEventPoliciesStatus, &s.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Sequence"), s.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update Sequence status with EventPolicies: %v", err)
Expand Down Expand Up @@ -186,7 +193,7 @@
subName := resources.SequenceSubscriptionName(p.Name, step)
sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName)

// If the resource doesn't exist, we'll create it.
// If the resource doesn't exist, we'll create itF.
if apierrs.IsNotFound(err) {
sub = expected
logging.FromContext(ctx).Infof("Creating subscription: %+v", sub)
Expand Down Expand Up @@ -331,3 +338,99 @@

return nil
}

func (r *Reconciler) reconcileEventPolicies(ctx context.Context, s *v1.Sequence, channels []*eventingduckv1.Channelable, subs []*messagingv1.Subscription, featureFlags feature.Flags) error {
if featureFlags.IsOIDCAuthentication() {
// Create or update EventPolicies, and we skip the first channel as it's the input channel!
for i := 1; i < len(channels); i++ {
if err := r.reconcileChannelEventPolicy(ctx, s, channels[i], subs[i-1]); err != nil {
return err
}
}

// Handle input channel EventPolicy
if err := r.reconcileInputChannelEventPolicy(ctx, s, channels[0]); err != nil {
return err
}
Copy link
Member

@pierDipi pierDipi Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are a few more cases to handle in the logic here, for example, when a channel is removed or added (meaning a step is removed or added) channels will change and we will need to remove event policies, what I would do is:

  • list all the policies that belong to the given sequence (via <prefix> + sequence-name label)
  • based on the channels, partition the list in:
    • to be removed
    • to be updated
  • go through the to be updated list and check if there is any update needed, if so, update the policy
  • add new policies for new channels
  • remove the policies in the to be removed partition

Optimize the algorithm where possible


} else {
// Clean up existing EventPolicies, as the authentication feature flag is disabled
if err := r.cleanupEventPolicies(ctx, s); err != nil {
return err
}
}

return nil
}

func (r *Reconciler) reconcileChannelEventPolicy(ctx context.Context, s *v1.Sequence, channel *eventingduckv1.Channelable, subscription *messagingv1.Subscription) error {
expected := resources.MakeEventPolicyForSequenceChannel(s, channel, subscription)

return r.createOrUpdateEventPolicy(ctx, expected)
}
func (r *Reconciler) createOrUpdateEventPolicy(ctx context.Context, expected *eventingv1alpha1.EventPolicy) error {
existing, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name)
if apierrs.IsNotFound(err) {
_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{})
return err
} else if err != nil {
return err
}

// Update if needed
if !equality.Semantic.DeepEqual(existing.Spec, expected.Spec) {
existing.Spec = expected.Spec
_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(existing.Namespace).Update(ctx, existing, metav1.UpdateOptions{})
return err
}

return nil
}

func (r *Reconciler) reconcileInputChannelEventPolicy(ctx context.Context, s *v1.Sequence, inputChannel *eventingduckv1.Channelable) error {
// Check if there's an EventPolicy for the Sequence
sequencePolicy, err := r.eventPolicyLister.EventPolicies(s.Namespace).Get(s.Name + "-ep")
if err != nil {
if apierrs.IsNotFound(err) {
// No EventPolicy for the Sequence, so we don't create one for the input channel
return nil
}
return err
}

expected := resources.MakeEventPolicyForSequenceInputChannel(s, inputChannel, sequencePolicy)

return r.createOrUpdateEventPolicy(ctx, expected)
}

func (r *Reconciler) cleanupEventPolicies(ctx context.Context, s *v1.Sequence) error {
policies, err := r.eventPolicyLister.EventPolicies(s.Namespace).List(labels.Everything())
if err != nil {
return err
}

for _, policy := range policies {
if metav1.IsControlledBy(policy, s) {
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
return err
}
}
}

return nil
}

func (r *Reconciler) verifyEventPolicyCreation(ctx context.Context, expected *eventingv1alpha1.EventPolicy) error {

Check failure on line 424 in pkg/reconciler/sequence/sequence.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

func `(*Reconciler).verifyEventPolicyCreation` is unused (unused)
logging.FromContext(ctx).Infof("Verifying EventPolicy creation: %s", expected.Name)

// Try to get the EventPolicy using the client directly
policy, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Get(ctx, expected.Name, metav1.GetOptions{})
if err != nil {
logging.FromContext(ctx).Errorf("Failed to get EventPolicy after creation: %v", err)
return err
}

logging.FromContext(ctx).Infof("Successfully verified EventPolicy creation: %s", policy.Name)
return nil
}
Loading
Loading