diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 616c42ee5a7..68d19d50f49 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -21,6 +21,7 @@ import ( // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "errors" + "knative.dev/eventing/pkg/reconciler/integration/source" "log" "net/http" "os" @@ -41,7 +42,6 @@ import ( "knative.dev/eventing/pkg/reconciler/channel" "knative.dev/eventing/pkg/reconciler/containersource" "knative.dev/eventing/pkg/reconciler/eventtype" - "knative.dev/eventing/pkg/reconciler/integrationsource" "knative.dev/eventing/pkg/reconciler/parallel" "knative.dev/eventing/pkg/reconciler/pingsource" "knative.dev/eventing/pkg/reconciler/sequence" @@ -105,7 +105,7 @@ func main() { apiserversource.NewController, pingsource.NewController, containersource.NewController, - integrationsource.NewController, + source.NewController, // Sources CRD sourcecrd.NewController, diff --git a/config/core/resources/integrationsource.yaml b/config/core/resources/integrationsource.yaml index b671eaf5478..a114426d248 100644 --- a/config/core/resources/integrationsource.yaml +++ b/config/core/resources/integrationsource.yaml @@ -104,7 +104,7 @@ spec: s3: type: object properties: - bucketNameOrArn: + arn: type: string title: Bucket Name description: The S3 Bucket name or Amazon Resource Name (ARN). @@ -199,7 +199,7 @@ spec: sqs: type: object properties: - queueNameOrArn: + arn: type: string title: Queue Name description: The SQS Queue Name or ARN @@ -217,7 +217,7 @@ spec: title: Autocreate Queue description: Setting the autocreation of the SQS queue. default: false - amazonAWSHost: + host: type: string title: AWS Host description: The hostname of the Amazon AWS cloud. @@ -287,7 +287,7 @@ spec: description: The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. - ddb-streams: + ddbStreams: type: object properties: table: diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 79a98c5983d..3f8b50e80a4 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -7408,512 +7408,6 @@ IntegrationSourceStatus -
-(Appears on:AWSDDBStreams, AWSS3, AWSSQS) -
--
-Field | -Description | -
---|---|
-region - -string - - |
-
- Auth is the S3 authentication (accessKey/secretKey) configuration. - |
-
-profileCredentialsName - -string - - |
-
- AWS region - |
-
-sessionToken - -string - - |
-
- Profile name for profile credentials provider - |
-
-uriEndpointOverride - -string - - |
-
- Session token - |
-
-overrideEndpoint - -bool - - |
-
- Override endpoint URI - |
-
-(Appears on:Aws) -
--
-Field | -Description | -
---|---|
-AWSCommon - - -AWSCommon - - - |
-
-
-(Members of |
-
-table - -string - - |
-
- Embeds AWSCommon to inherit its fields in JSON - |
-
-streamIteratorType - -string - - |
-
- The name of the DynamoDB table - |
-
-delay - -int - - |
-
- Defines where in the DynamoDB stream to start getting records - |
-
-(Appears on:Aws) -
--
-Field | -Description | -
---|---|
-AWSCommon - - -AWSCommon - - - |
-
-
-(Members of |
-
-bucketNameOrArn - -string - - |
-
- Embeds AWSCommon to inherit its fields in JSON - |
-
-deleteAfterRead - -bool - - |
-
- S3 Bucket name or ARN - |
-
-moveAfterRead - -bool - - |
-
- Auto-delete objects after reading - |
-
-destinationBucket - -string - - |
-
- Move objects after reading - |
-
-destinationBucketPrefix - -string - - |
-
- Destination bucket for moved objects - |
-
-destinationBucketSuffix - -string - - |
-
- Prefix for moved objects - |
-
-autoCreateBucket - -bool - - |
-
- Suffix for moved objects - |
-
-prefix - -string - - |
-
- Auto-create S3 bucket - |
-
-ignoreBody - -bool - - |
-
- S3 bucket prefix for search - |
-
-forcePathStyle - -bool - - |
-
- Ignore object body - |
-
-delay - -int - - |
-
- Force path style for bucket access - |
-
-maxMessagesPerPoll - -int - - |
-
- Delay between polls in milliseconds - |
-
-(Appears on:Aws) -
--
-Field | -Description | -
---|---|
-AWSCommon - - -AWSCommon - - - |
-
-
-(Members of |
-
-queueNameOrArn - -string - - |
-
- Embeds AWSCommon to inherit its fields in JSON - |
-
-deleteAfterRead - -bool - - |
-
- SQS Queue name or ARN - |
-
-autoCreateQueue - -bool - - |
-
- Auto-delete messages after reading - |
-
-amazonAWSHost - -string - - |
-
- Auto-create SQS queue - |
-
-protocol - -string - - |
-
- AWS host - |
-
-queueURL - -string - - |
-
- Communication protocol (http/https) - |
-
-greedy - -bool - - |
-
- Full SQS queue URL - |
-
-delay - -int - - |
-
- Greedy scheduler - |
-
-maxMessagesPerPoll - -int - - |
-
- Delay between polls in milliseconds - |
-
-waitTimeSeconds - -int - - |
-
- Max messages to return (1-10) - |
-
-visibilityTimeout - -int - - |
-
- Wait time for messages - |
-
-(Appears on:Aws) -
--
-Field | -Description | -
---|---|
-secret - - -Secret - - - |
-
- Auth Secret - |
-
-accessKey - -string - - |
-
- AccessKey is the AWS access key ID. - |
-
-secretKey - -string - - |
-
- SecretKey is the AWS secret access key. - |
-
@@ -7933,9 +7427,7 @@ string
s3
sqs
ddb-streams
ddbStreams
auth
-(Appears on:Auth) -
--
-Field | -Description | -
---|---|
-ref - - -SecretReference - - - |
-
- Secret reference for SASL and SSL configurations. - |
-
-(Appears on:Secret) -
--
-Field | -Description | -
---|---|
-name - -string - - |
-
- Secret name. - |
-
diff --git a/pkg/apis/common/integration/v1alpha1/auth.go b/pkg/apis/common/integration/v1alpha1/auth.go new file mode 100644 index 00000000000..8f815119bc9 --- /dev/null +++ b/pkg/apis/common/integration/v1alpha1/auth.go @@ -0,0 +1,43 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +type Auth struct { + // Auth Secret + Secret *Secret `json:"secret,omitempty"` + + // AccessKey is the AWS access key ID. + AccessKey string `json:"accessKey,omitempty"` + + // SecretKey is the AWS secret access key. + SecretKey string `json:"secretKey,omitempty"` +} + +func (a *Auth) HasAuth() bool { + return a != nil && a.Secret != nil && + a.Secret.Ref != nil && a.Secret.Ref.Name != "" +} + +type Secret struct { + // Secret reference for SASL and SSL configurations. + Ref *SecretReference `json:"ref,omitempty"` +} + +type SecretReference struct { + // Secret name. + Name string `json:"name"` +} diff --git a/pkg/apis/common/integration/v1alpha1/aws.go b/pkg/apis/common/integration/v1alpha1/aws.go new file mode 100644 index 00000000000..056ef5b4ea3 --- /dev/null +++ b/pkg/apis/common/integration/v1alpha1/aws.go @@ -0,0 +1,64 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +type AWSCommon struct { + // Auth is the S3 authentication (accessKey/secretKey) configuration. + Region string `json:"region,omitempty"` // AWS region + ProfileCredentialsName string `json:"profileCredentialsName,omitempty"` // Profile name for profile credentials provider + SessionToken string `json:"sessionToken,omitempty"` // Session token + URIEndpointOverride string `json:"uriEndpointOverride,omitempty"` // Override endpoint URI + OverrideEndpoint bool `json:"overrideEndpoint" default:"false"` // Override endpoint flag +} + +type AWSS3 struct { + AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON + Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` // S3 ARN + DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading + MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading + DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects + DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects + DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects + AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket + Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search + IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body + ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access + Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds + MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request +} + +type AWSSQS struct { + AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON + Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN"` // SQS ARN + DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading + AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue + Host string `json:"host" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_AMAZONAWSHOST" default:"amazonaws.com"` // AWS host + Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https) + QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL + Greedy bool `json:"greedy" default:"false"` // Greedy scheduler + Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds + MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10) + WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages + VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds +} + +type AWSDDBStreams struct { + AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON + Table string `json:"table,omitempty"` // The name of the DynamoDB table + StreamIteratorType string `json:"streamIteratorType,omitempty" default:"FROM_LATEST"` // Defines where in the DynamoDB stream to start getting records + Delay int `json:"delay,omitempty" default:"500"` // Delay in milliseconds before the next poll from the database +} diff --git a/pkg/apis/common/integration/v1alpha1/doc.go b/pkg/apis/common/integration/v1alpha1/doc.go new file mode 100644 index 00000000000..3366df6745c --- /dev/null +++ b/pkg/apis/common/integration/v1alpha1/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package + +package v1alpha1 diff --git a/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..5d0f7dde59c --- /dev/null +++ b/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,147 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AWSCommon) DeepCopyInto(out *AWSCommon) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSCommon. +func (in *AWSCommon) DeepCopy() *AWSCommon { + if in == nil { + return nil + } + out := new(AWSCommon) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AWSDDBStreams) DeepCopyInto(out *AWSDDBStreams) { + *out = *in + out.AWSCommon = in.AWSCommon + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSDDBStreams. +func (in *AWSDDBStreams) DeepCopy() *AWSDDBStreams { + if in == nil { + return nil + } + out := new(AWSDDBStreams) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AWSS3) DeepCopyInto(out *AWSS3) { + *out = *in + out.AWSCommon = in.AWSCommon + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSS3. +func (in *AWSS3) DeepCopy() *AWSS3 { + if in == nil { + return nil + } + out := new(AWSS3) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AWSSQS) DeepCopyInto(out *AWSSQS) { + *out = *in + out.AWSCommon = in.AWSCommon + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSSQS. +func (in *AWSSQS) DeepCopy() *AWSSQS { + if in == nil { + return nil + } + out := new(AWSSQS) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Auth) DeepCopyInto(out *Auth) { + *out = *in + if in.Secret != nil { + in, out := &in.Secret, &out.Secret + *out = new(Secret) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Auth. +func (in *Auth) DeepCopy() *Auth { + if in == nil { + return nil + } + out := new(Auth) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Secret) DeepCopyInto(out *Secret) { + *out = *in + if in.Ref != nil { + in, out := &in.Ref, &out.Ref + *out = new(SecretReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Secret. +func (in *Secret) DeepCopy() *Secret { + if in == nil { + return nil + } + out := new(Secret) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecretReference) DeepCopyInto(out *SecretReference) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference. +func (in *SecretReference) DeepCopy() *SecretReference { + if in == nil { + return nil + } + out := new(SecretReference) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/sources/v1alpha1/integration_types.go b/pkg/apis/sources/v1alpha1/integration_types.go index 15b4e60a2b0..2eec773bcb5 100644 --- a/pkg/apis/sources/v1alpha1/integration_types.go +++ b/pkg/apis/sources/v1alpha1/integration_types.go @@ -20,6 +20,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -68,84 +69,11 @@ type Timer struct { RepeatCount int `json:"repeatCount,omitempty"` // Max number of fires (optional) } -type AWSCommon struct { - // Auth is the S3 authentication (accessKey/secretKey) configuration. - Region string `json:"region,omitempty"` // AWS region - ProfileCredentialsName string `json:"profileCredentialsName,omitempty"` // Profile name for profile credentials provider - SessionToken string `json:"sessionToken,omitempty"` // Session token - URIEndpointOverride string `json:"uriEndpointOverride,omitempty"` // Override endpoint URI - OverrideEndpoint bool `json:"overrideEndpoint" default:"false"` // Override endpoint flag -} - -type AWSS3 struct { - AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON - BucketNameOrArn string `json:"bucketNameOrArn,omitempty"` // S3 Bucket name or ARN - DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading - MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading - DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects - DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects - DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects - AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket - Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search - IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body - ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access - Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds - MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request -} - -type AWSSQS struct { - AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON - QueueNameOrArn string `json:"queueNameOrArn,omitempty"` // SQS Queue name or ARN - DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading - AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue - AmazonAWSHost string `json:"amazonAWSHost" default:"amazonaws.com"` // AWS host - Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https) - QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL - Greedy bool `json:"greedy" default:"false"` // Greedy scheduler - Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds - MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10) - WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages - VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds -} - -type AWSDDBStreams struct { - AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON - Table string `json:"table,omitempty"` // The name of the DynamoDB table - StreamIteratorType string `json:"streamIteratorType,omitempty" default:"FROM_LATEST"` // Defines where in the DynamoDB stream to start getting records - Delay int `json:"delay,omitempty" default:"500"` // Delay in milliseconds before the next poll from the database -} - type Aws struct { - S3 *AWSS3 `json:"s3,omitempty"` // S3 source configuration - SQS *AWSSQS `json:"sqs,omitempty"` // SQS source configuration - DDBStreams *AWSDDBStreams `json:"ddb-streams,omitempty"` // DynamoDB Streams source configuration - Auth *Auth `json:"auth,omitempty"` -} - -type Auth struct { - // Auth Secret - Secret *Secret `json:"secret,omitempty"` - - // AccessKey is the AWS access key ID. - AccessKey string `json:"accessKey,omitempty"` - - // SecretKey is the AWS secret access key. - SecretKey string `json:"secretKey,omitempty"` -} - -func (a *Auth) HasAuth() bool { - return a != nil && a.Secret != nil && - a.Secret.Ref != nil && a.Secret.Ref.Name != "" -} - -type Secret struct { - // Secret reference for SASL and SSL configurations. - Ref *SecretReference `json:"ref,omitempty"` -} - -type SecretReference struct { - // Secret name. - Name string `json:"name"` + S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 source configuration + SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS source configuration + DDBStreams *v1alpha1.AWSDDBStreams `json:"ddbStreams,omitempty"` // DynamoDB Streams source configuration + Auth *v1alpha1.Auth `json:"auth,omitempty"` } // GetGroupVersionKind returns the GroupVersionKind. diff --git a/pkg/apis/sources/v1alpha1/integration_types_test.go b/pkg/apis/sources/v1alpha1/integration_types_test.go index 83b2ff0ffc5..4dfd20ea801 100644 --- a/pkg/apis/sources/v1alpha1/integration_types_test.go +++ b/pkg/apis/sources/v1alpha1/integration_types_test.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" "testing" ) @@ -53,30 +54,30 @@ func TestTimer(t *testing.T) { } func TestAWS(t *testing.T) { - s3 := AWSS3{ - AWSCommon: AWSCommon{ + s3 := v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ Region: "eu-north-1", }, - BucketNameOrArn: "example-bucket", + Arn: "example-bucket", } if s3.Region != "eu-north-1" { t.Errorf("AWSS3.Region = %v, want 'eu-north-1'", s3.Region) } - sqs := AWSSQS{ - AWSCommon: AWSCommon{ + sqs := v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ Region: "eu-north-1", }, - QueueNameOrArn: "example-queue", + Arn: "example-queue", } if sqs.Region != "eu-north-1" { t.Errorf("AWSSQS.Region = %v, want 'eu-north-1'", sqs.Region) } - ddbStreams := AWSDDBStreams{ - AWSCommon: AWSCommon{ + ddbStreams := v1alpha1.AWSDDBStreams{ + AWSCommon: v1alpha1.AWSCommon{ Region: "eu-north-1", }, Table: "example-table", @@ -89,9 +90,9 @@ func TestAWS(t *testing.T) { // TestAuthFieldAccess tests the HasAuth method and field access in Auth struct func TestAuthFieldAccess(t *testing.T) { - auth := Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + auth := v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, diff --git a/pkg/apis/sources/v1alpha1/integration_validation.go b/pkg/apis/sources/v1alpha1/integration_validation.go index 4df223eb5dd..3b412998460 100644 --- a/pkg/apis/sources/v1alpha1/integration_validation.go +++ b/pkg/apis/sources/v1alpha1/integration_validation.go @@ -65,8 +65,8 @@ func (spec *IntegrationSourceSpec) Validate(ctx context.Context) *apis.FieldErro // Additional validation for AWS S3 required fields if spec.Aws.S3 != nil { - if spec.Aws.S3.BucketNameOrArn == "" { - errs = errs.Also(apis.ErrMissingField("aws.s3.bucketNameOrArn")) + if spec.Aws.S3.Arn == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.arn")) } if spec.Aws.S3.Region == "" { errs = errs.Also(apis.ErrMissingField("aws.s3.region")) @@ -75,7 +75,7 @@ func (spec *IntegrationSourceSpec) Validate(ctx context.Context) *apis.FieldErro // Additional validation for AWS SQS required fields if spec.Aws.SQS != nil { - if spec.Aws.SQS.QueueNameOrArn == "" { + if spec.Aws.SQS.Arn == "" { errs = errs.Also(apis.ErrMissingField("aws.sqs.queueNameOrArn")) } if spec.Aws.SQS.Region == "" { diff --git a/pkg/apis/sources/v1alpha1/integration_validation_test.go b/pkg/apis/sources/v1alpha1/integration_validation_test.go index 6e0071703d1..3281b3940fb 100644 --- a/pkg/apis/sources/v1alpha1/integration_validation_test.go +++ b/pkg/apis/sources/v1alpha1/integration_validation_test.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "context" + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" "testing" "github.com/google/go-cmp/cmp" @@ -45,15 +46,15 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "valid AWS S3 source with auth and region", spec: IntegrationSourceSpec{ Aws: &Aws{ - S3: &AWSS3{ - AWSCommon: AWSCommon{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, - BucketNameOrArn: "example-bucket", + Arn: "example-bucket", }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, @@ -66,15 +67,15 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "valid AWS SQS source with auth and region", spec: IntegrationSourceSpec{ Aws: &Aws{ - SQS: &AWSSQS{ - AWSCommon: AWSCommon{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, - QueueNameOrArn: "example-queue", + Arn: "example-queue", }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, @@ -87,15 +88,15 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "valid AWS DDBStreams source with auth and region", spec: IntegrationSourceSpec{ Aws: &Aws{ - DDBStreams: &AWSDDBStreams{ - AWSCommon: AWSCommon{ + DDBStreams: &v1alpha1.AWSDDBStreams{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, Table: "example-table", }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, @@ -113,11 +114,11 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { ContentType: "text/plain", }, Aws: &Aws{ - S3: &AWSS3{ - AWSCommon: AWSCommon{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, - BucketNameOrArn: "example-bucket", + Arn: "example-bucket", }, }, }, @@ -127,21 +128,21 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "multiple AWS sources set (invalid)", spec: IntegrationSourceSpec{ Aws: &Aws{ - S3: &AWSS3{ - AWSCommon: AWSCommon{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, - BucketNameOrArn: "example-bucket", + Arn: "example-bucket", }, - SQS: &AWSSQS{ - AWSCommon: AWSCommon{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, - QueueNameOrArn: "example-queue", + Arn: "example-queue", }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, @@ -154,14 +155,14 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "AWS SQS source without QueueNameOrArn (invalid)", spec: IntegrationSourceSpec{ Aws: &Aws{ - SQS: &AWSSQS{ - AWSCommon: AWSCommon{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, @@ -174,14 +175,14 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "AWS DDBStreams source without Table (invalid)", spec: IntegrationSourceSpec{ Aws: &Aws{ - DDBStreams: &AWSDDBStreams{ - AWSCommon: AWSCommon{ + DDBStreams: &v1alpha1.AWSDDBStreams{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, @@ -199,11 +200,11 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "AWS source without auth (invalid)", spec: IntegrationSourceSpec{ Aws: &Aws{ - S3: &AWSS3{ - AWSCommon: AWSCommon{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ Region: "us-east-1", }, - BucketNameOrArn: "example-bucket", + Arn: "example-bucket", }, }, }, @@ -213,12 +214,12 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { name: "AWS S3 source without region (invalid)", spec: IntegrationSourceSpec{ Aws: &Aws{ - S3: &AWSS3{ - BucketNameOrArn: "example-bucket", + S3: &v1alpha1.AWSS3{ + Arn: "example-bucket", }, - Auth: &Auth{ - Secret: &Secret{ - Ref: &SecretReference{ + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ Name: "aws-secret", }, }, diff --git a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 5fe9cf729af..7e2857fded5 100644 --- a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -23,117 +23,30 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" + integrationv1alpha1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" ) -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *AWSCommon) DeepCopyInto(out *AWSCommon) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSCommon. -func (in *AWSCommon) DeepCopy() *AWSCommon { - if in == nil { - return nil - } - out := new(AWSCommon) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *AWSDDBStreams) DeepCopyInto(out *AWSDDBStreams) { - *out = *in - out.AWSCommon = in.AWSCommon - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSDDBStreams. -func (in *AWSDDBStreams) DeepCopy() *AWSDDBStreams { - if in == nil { - return nil - } - out := new(AWSDDBStreams) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *AWSS3) DeepCopyInto(out *AWSS3) { - *out = *in - out.AWSCommon = in.AWSCommon - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSS3. -func (in *AWSS3) DeepCopy() *AWSS3 { - if in == nil { - return nil - } - out := new(AWSS3) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *AWSSQS) DeepCopyInto(out *AWSSQS) { - *out = *in - out.AWSCommon = in.AWSCommon - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSSQS. -func (in *AWSSQS) DeepCopy() *AWSSQS { - if in == nil { - return nil - } - out := new(AWSSQS) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Auth) DeepCopyInto(out *Auth) { - *out = *in - if in.Secret != nil { - in, out := &in.Secret, &out.Secret - *out = new(Secret) - (*in).DeepCopyInto(*out) - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Auth. -func (in *Auth) DeepCopy() *Auth { - if in == nil { - return nil - } - out := new(Auth) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Aws) DeepCopyInto(out *Aws) { *out = *in if in.S3 != nil { in, out := &in.S3, &out.S3 - *out = new(AWSS3) + *out = new(integrationv1alpha1.AWSS3) **out = **in } if in.SQS != nil { in, out := &in.SQS, &out.SQS - *out = new(AWSSQS) + *out = new(integrationv1alpha1.AWSSQS) **out = **in } if in.DDBStreams != nil { in, out := &in.DDBStreams, &out.DDBStreams - *out = new(AWSDDBStreams) + *out = new(integrationv1alpha1.AWSDDBStreams) **out = **in } if in.Auth != nil { in, out := &in.Auth, &out.Auth - *out = new(Auth) + *out = new(integrationv1alpha1.Auth) (*in).DeepCopyInto(*out) } return @@ -254,43 +167,6 @@ func (in *IntegrationSourceStatus) DeepCopy() *IntegrationSourceStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Secret) DeepCopyInto(out *Secret) { - *out = *in - if in.Ref != nil { - in, out := &in.Ref, &out.Ref - *out = new(SecretReference) - **out = **in - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Secret. -func (in *Secret) DeepCopy() *Secret { - if in == nil { - return nil - } - out := new(Secret) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SecretReference) DeepCopyInto(out *SecretReference) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference. -func (in *SecretReference) DeepCopy() *SecretReference { - if in == nil { - return nil - } - out := new(SecretReference) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Timer) DeepCopyInto(out *Timer) { *out = *in diff --git a/pkg/reconciler/integration/helper.go b/pkg/reconciler/integration/helper.go new file mode 100644 index 00000000000..d77cc576340 --- /dev/null +++ b/pkg/reconciler/integration/helper.go @@ -0,0 +1,128 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "fmt" + "reflect" + "strconv" + "strings" + + corev1 "k8s.io/api/core/v1" +) + +const prefix = "CAMEL_KAMELET_" + +func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar { + var envVars []corev1.EnvVar + + // Use reflection to inspect the struct fields + v := reflect.ValueOf(s) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + t := v.Type() + + for i := 0; i < v.NumField(); i++ { + field := v.Field(i) + fieldType := t.Field(i) + + // Skip unexported fields + if !field.CanInterface() { + continue + } + + // Handle embedded/anonymous structs recursively + if fieldType.Anonymous && field.Kind() == reflect.Struct { + // Recursively handle embedded structs with the same prefix + envVars = append(envVars, GenerateEnvVarsFromStruct(prefix, field.Interface())...) + continue + } + + // First, check for the custom 'camel' tag + envVarName := fieldType.Tag.Get("camel") + if envVarName == "" { + // If 'camel' tag is not present, fall back to the 'json' tag or Go field name + jsonTag := fieldType.Tag.Get("json") + tagName := strings.Split(jsonTag, ",")[0] + if tagName == "" || tagName == "-" { + tagName = fieldType.Name + } + envVarName = fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName)) + } + + if field.Kind() == reflect.Ptr { + if field.IsNil() { + continue + } + field = field.Elem() + } + + var value string + switch field.Kind() { + case reflect.Int, reflect.Int32, reflect.Int64: + value = strconv.FormatInt(field.Int(), 10) + case reflect.Bool: + value = strconv.FormatBool(field.Bool()) + case reflect.String: + value = field.String() + default: + // Skip unsupported types + continue + } + + // Skip zero/empty values + if value == "" { + continue + } + + envVars = append(envVars, corev1.EnvVar{ + Name: envVarName, + Value: value, + }) + } + + return envVars +} + +func MakeSecretEnvVar(name, key, secretName string) corev1.EnvVar { + return corev1.EnvVar{ + Name: name, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: key, + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + }, + }, + } +} + +func MakeSSLEnvVar() []corev1.EnvVar { + return []corev1.EnvVar{ + { + Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED", + Value: "true", + }, + { + Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH", + Value: "/knative-custom-certs/knative-eventing-bundle.pem", + }, + } +} diff --git a/pkg/reconciler/integration/helper_test.go b/pkg/reconciler/integration/helper_test.go new file mode 100644 index 00000000000..0da78460379 --- /dev/null +++ b/pkg/reconciler/integration/helper_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" +) + +func TestGenerateEnvVarsFromStruct(t *testing.T) { + type TestStruct struct { + Field1 int `json:"field1"` + Field2 bool `json:"field2"` + Field3 string `json:"field3"` + } + + prefix := "TEST_PREFIX" + input := &TestStruct{ + Field1: 123, + Field2: true, + Field3: "hello", + } + + // Expected environment variables including SSL settings + want := []corev1.EnvVar{ + {Name: "TEST_PREFIX_FIELD1", Value: "123"}, + {Name: "TEST_PREFIX_FIELD2", Value: "true"}, + {Name: "TEST_PREFIX_FIELD3", Value: "hello"}, + } + + got := GenerateEnvVarsFromStruct(prefix, input) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff) + } +} + +func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) { + type AWSS3 struct { + Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` + Region string `json:"region,omitempty"` + } + + prefix := "CAMEL_KAMELET_AWS_S3_SOURCE" + input := AWSS3{ + Arn: "arn:aws:s3:::example-bucket", + Region: "us-west-2", + } + + // Expected environment variables including SSL settings and camel tag for Arn + want := []corev1.EnvVar{ + {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"}, + {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"}, + } + + got := GenerateEnvVarsFromStruct(prefix, input) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff) + } +} diff --git a/pkg/reconciler/integrationsource/controller.go b/pkg/reconciler/integration/source/controller.go similarity index 99% rename from pkg/reconciler/integrationsource/controller.go rename to pkg/reconciler/integration/source/controller.go index a84bec85815..751bed6ddf0 100644 --- a/pkg/reconciler/integrationsource/controller.go +++ b/pkg/reconciler/integration/source/controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "context" diff --git a/pkg/reconciler/integrationsource/controller_test.go b/pkg/reconciler/integration/source/controller_test.go similarity index 98% rename from pkg/reconciler/integrationsource/controller_test.go rename to pkg/reconciler/integration/source/controller_test.go index e4c21f26565..2b890e0c284 100644 --- a/pkg/reconciler/integrationsource/controller_test.go +++ b/pkg/reconciler/integration/source/controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "context" diff --git a/pkg/reconciler/integrationsource/integrationsource.go b/pkg/reconciler/integration/source/integrationsource.go similarity index 97% rename from pkg/reconciler/integrationsource/integrationsource.go rename to pkg/reconciler/integration/source/integrationsource.go index ca2f9aa95fa..c466058b4bc 100644 --- a/pkg/reconciler/integrationsource/integrationsource.go +++ b/pkg/reconciler/integration/source/integrationsource.go @@ -14,11 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "context" "fmt" + "knative.dev/eventing/pkg/reconciler/integration/source/resources" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -32,7 +33,6 @@ import ( "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/integrationsource" v1listers "knative.dev/eventing/pkg/client/listers/sources/v1" listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1" - "knative.dev/eventing/pkg/reconciler/integrationsource/resources" "knative.dev/pkg/controller" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" diff --git a/pkg/reconciler/integrationsource/integrationsource_test.go b/pkg/reconciler/integration/source/integrationsource_test.go similarity index 99% rename from pkg/reconciler/integrationsource/integrationsource_test.go rename to pkg/reconciler/integration/source/integrationsource_test.go index 92dc884bfcf..d500c0cccbb 100644 --- a/pkg/reconciler/integrationsource/integrationsource_test.go +++ b/pkg/reconciler/integration/source/integrationsource_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "fmt" diff --git a/pkg/reconciler/integrationsource/resources/containersource.go b/pkg/reconciler/integration/source/resources/containersource.go similarity index 50% rename from pkg/reconciler/integrationsource/resources/containersource.go rename to pkg/reconciler/integration/source/resources/containersource.go index 95fb2a32aa1..8607867a6f2 100644 --- a/pkg/reconciler/integrationsource/resources/containersource.go +++ b/pkg/reconciler/integration/source/resources/containersource.go @@ -17,15 +17,11 @@ package resources import ( - "fmt" - "reflect" - "strconv" - "strings" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" + "knative.dev/eventing/pkg/reconciler/integration" "knative.dev/pkg/kmeta" ) @@ -62,85 +58,13 @@ func NewContainerSource(source *v1alpha1.IntegrationSource) *sourcesv1.Container } } -func generateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar { - var envVars []corev1.EnvVar - - // Use reflection to inspect the struct fields - v := reflect.ValueOf(s) - if v.Kind() == reflect.Ptr { - v = v.Elem() - } - - t := v.Type() - - for i := 0; i < v.NumField(); i++ { - field := v.Field(i) - fieldType := t.Field(i) - - // Skip unexported fields - if !field.CanInterface() { - continue - } - - // Handle embedded/anonymous structs recursively - if fieldType.Anonymous && field.Kind() == reflect.Struct { - // Recursively handle embedded structs with the same prefix - envVars = append(envVars, generateEnvVarsFromStruct(prefix, field.Interface())...) - continue - } - - // Extract the JSON tag or fall back to the Go field name - jsonTag := fieldType.Tag.Get("json") - tagName := strings.Split(jsonTag, ",")[0] - - // fallback to Go field name if no JSON tag - if tagName == "" || tagName == "-" { - tagName = fieldType.Name - } - - envVarName := fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName)) - - if field.Kind() == reflect.Ptr { - if field.IsNil() { - continue - } - field = field.Elem() - } - - var value string - switch field.Kind() { - case reflect.Int, reflect.Int32, reflect.Int64: - value = strconv.FormatInt(field.Int(), 10) - case reflect.Bool: - value = strconv.FormatBool(field.Bool()) - case reflect.String: - value = field.String() - default: - // Skip unsupported types - continue - } - - // Skip zero/empty values - if value == "" { - continue - } - - envVars = append(envVars, corev1.EnvVar{ - Name: envVarName, - Value: value, - }) - } - - return envVars -} - // Function to create environment variables for Timer or AWS configurations dynamically func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { - var envVars = makeSSLEnvVar() + var envVars = integration.MakeSSLEnvVar() // Timer environment variables if source.Spec.Timer != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...) + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...) return envVars } @@ -152,11 +76,11 @@ func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { // AWS S3 environment variables if source.Spec.Aws != nil && source.Spec.Aws.S3 != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...) + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...) if secretName != "" { envVars = append(envVars, []corev1.EnvVar{ - makeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName), - makeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName), }...) } return envVars @@ -164,11 +88,11 @@ func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { // AWS SQS environment variables if source.Spec.Aws != nil && source.Spec.Aws.SQS != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...) + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...) if secretName != "" { envVars = append(envVars, []corev1.EnvVar{ - makeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName), - makeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName), }...) } return envVars @@ -176,11 +100,11 @@ func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { // AWS DynamoDB Streams environment variables if source.Spec.Aws != nil && source.Spec.Aws.DDBStreams != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...) + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...) if secretName != "" { envVars = append(envVars, []corev1.EnvVar{ - makeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName), - makeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName), }...) } return envVars @@ -190,19 +114,6 @@ func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { return envVars } -func makeSSLEnvVar() []corev1.EnvVar { - return []corev1.EnvVar{ - { - Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED", - Value: "true", - }, - { - Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH", - Value: "/knative-custom-certs/knative-eventing-bundle.pem", - }, - } -} - func selectImage(source *v1alpha1.IntegrationSource) string { if source.Spec.Timer != nil { return "gcr.io/knative-nightly/timer-source:latest" @@ -220,17 +131,3 @@ func selectImage(source *v1alpha1.IntegrationSource) string { } return "" } - -func makeSecretEnvVar(name, key, secretName string) corev1.EnvVar { - return corev1.EnvVar{ - Name: name, - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - Key: key, - LocalObjectReference: corev1.LocalObjectReference{ - Name: secretName, - }, - }, - }, - } -} diff --git a/pkg/reconciler/integrationsource/resources/containersource_test.go b/pkg/reconciler/integration/source/resources/containersource_test.go similarity index 80% rename from pkg/reconciler/integrationsource/resources/containersource_test.go rename to pkg/reconciler/integration/source/resources/containersource_test.go index 669b4eeaabc..54336ec6781 100644 --- a/pkg/reconciler/integrationsource/resources/containersource_test.go +++ b/pkg/reconciler/integration/source/resources/containersource_test.go @@ -101,31 +101,3 @@ func TestNewContainerSource(t *testing.T) { t.Errorf("NewContainerSource() mismatch (-want +got):\n%s", diff) } } - -func TestGenerateEnvVarsFromStruct(t *testing.T) { - type TestStruct struct { - Field1 int `json:"field1"` - Field2 bool `json:"field2"` - Field3 string `json:"field3"` - } - - prefix := "TEST_PREFIX" - input := &TestStruct{ - Field1: 123, - Field2: true, - Field3: "hello", - } - - // Expected environment variables including SSL settings - want := []corev1.EnvVar{ - {Name: "TEST_PREFIX_FIELD1", Value: "123"}, - {Name: "TEST_PREFIX_FIELD2", Value: "true"}, - {Name: "TEST_PREFIX_FIELD3", Value: "hello"}, - } - - got := generateEnvVarsFromStruct(prefix, input) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff) - } -} diff --git a/pkg/reconciler/integrationsource/resources/names.go b/pkg/reconciler/integration/source/resources/names.go similarity index 100% rename from pkg/reconciler/integrationsource/resources/names.go rename to pkg/reconciler/integration/source/resources/names.go