Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broker transformation test to upgrade tests #8190

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 67 additions & 37 deletions test/rekt/features/broker/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/state"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand Down Expand Up @@ -382,51 +383,64 @@ Note: the number denotes the sequence of the event that flows in this test case.
*/
func brokerEventTransformationForTrigger() *feature.Feature {
f := feature.NewFeatureNamed("Broker event transformation for trigger")
config := BrokerEventTransformationForTriggerSetup(f)
BrokerEventTransformationForTriggerAssert(f, config)
return f
}

source := feature.MakeRandomK8sName("source")
type brokerEventTransformationConfig struct {
Broker string
Sink1 string
Sink2 string
EventToSend cloudevents.Event
TransformedEvent cloudevents.Event
}

func BrokerEventTransformationForTriggerSetup(f *feature.Feature) brokerEventTransformationConfig {
sink1 := feature.MakeRandomK8sName("sink1")
sink2 := feature.MakeRandomK8sName("sink2")

trigger1 := feature.MakeRandomK8sName("trigger1")
trigger2 := feature.MakeRandomK8sName("trigger2")

// Construct original cloudevent message
eventType := "type1"
eventSource := "http://source1.com"
eventBody := `{"msg":"e2e-brokerchannel-body"}`
// Construct cloudevent message after transformation
transformedEventType := "type2"
transformedEventSource := "http://source2.com"
transformedBody := `{"msg":"transformed body"}`

// Construct eventToSend
eventToSend := cloudevents.NewEvent()
eventToSend.SetID(uuid.New().String())
eventToSend.SetType(eventType)
eventToSend.SetSource(eventSource)
eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody))
eventToSend.SetType("type1")
eventToSend.SetSource("http://source1.com")
eventToSend.SetData(cloudevents.ApplicationJSON, []byte(`{"msg":"e2e-brokerchannel-body"}`))

// Construct cloudevent message after transformation
transformedEvent := cloudevents.NewEvent()
transformedEvent.SetType("type2")
transformedEvent.SetSource("http://source2.com")
transformedEvent.SetData(cloudevents.ApplicationJSON, []byte(`{"msg":"transformed body"}`))

//Install the broker
brokerName := feature.MakeRandomK8sName("broker")
f.Setup("Set context variables", func(ctx context.Context, t feature.T) {
state.SetOrFail(ctx, t, "brokerName", brokerName)
state.SetOrFail(ctx, t, "sink1", sink1)
state.SetOrFail(ctx, t, "sink2", sink2)
})
f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))

f.Setup("install sink1", eventshub.Install(sink1,
eventshub.ReplyWithTransformedEvent(transformedEventType, transformedEventSource, transformedBody),
eventshub.ReplyWithTransformedEvent(transformedEvent.Type(), transformedEvent.Source(), string(transformedEvent.Data())),
eventshub.StartReceiver),
)
f.Setup("install sink2", eventshub.Install(sink2, eventshub.StartReceiver))

// filter1 filters the original events
filter1 := eventingv1.TriggerFilterAttributes{
"type": eventType,
"source": eventSource,
"type": eventToSend.Type(),
"source": eventToSend.Source(),
}
// filter2 filters events after transformation
filter2 := eventingv1.TriggerFilterAttributes{
"type": transformedEventType,
"source": transformedEventSource,
"type": transformedEvent.Type(),
"source": transformedEvent.Source(),
}

// Install the trigger1 point to Broker and transform the original events to new events
Expand All @@ -446,33 +460,49 @@ func brokerEventTransformationForTrigger() *feature.Feature {
))
f.Setup("trigger2 goes ready", trigger.IsReady(trigger2))

return brokerEventTransformationConfig{
Broker: brokerName,
Sink1: sink1,
Sink2: sink2,
EventToSend: eventToSend,
TransformedEvent: transformedEvent,
}
}

func BrokerEventTransformationForTriggerAssert(f *feature.Feature,
cfg brokerEventTransformationConfig) {

source := feature.MakeRandomK8sName("source")

// Set new ID every time we send event to allow calling this function repeatedly
cfg.EventToSend.SetID(uuid.New().String())
f.Requirement("install source", eventshub.Install(
source,
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.InputEvent(eventToSend),
eventshub.StartSenderToResource(broker.GVR(), cfg.Broker),
eventshub.InputEvent(cfg.EventToSend),
))

eventMatcher := eventasssert.MatchEvent(
test.HasSource(eventSource),
test.HasType(eventType),
test.HasData([]byte(eventBody)),
test.HasId(cfg.EventToSend.ID()),
test.HasSource(cfg.EventToSend.Source()),
test.HasType(cfg.EventToSend.Type()),
test.HasData(cfg.EventToSend.Data()),
)
transformEventMatcher := eventasssert.MatchEvent(
test.HasSource(transformedEventSource),
test.HasType(transformedEventType),
test.HasData([]byte(transformedBody)),
test.HasSource(cfg.TransformedEvent.Source()),
test.HasType(cfg.TransformedEvent.Type()),
test.HasData(cfg.TransformedEvent.Data()),
)

f.Stable("Trigger2 has filtered all transformed events").
Must("delivers original events",
eventasssert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1))

f.Stable("Trigger2 has no original events").
Must("delivers original events",
eventasssert.OnStore(sink2).Match(eventMatcher).Not())

return f

f.Stable("Trigger has filtered all transformed events").
Must("trigger 1 delivers original events",
eventasssert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)).
Must("trigger 1 does not deliver transformed events",
eventasssert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()).
Must("trigger 2 delivers transformed events",
eventasssert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)).
Must("trigger 2 does not deliver original events",
eventasssert.OnStore(cfg.Sink2).Match(eventMatcher).Not())
}

func BrokerPreferHeaderCheck() *feature.Feature {
Expand Down
51 changes: 39 additions & 12 deletions test/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"sync"
"testing"

"knative.dev/eventing/pkg/apis/eventing"
brokerfeatures "knative.dev/eventing/test/rekt/features/broker"
"knative.dev/eventing/test/rekt/features/channel"
brokerresources "knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/subscription"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -37,7 +40,16 @@ import (
"knative.dev/reconciler-test/pkg/manifest"
)

var channelConfigMux = &sync.Mutex{}
var (
channelConfigMux = &sync.Mutex{}
brokerConfigMux = &sync.Mutex{}
opts = []environment.EnvOpts{
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
}
)

// RunMainTest expects flags to be already initialized.
// This function needs to be exposed, so that test cases in other repositories can call the upgrade
Expand All @@ -63,7 +75,7 @@ type DurableFeature struct {
EnvOpts []environment.EnvOpts
setupEnv environment.Environment
setupCtx context.Context
VerifyF *feature.Feature
VerifyF func() *feature.Feature
Global environment.GlobalEnvironment
}

Expand All @@ -83,14 +95,14 @@ func (fe *DurableFeature) Setup(label string) pkgupgrade.Operation {
func (fe *DurableFeature) Verify(label string) pkgupgrade.Operation {
return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) {
c.T.Parallel()
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF)
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF())
})
}

func (fe *DurableFeature) VerifyAndTeardown(label string) pkgupgrade.Operation {
return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) {
c.T.Parallel()
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF)
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF())
// Ensures teardown of resources/namespace.
fe.setupEnv.Finish()
})
Expand All @@ -103,7 +115,7 @@ func (fe *DurableFeature) SetupVerifyAndTeardown(label string) pkgupgrade.Operat
append(fe.EnvOpts, environment.Managed(c.T))...,
)
env.Test(ctx, c.T, fe.SetupF)
env.Test(ctx, c.T, fe.VerifyF)
env.Test(ctx, c.T, fe.VerifyF())
})
}

Expand Down Expand Up @@ -290,14 +302,29 @@ func InMemoryChannelFeature(glob environment.GlobalEnvironment) *DurableFeature
setupF := feature.NewFeature()
sink, ch := channel.ChannelChainSetup(setupF, 1, createSubscriberFn)

verifyF := feature.NewFeature()
channel.ChannelChainAssert(verifyF, sink, ch)
verifyF := func() *feature.Feature {
f := feature.NewFeatureNamed(setupF.Name)
channel.ChannelChainAssert(f, sink, ch)
return f
}

opts := []environment.EnvOpts{
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts}
}

func BrokerEventTransformationForTrigger(glob environment.GlobalEnvironment,
) *DurableFeature {
// Prevent race conditions on EnvCfg.BrokerClass when running tests in parallel.
brokerConfigMux.Lock()
defer brokerConfigMux.Unlock()
brokerresources.EnvCfg.BrokerClass = eventing.MTChannelBrokerClassValue

setupF := feature.NewFeature()
cfg := brokerfeatures.BrokerEventTransformationForTriggerSetup(setupF)

verifyF := func() *feature.Feature {
f := feature.NewFeatureNamed(setupF.Name)
brokerfeatures.BrokerEventTransformationForTriggerAssert(f, cfg)
return f
}

return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts}
Expand Down
4 changes: 4 additions & 0 deletions test/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ func TestEventingUpgrades(t *testing.T) {
g := FeatureGroupWithUpgradeTests{
// A feature that will run the same test post-upgrade and post-downgrade.
NewFeatureSmoke(InMemoryChannelFeature(global)),
NewFeatureSmoke(BrokerEventTransformationForTrigger(global)),
// A feature that will be created pre-upgrade and verified/removed post-upgrade.
NewFeatureOnlyUpgrade(InMemoryChannelFeature(global)),
NewFeatureOnlyUpgrade(BrokerEventTransformationForTrigger(global)),
// A feature that will be created pre-upgrade, verified post-upgrade, verified and removed post-downgrade.
NewFeatureUpgradeDowngrade(InMemoryChannelFeature(global)),
NewFeatureUpgradeDowngrade(BrokerEventTransformationForTrigger(global)),
// A feature that will be created post-upgrade, verified and removed post-downgrade.
NewFeatureOnlyDowngrade(InMemoryChannelFeature(global)),
NewFeatureOnlyDowngrade(BrokerEventTransformationForTrigger(global)),
}

suite := pkgupgrade.Suite{
Expand Down
Loading