From a30e839f07cdd05e175ca1179d92ad4f26e6fd76 Mon Sep 17 00:00:00 2001 From: Pete Savage Date: Fri, 3 Nov 2023 17:26:39 +0000 Subject: [PATCH] Added new ephem-msk mode (#873) * Added new ephem-msk mode --- .../v1alpha1/clowdenvironment_types.go | 8 +- build/kube_setup.sh | 2 +- .../cloud.redhat.com_clowdenvironments.yaml | 9 +- .../providers/kafka/managed.go | 81 +--- .../cloud.redhat.com/providers/kafka/msk.go | 176 ++++++++ .../providers/kafka/provider.go | 387 +++++++++++++++++- .../providers/kafka/strimzi.go | 357 +++------------- .../cloud.redhat.com/providers/providers.go | 1 + deploy-mutate.yml | 10 +- deploy.yml | 10 +- .../modules/ROOT/pages/api_reference.adoc | 2 + tests/kuttl/test-kafka-msk/00-install.yaml | 7 + tests/kuttl/test-kafka-msk/01-assert.yaml | 26 ++ tests/kuttl/test-kafka-msk/01-pods.yaml | 342 ++++++++++++++++ .../kuttl/test-kafka-msk/02-json-asserts.yaml | 10 + tests/kuttl/test-kafka-msk/03-assert.yaml | 45 ++ tests/kuttl/test-kafka-msk/03-pods.yaml | 175 ++++++++ tests/kuttl/test-kafka-msk/create_json.sh | 26 ++ .../kuttl/test-kafka-strimzi-pvc/01-pods.yaml | 1 - 19 files changed, 1280 insertions(+), 395 deletions(-) create mode 100644 controllers/cloud.redhat.com/providers/kafka/msk.go create mode 100644 tests/kuttl/test-kafka-msk/00-install.yaml create mode 100644 tests/kuttl/test-kafka-msk/01-assert.yaml create mode 100644 tests/kuttl/test-kafka-msk/01-pods.yaml create mode 100644 tests/kuttl/test-kafka-msk/02-json-asserts.yaml create mode 100644 tests/kuttl/test-kafka-msk/03-assert.yaml create mode 100644 tests/kuttl/test-kafka-msk/03-pods.yaml create mode 100755 tests/kuttl/test-kafka-msk/create_json.sh diff --git a/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go b/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go index 6f31725b1..1e29c520d 100644 --- a/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go +++ b/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go @@ -151,7 +151,7 @@ type MetricsConfig struct { } // KafkaMode details the mode of operation of the Clowder Kafka Provider -// +kubebuilder:validation:Enum=managed-ephem;managed;operator;app-interface;local;none +// +kubebuilder:validation:Enum=ephem-msk;managed;operator;app-interface;local;none type KafkaMode string // KafkaClusterConfig defines options related to the Kafka cluster managed/monitored by Clowder @@ -252,6 +252,12 @@ type KafkaConfig struct { // Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. ManagedPrefix string `json:"managedPrefix,omitempty"` + // Namespace that kafkaTopics should be written to for (*_msk_*) mode. + TopicNamespace string `json:"topicNamespace,omitempty"` + + // Cluster annotation identifier for (*_msk_*) mode. + ClusterAnnotation string `json:"clusterAnnotation,omitempty"` + // (Deprecated) Defines the cluster name to be used by the Kafka Provider this will // be used in some modes to locate the Kafka instance. ClusterName string `json:"clusterName,omitempty"` diff --git a/build/kube_setup.sh b/build/kube_setup.sh index 021b4545a..dda9e23fa 100755 --- a/build/kube_setup.sh +++ b/build/kube_setup.sh @@ -286,7 +286,7 @@ function install_keda_operator { fi echo "*** Applying keda-operator manifest ..." 
- ${KUBECTL_CMD} apply -f https://github.com/kedacore/keda/releases/download/v2.10.1/keda-2.10.1.yaml + ${KUBECTL_CMD} apply -f https://github.com/kedacore/keda/releases/download/v2.12.0/keda-2.12.0.yaml --server-side echo "*** Will wait for keda-operator to come up in background" ${KUBECTL_CMD} rollout status deployment/$DEPLOYMENT -n $OPERATOR_NS | sed "s/^/[keda-operator] /" & diff --git a/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml b/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml index 978b70b6c..eda968386 100644 --- a/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml +++ b/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml @@ -258,6 +258,9 @@ spec: description: Version. If unset, default is '2.5.0' type: string type: object + clusterAnnotation: + description: Cluster annotation identifier for (*_msk_*) mode. + type: string clusterName: description: (Deprecated) Defines the cluster name to be used by the Kafka Provider this will be used in some modes to @@ -342,7 +345,7 @@ spec: a small instance of Kafka is created in the desired cluster namespace and configured to auto-create topics.' enum: - - managed-ephem + - ephem-msk - managed - operator - app-interface @@ -362,6 +365,10 @@ spec: suffix: description: (Deprecated) (Unused) type: string + topicNamespace: + description: Namespace that kafkaTopics should be written + to for (*_msk_*) mode. + type: string required: - mode type: object diff --git a/controllers/cloud.redhat.com/providers/kafka/managed.go b/controllers/cloud.redhat.com/providers/kafka/managed.go index d6cbf42c0..5d174f514 100644 --- a/controllers/cloud.redhat.com/providers/kafka/managed.go +++ b/controllers/cloud.redhat.com/providers/kafka/managed.go @@ -2,16 +2,12 @@ package kafka import ( "fmt" - "strconv" crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1" "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config" - "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors" "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers" - "github.com/RedHatInsights/rhc-osdk-utils/utils" core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" ) type managedKafkaProvider struct { @@ -36,12 +32,12 @@ func (k *managedKafkaProvider) Provide(app *crd.ClowdApp) error { var secret *core.Secret var broker config.BrokerConfig - secret, err = k.getSecret() + secret, err = getSecret(k) if err != nil { return err } - broker, err = k.getBrokerConfig(secret) + broker, err = getBrokerConfig(secret) if err != nil { return err } @@ -68,52 +64,6 @@ func (k *managedKafkaProvider) appendTopic(topic crd.KafkaTopicSpec, kafkaConfig ) } -func (k *managedKafkaProvider) destructureSecret(secret *core.Secret) (int, string, string, string, string, string, error) { - port, err := strconv.ParseUint(string(secret.Data["port"]), 10, 16) - if err != nil { - return 0, "", "", "", "", "", err - } - password := string(secret.Data["password"]) - username := string(secret.Data["username"]) - hostname := string(secret.Data["hostname"]) - cacert := "" - if val, ok := secret.Data["cacert"]; ok { - cacert = string(val) - } - saslMechanism := "PLAIN" - if val, ok := secret.Data["saslMechanism"]; ok { - saslMechanism = string(val) - } - return int(port), password, username, hostname, cacert, saslMechanism, nil -} - -func (k *managedKafkaProvider) getBrokerConfig(secret *core.Secret) (config.BrokerConfig, error) { - broker := config.BrokerConfig{} - - port, password, username, hostname, cacert, saslMechanism, err := 
k.destructureSecret(secret) - if err != nil { - return broker, err - } - - saslType := config.BrokerConfigAuthtypeSasl - - broker.Hostname = hostname - broker.Port = &port - broker.Authtype = &saslType - if cacert != "" { - broker.Cacert = &cacert - } - broker.Sasl = &config.KafkaSASLConfig{ - Password: &password, - Username: &username, - SecurityProtocol: utils.StringPtr("SASL_SSL"), - SaslMechanism: utils.StringPtr(saslMechanism), - } - broker.SecurityProtocol = utils.StringPtr("SASL_SSL") - - return broker, nil -} - func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *crd.ClowdApp) *config.KafkaConfig { kafkaConfig := &config.KafkaConfig{} kafkaConfig.Brokers = []config.BrokerConfig{broker} @@ -126,30 +76,3 @@ func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *c return kafkaConfig } - -func (k *managedKafkaProvider) getSecret() (*core.Secret, error) { - secretRef, err := k.getSecretRef() - if err != nil { - return nil, err - } - - secret := &core.Secret{} - - if err = k.Client.Get(k.Ctx, secretRef, secret); err != nil { - return nil, err - } - - return secret, nil -} - -func (k *managedKafkaProvider) getSecretRef() (types.NamespacedName, error) { - secretRef := types.NamespacedName{ - Name: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Name, - Namespace: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace, - } - nullName := types.NamespacedName{} - if secretRef == nullName { - return nullName, errors.NewClowderError("no secret ref defined for managed Kafka") - } - return secretRef, nil -} diff --git a/controllers/cloud.redhat.com/providers/kafka/msk.go b/controllers/cloud.redhat.com/providers/kafka/msk.go new file mode 100644 index 000000000..d5384b22e --- /dev/null +++ b/controllers/cloud.redhat.com/providers/kafka/msk.go @@ -0,0 +1,176 @@ +package kafka + +import ( + "encoding/json" + "fmt" + "strings" + + crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1" + + "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/clowderconfig" + "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config" + "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors" + "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers" + core "k8s.io/api/core/v1" + apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" +) + +type mskProvider struct { + providers.Provider +} + +// NewStrimzi returns a new strimzi provider object. 
+func NewMSK(p *providers.Provider) (providers.ClowderProvider, error) { + p.Cache.AddPossibleGVKFromIdent( + CyndiPipeline, + CyndiAppSecret, + CyndiHostInventoryAppSecret, + CyndiConfigMap, + KafkaTopic, + KafkaConnect, + ) + return &mskProvider{Provider: *p}, nil +} + +func (s *mskProvider) EnvProvide() error { + s.Config = &config.AppConfig{ + Kafka: &config.KafkaConfig{}, + } + return s.configureBrokers() +} + +func (s *mskProvider) Provide(app *crd.ClowdApp) error { + if len(app.Spec.KafkaTopics) == 0 { + return nil + } + + s.Config.Kafka = &config.KafkaConfig{} + s.Config.Kafka.Brokers = []config.BrokerConfig{} + s.Config.Kafka.Topics = []config.TopicConfig{} + + if err := s.configureListeners(); err != nil { + return err + } + + if err := processTopics(s, app); err != nil { + return err + } + + if app.Spec.Cyndi.Enabled { + err := createCyndiPipeline(s, app, getConnectNamespace(s.Env), getConnectClusterName(s.Env)) + if err != nil { + return err + } + } + + return nil +} + +func (s *mskProvider) getBootstrapServersString() string { + strArray := []string{} + for _, bc := range s.Config.Kafka.Brokers { + if bc.Port != nil { + strArray = append(strArray, fmt.Sprintf("%s:%d", bc.Hostname, *bc.Port)) + } else { + strArray = append(strArray, bc.Hostname) + } + } + return strings.Join(strArray, ",") +} + +type genericConfig map[string]string + +func (s mskProvider) connectConfig(config *apiextensions.JSON) error { + + connectConfig := genericConfig{ + "config.storage.replication.factor": "1", + "config.storage.topic": fmt.Sprintf("%v-connect-cluster-configs", s.Env.Name), + "connector.client.config.override.policy": "All", + "group.id": "connect-cluster", + "offset.storage.replication.factor": "1", + "offset.storage.topic": fmt.Sprintf("%v-connect-cluster-offsets", s.Env.Name), + "status.storage.replication.factor": "1", + "status.storage.topic": fmt.Sprintf("%v-connect-cluster-status", s.Env.Name), + } + + byteData, err := json.Marshal(connectConfig) + if err != nil { + return err + } + return config.UnmarshalJSON(byteData) +} + +func (s *mskProvider) getKafkaConfig(broker config.BrokerConfig) *config.KafkaConfig { + kafkaConfig := &config.KafkaConfig{} + kafkaConfig.Brokers = []config.BrokerConfig{broker} + kafkaConfig.Topics = []config.TopicConfig{} + + return kafkaConfig + +} + +func (s *mskProvider) configureListeners() error { + var err error + var secret *core.Secret + var broker config.BrokerConfig + + secret, err = getSecret(s) + if err != nil { + return err + } + + broker, err = getBrokerConfig(secret) + if err != nil { + return err + } + + s.Config.Kafka = s.getKafkaConfig(broker) + + return nil +} + +func (s *mskProvider) configureBrokers() error { + // Look up Kafka cluster's listeners and configure s.Config.Brokers + // (we need to know the bootstrap server addresses before provisioning KafkaConnect) + if err := s.configureListeners(); err != nil { + clowdErr := errors.Wrap("unable to determine kafka broker addresses", err) + clowdErr.Requeue = true + return clowdErr + } + + if err := configureKafkaConnectCluster(s); err != nil { + return errors.Wrap("failed to provision kafka connect cluster", err) + } + + return nil +} + +func (s *mskProvider) getKafkaConnectTrustedCertSecretName() (string, error) { + secRef, err := getSecretRef(s) + if err != nil { + return "", err + } + return secRef.Name, nil +} + +func (s *mskProvider) getConnectClusterUserName() string { + return *s.Config.Kafka.Brokers[0].Sasl.Username +} + +func (s *mskProvider) KafkaTopicName(topic 
crd.KafkaTopicSpec, namespace string) string { + if clowderconfig.LoadedConfig.Features.UseComplexStrimziTopicNames { + return fmt.Sprintf("%s-%s-%s", topic.TopicName, s.Env.Name, namespace) + } + return topic.TopicName +} + +func (s *mskProvider) KafkaName() string { + return s.Env.Spec.Providers.Kafka.ClusterAnnotation +} + +func (s *mskProvider) KafkaNamespace() string { + if s.Env.Spec.Providers.Kafka.TopicNamespace == "" { + return s.Env.Status.TargetNamespace + } + return s.Env.Spec.Providers.Kafka.TopicNamespace +} diff --git a/controllers/cloud.redhat.com/providers/kafka/provider.go b/controllers/cloud.redhat.com/providers/kafka/provider.go index 873a035d5..6d999f6e6 100644 --- a/controllers/cloud.redhat.com/providers/kafka/provider.go +++ b/controllers/cloud.redhat.com/providers/kafka/provider.go @@ -2,18 +2,40 @@ package kafka import ( "fmt" + "strconv" "strings" crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1" cyndi "github.com/RedHatInsights/cyndi-operator/api/v1alpha1" + strimzi "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" core "k8s.io/api/core/v1" + apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config" "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors" "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers" rc "github.com/RedHatInsights/rhc-osdk-utils/resourceCache" + "github.com/RedHatInsights/rhc-osdk-utils/utils" ) +type providerInterface interface { + providers.RootProvider + KafkaTopicName(topic crd.KafkaTopicSpec, namespace string) string + KafkaName() string + KafkaNamespace() string + getConnectClusterUserName() string + getBootstrapServersString() string + connectConfig(*apiextensions.JSON) error + getKafkaConnectTrustedCertSecretName() (string, error) +} + +type rootKafkaProvider struct { + providers.Provider +} + var DefaultImageKafkaXjoin = "quay.io/cloudservices/xjoin-kafka-connect-strimzi:latest" // ProvName is the name/ident of the provider @@ -31,6 +53,12 @@ var CyndiHostInventoryAppSecret = rc.NewSingleResourceIdent(ProvName, "cyndi_hos // CyndiConfigMap is the resource ident for a CyndiConfigMap object. var CyndiConfigMap = rc.NewSingleResourceIdent(ProvName, "cyndi_config_map", &core.ConfigMap{}, rc.ResourceOptions{WriteNow: true}) +// KafkaTopic is the resource ident for a KafkaTopic object. +var KafkaTopic = rc.NewSingleResourceIdent(ProvName, "kafka_topic", &strimzi.KafkaTopic{}, rc.ResourceOptions{WriteNow: true}) + +// KafkaConnect is the resource ident for a KafkaConnect object. +var KafkaConnect = rc.NewSingleResourceIdent(ProvName, "kafka_connect", &strimzi.KafkaConnect{}, rc.ResourceOptions{WriteNow: true}) + // GetKafka returns the correct kafka provider based on the environment. 
func GetKafka(c *providers.Provider) (providers.ClowderProvider, error) { c.Env.ConvertDeprecatedKafkaSpec() @@ -42,6 +70,8 @@ func GetKafka(c *providers.Provider) (providers.ClowderProvider, error) { return NewAppInterface(c) case "managed": return NewManagedKafka(c) + case "ephem-msk": + return NewMSK(c) case "none", "": return NewNoneKafka(c) default: @@ -89,10 +119,359 @@ func getConnectClusterName(env *crd.ClowdEnvironment) string { return env.Spec.Providers.Kafka.Connect.Name } -func getConnectClusterUserName(env *crd.ClowdEnvironment) string { - return fmt.Sprintf("%s-connect", env.Name) -} - func init() { providers.ProvidersRegistration.Register(GetKafka, 6, ProvName) } + +func processTopics(s providerInterface, app *crd.ClowdApp) error { + topicConfig := []config.TopicConfig{} + + appList, err := s.GetEnv().GetAppsInEnv(s.GetCtx(), s.GetClient()) + + if err != nil { + return errors.Wrap("Topic creation failed: Error listing apps", err) + } + + for _, topic := range app.Spec.KafkaTopics { + k := &strimzi.KafkaTopic{} + + topicName := s.KafkaTopicName(topic, app.Namespace) + knn := types.NamespacedName{ + Namespace: s.KafkaNamespace(), + Name: topicName, + } + + if err := s.GetCache().Create(KafkaTopic, knn, k); err != nil { + return err + } + + labels := providers.Labels{ + "strimzi.io/cluster": s.KafkaName(), + "env": app.Spec.EnvName, + // If we label it with the app name, since app names should be + // unique? can we use for delete selector? + } + + k.SetName(topicName) + k.SetNamespace(s.KafkaNamespace()) + // the ClowdEnvironment is the owner of this topic + k.SetOwnerReferences([]metav1.OwnerReference{s.GetEnv().MakeOwnerReference()}) + k.SetLabels(labels) + + k.Spec = &strimzi.KafkaTopicSpec{} + + err := processTopicValues(k, s.GetEnv(), appList, topic) + + if err != nil { + return err + } + + if err := s.GetCache().Update(KafkaTopic, k); err != nil { + return err + } + + topicConfig = append( + topicConfig, + config.TopicConfig{Name: topicName, RequestedName: topic.TopicName}, + ) + } + + s.GetConfig().Kafka.Topics = topicConfig + + return nil +} + +func processTopicValues( + k *strimzi.KafkaTopic, + env *crd.ClowdEnvironment, + appList *crd.ClowdAppList, + topic crd.KafkaTopicSpec, +) error { + + keys := map[string][]string{} + replicaValList := []string{} + partitionValList := []string{} + + for _, iapp := range appList.Items { + if iapp.Spec.KafkaTopics != nil { + for _, itopic := range iapp.Spec.KafkaTopics { + if itopic.TopicName != topic.TopicName { + // Only consider a topic that matches the name + continue + } + replicaValList = append(replicaValList, strconv.Itoa(int(itopic.Replicas))) + partitionValList = append(partitionValList, strconv.Itoa(int(itopic.Partitions))) + for key := range itopic.Config { + if _, ok := keys[key]; !ok { + keys[key] = []string{} + } + keys[key] = append(keys[key], itopic.Config[key]) + } + } + } + } + + jsonData := "{" + + for key, valList := range keys { + f, ok := conversionMap[key] + if ok { + out, _ := f(valList) + jsonData = fmt.Sprintf("%s\"%s\":\"%s\",", jsonData, key, out) + } else { + return errors.NewClowderError(fmt.Sprintf("no conversion type for %s", key)) + } + } + + if len(jsonData) > 1 { + jsonData = jsonData[0 : len(jsonData)-1] + } + jsonData += "}" + + var config apiextensions.JSON + + err := config.UnmarshalJSON([]byte(jsonData)) + + if err != nil { + return err + + } + + k.Spec.Config = &config + + if len(replicaValList) > 0 { + maxReplicas, err := utils.IntMax(replicaValList) + if err != nil { + return 
errors.NewClowderError(fmt.Sprintf("could not compute max for %v", replicaValList)) + } + maxReplicasInt, err := strconv.ParseUint(maxReplicas, 10, 16) + if err != nil { + return errors.NewClowderError(fmt.Sprintf("could not convert string to int32 for %v", maxReplicas)) + } + k.Spec.Replicas = utils.Int32Ptr(int(maxReplicasInt)) + if *k.Spec.Replicas < int32(1) { + // if unset, default to 3 + k.Spec.Replicas = utils.Int32Ptr(3) + } + } + + if len(partitionValList) > 0 { + maxPartitions, err := utils.IntMax(partitionValList) + if err != nil { + return errors.NewClowderError(fmt.Sprintf("could not compute max for %v", partitionValList)) + } + maxPartitionsInt, err := strconv.ParseUint(maxPartitions, 10, 16) + if err != nil { + return errors.NewClowderError(fmt.Sprintf("could not convert to string to int32 for %v", maxPartitions)) + } + k.Spec.Partitions = utils.Int32Ptr(int(maxPartitionsInt)) + if *k.Spec.Partitions < int32(1) { + // if unset, default to 3 + k.Spec.Partitions = utils.Int32Ptr(3) + } + } + + if env.Spec.Providers.Kafka.Cluster.Replicas < int32(1) { + k.Spec.Replicas = utils.Int32Ptr(1) + } else if env.Spec.Providers.Kafka.Cluster.Replicas < *k.Spec.Replicas { + k.Spec.Replicas = &env.Spec.Providers.Kafka.Cluster.Replicas + } + + return nil +} + +func configureKafkaConnectCluster(s providerInterface) error { + var kcRequests, kcLimits apiextensions.JSON + + // default values for config/requests/limits in Strimzi resource specs + err := kcRequests.UnmarshalJSON([]byte(`{ + "cpu": "300m", + "memory": "500Mi" + }`)) + if err != nil { + return fmt.Errorf("could not unmarshal kcRequests: %w", err) + } + + err = kcLimits.UnmarshalJSON([]byte(`{ + "cpu": "600m", + "memory": "800Mi" + }`)) + if err != nil { + return fmt.Errorf("could not unmarshal kcLimits: %w", err) + } + + // check if defaults have been overridden in ClowdEnvironment + if s.GetEnv().Spec.Providers.Kafka.Connect.Resources.Requests != nil { + kcRequests = *s.GetEnv().Spec.Providers.Kafka.Connect.Resources.Requests + } + if s.GetEnv().Spec.Providers.Kafka.Connect.Resources.Limits != nil { + kcLimits = *s.GetEnv().Spec.Providers.Kafka.Connect.Resources.Limits + } + + clusterNN := types.NamespacedName{ + Namespace: getConnectNamespace(s.GetEnv()), + Name: getConnectClusterName(s.GetEnv()), + } + + k := &strimzi.KafkaConnect{} + if err := s.GetCache().Create(KafkaConnect, clusterNN, k); err != nil { + return err + } + + // ensure that connect cluster of this same name but labelled for different env does not exist + if envLabel, ok := k.GetLabels()["env"]; ok { + if envLabel != s.GetEnv().Name { + return fmt.Errorf( + "kafka connect cluster named '%s' found in ns '%s' but tied to env '%s'", + clusterNN.Name, clusterNN.Namespace, envLabel, + ) + } + } + + // populate options from the kafka provider's KafkaConnectClusterOptions + replicas := s.GetEnv().Spec.Providers.Kafka.Connect.Replicas + if replicas < int32(1) { + replicas = int32(1) + } + + version := s.GetEnv().Spec.Providers.Kafka.Connect.Version + if version == "" { + version = "3.4.0" + } + + image := s.GetEnv().Spec.Providers.Kafka.Connect.Image + if image == "" { + image = DefaultImageKafkaXjoin + } + + var config apiextensions.JSON + + err = s.connectConfig(&config) + if err != nil { + return fmt.Errorf("could not unmarshal config: %w", err) + } + + username := s.getConnectClusterUserName() + + k.Spec = &strimzi.KafkaConnectSpec{ + Replicas: &replicas, + BootstrapServers: s.getBootstrapServersString(), + Version: &version, + Config: &config, + Image: &image, + 
Resources: &strimzi.KafkaConnectSpecResources{ + Requests: &kcRequests, + Limits: &kcLimits, + }, + } + + secName, err := s.getKafkaConnectTrustedCertSecretName() + if err != nil { + return err + } + + if !s.GetEnv().Spec.Providers.Kafka.EnableLegacyStrimzi { + k.Spec.Tls = &strimzi.KafkaConnectSpecTls{ + TrustedCertificates: []strimzi.KafkaConnectSpecTlsTrustedCertificatesElem{{ + Certificate: "ca.crt", + SecretName: secName, + }}, + } + k.Spec.Authentication = &strimzi.KafkaConnectSpecAuthentication{ + PasswordSecret: &strimzi.KafkaConnectSpecAuthenticationPasswordSecret{ + Password: "password", + SecretName: username, + }, + Type: "scram-sha-512", + Username: &username, + } + } + + // configures this KafkaConnect to use KafkaConnector resources to avoid needing to call the + // Connect REST API directly + annotations := k.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations["strimzi.io/use-connector-resources"] = "true" + k.SetAnnotations(annotations) + k.SetOwnerReferences([]metav1.OwnerReference{s.GetEnv().MakeOwnerReference()}) + k.SetName(getConnectClusterName(s.GetEnv())) + k.SetNamespace(getConnectNamespace(s.GetEnv())) + k.SetLabels(providers.Labels{"env": s.GetEnv().Name}) + + return s.GetCache().Update(KafkaConnect, k) +} + +func getSecretRef(s providers.RootProvider) (types.NamespacedName, error) { + secretRef := types.NamespacedName{ + Name: s.GetEnv().Spec.Providers.Kafka.ManagedSecretRef.Name, + Namespace: s.GetEnv().Spec.Providers.Kafka.ManagedSecretRef.Namespace, + } + nullName := types.NamespacedName{} + if secretRef == nullName { + return nullName, errors.NewClowderError("no secret ref defined for managed Kafka") + } + return secretRef, nil +} + +func getSecret(s providers.RootProvider) (*core.Secret, error) { + secretRef, err := getSecretRef(s) + if err != nil { + return nil, err + } + + secret := &core.Secret{} + + if err = s.GetClient().Get(s.GetCtx(), secretRef, secret); err != nil { + return nil, err + } + + return secret, nil +} + +func getBrokerConfig(secret *core.Secret) (config.BrokerConfig, error) { + broker := config.BrokerConfig{} + + port, password, username, hostname, cacert, saslMechanism, err := destructureSecret(secret) + if err != nil { + return broker, err + } + + saslType := config.BrokerConfigAuthtypeSasl + + broker.Hostname = hostname + broker.Port = &port + broker.Authtype = &saslType + if cacert != "" { + broker.Cacert = &cacert + } + broker.Sasl = &config.KafkaSASLConfig{ + Password: &password, + Username: &username, + SecurityProtocol: utils.StringPtr("SASL_SSL"), + SaslMechanism: utils.StringPtr(saslMechanism), + } + broker.SecurityProtocol = utils.StringPtr("SASL_SSL") + + return broker, nil +} + +func destructureSecret(secret *core.Secret) (int, string, string, string, string, string, error) { + port, err := strconv.ParseUint(string(secret.Data["port"]), 10, 16) + if err != nil { + return 0, "", "", "", "", "", err + } + password := string(secret.Data["password"]) + username := string(secret.Data["username"]) + hostname := string(secret.Data["hostname"]) + cacert := "" + if val, ok := secret.Data["cacert"]; ok { + cacert = string(val) + } + saslMechanism := "PLAIN" + if val, ok := secret.Data["saslMechanism"]; ok { + saslMechanism = string(val) + } + return int(port), password, username, hostname, cacert, saslMechanism, nil +} diff --git a/controllers/cloud.redhat.com/providers/kafka/strimzi.go b/controllers/cloud.redhat.com/providers/kafka/strimzi.go index d469e7e7d..320e0b92d 100644 --- 
a/controllers/cloud.redhat.com/providers/kafka/strimzi.go +++ b/controllers/cloud.redhat.com/providers/kafka/strimzi.go @@ -24,15 +24,9 @@ import ( rc "github.com/RedHatInsights/rhc-osdk-utils/resourceCache" ) -// KafkaTopic is the resource ident for a KafkaTopic object. -var KafkaTopic = rc.NewSingleResourceIdent(ProvName, "kafka_topic", &strimzi.KafkaTopic{}, rc.ResourceOptions{WriteNow: true}) - // KafkaInstance is the resource ident for a Kafka object. var KafkaInstance = rc.NewSingleResourceIdent(ProvName, "kafka_instance", &strimzi.Kafka{}, rc.ResourceOptions{WriteNow: true}) -// KafkaConnect is the resource ident for a KafkaConnect object. -var KafkaConnect = rc.NewSingleResourceIdent(ProvName, "kafka_connect", &strimzi.KafkaConnect{}, rc.ResourceOptions{WriteNow: true}) - // KafkaUser is the resource ident for a KafkaUser object. var KafkaUser = rc.NewSingleResourceIdent(ProvName, "kafka_user", &strimzi.KafkaUser{}, rc.ResourceOptions{WriteNow: true}) @@ -47,6 +41,7 @@ var KafkaNetworkPolicy = rc.NewSingleResourceIdent(ProvName, "kafka_network_poli type strimziProvider struct { providers.Provider + rootKafkaProvider } // NewStrimzi returns a new strimzi provider object. @@ -119,7 +114,7 @@ func (s *strimziProvider) Provide(app *crd.ClowdApp) error { return nil } - if err := s.processTopics(app, s.Config.Kafka); err != nil { + if err := processTopics(s, app); err != nil { return err } @@ -478,6 +473,26 @@ func (s *strimziProvider) configureKafkaCluster() error { return s.Cache.Update(KafkaInstance, k) } +func (s strimziProvider) connectConfig(config *apiextensions.JSON) error { + + rawConfig := []byte(`{ + "config.storage.replication.factor": "1", + "config.storage.topic": "connect-cluster-configs", + "connector.client.config.override.policy": "All", + "group.id": "connect-cluster", + "offset.storage.replication.factor": "1", + "offset.storage.topic": "connect-cluster-offsets", + "status.storage.replication.factor": "1", + "status.storage.topic": "connect-cluster-status" + }`) + + return config.UnmarshalJSON(rawConfig) +} + +func (s *strimziProvider) getConnectClusterUserName() string { + return fmt.Sprintf("%s-connect", s.Env.Name) +} + func (s *strimziProvider) createKafkaMetricsConfigMap() (types.NamespacedName, error) { cm := &core.ConfigMap{} nn := types.NamespacedName{ @@ -503,9 +518,9 @@ func (s *strimziProvider) createKafkaMetricsConfigMap() (types.NamespacedName, e return nn, nil } -func (s *strimziProvider) getBootstrapServersString(configs *config.KafkaConfig) string { +func (s *strimziProvider) getBootstrapServersString() string { strArray := []string{} - for _, bc := range configs.Brokers { + for _, bc := range s.Config.Kafka.Brokers { if bc.Port != nil { strArray = append(strArray, fmt.Sprintf("%s:%d", bc.Hostname, *bc.Port)) } else { @@ -515,11 +530,15 @@ func (s *strimziProvider) getBootstrapServersString(configs *config.KafkaConfig) return strings.Join(strArray, ",") } +func (s *strimziProvider) getKafkaConnectTrustedCertSecretName() (string, error) { + return fmt.Sprintf("%s-cluster-ca-cert", getKafkaName(s.GetEnv())), nil +} + func (s *strimziProvider) createKafkaConnectUser() error { clusterNN := types.NamespacedName{ Namespace: getConnectNamespace(s.Env), - Name: getConnectClusterUserName(s.Env), + Name: s.getConnectClusterUserName(), } ku := &strimzi.KafkaUser{} @@ -577,138 +596,7 @@ func (s *strimziProvider) createKafkaConnectUser() error { return s.Cache.Update(KafkaConnectUser, ku) } -func (s *strimziProvider) configureKafkaConnectCluster(configs 
*config.KafkaConfig) error { - var kcRequests, kcLimits apiextensions.JSON - - // default values for config/requests/limits in Strimzi resource specs - err := kcRequests.UnmarshalJSON([]byte(`{ - "cpu": "300m", - "memory": "500Mi" - }`)) - if err != nil { - return fmt.Errorf("could not unmarshal kcRequests: %w", err) - } - - err = kcLimits.UnmarshalJSON([]byte(`{ - "cpu": "600m", - "memory": "800Mi" - }`)) - if err != nil { - return fmt.Errorf("could not unmarshal kcLimits: %w", err) - } - - // check if defaults have been overridden in ClowdEnvironment - if s.Env.Spec.Providers.Kafka.Connect.Resources.Requests != nil { - kcRequests = *s.Env.Spec.Providers.Kafka.Connect.Resources.Requests - } - if s.Env.Spec.Providers.Kafka.Connect.Resources.Limits != nil { - kcLimits = *s.Env.Spec.Providers.Kafka.Connect.Resources.Limits - } - - clusterNN := types.NamespacedName{ - Namespace: getConnectNamespace(s.Env), - Name: getConnectClusterName(s.Env), - } - - if err := s.createKafkaConnectUser(); err != nil { - return err - } - - k := &strimzi.KafkaConnect{} - if err := s.Cache.Create(KafkaConnect, clusterNN, k); err != nil { - return err - } - - // ensure that connect cluster of this same name but labelled for different env does not exist - if envLabel, ok := k.GetLabels()["env"]; ok { - if envLabel != s.Env.Name { - return fmt.Errorf( - "kafka connect cluster named '%s' found in ns '%s' but tied to env '%s'", - clusterNN.Name, clusterNN.Namespace, envLabel, - ) - } - } - - // populate options from the kafka provider's KafkaConnectClusterOptions - replicas := s.Env.Spec.Providers.Kafka.Connect.Replicas - if replicas < int32(1) { - replicas = int32(1) - } - - version := s.Env.Spec.Providers.Kafka.Connect.Version - if version == "" { - version = "3.4.0" - } - - image := s.Env.Spec.Providers.Kafka.Connect.Image - if image == "" { - image = DefaultImageKafkaXjoin - } - - username := getConnectClusterUserName(s.Env) - - var config apiextensions.JSON - - err = config.UnmarshalJSON([]byte(`{ - "config.storage.replication.factor": "1", - "config.storage.topic": "connect-cluster-configs", - "connector.client.config.override.policy": "All", - "group.id": "connect-cluster", - "offset.storage.replication.factor": "1", - "offset.storage.topic": "connect-cluster-offsets", - "status.storage.replication.factor": "1", - "status.storage.topic": "connect-cluster-status" - }`)) - if err != nil { - return fmt.Errorf("could not unmarshal config: %w", err) - } - - k.Spec = &strimzi.KafkaConnectSpec{ - Replicas: &replicas, - BootstrapServers: s.getBootstrapServersString(configs), - Version: &version, - Config: &config, - Image: &image, - Resources: &strimzi.KafkaConnectSpecResources{ - Requests: &kcRequests, - Limits: &kcLimits, - }, - } - - if !s.Env.Spec.Providers.Kafka.EnableLegacyStrimzi { - k.Spec.Tls = &strimzi.KafkaConnectSpecTls{ - TrustedCertificates: []strimzi.KafkaConnectSpecTlsTrustedCertificatesElem{{ - Certificate: "ca.crt", - SecretName: fmt.Sprintf("%s-cluster-ca-cert", getKafkaName(s.Env)), - }}, - } - k.Spec.Authentication = &strimzi.KafkaConnectSpecAuthentication{ - PasswordSecret: &strimzi.KafkaConnectSpecAuthenticationPasswordSecret{ - Password: "password", - SecretName: username, - }, - Type: "scram-sha-512", - Username: &username, - } - } - - // configures this KafkaConnect to use KafkaConnector resources to avoid needing to call the - // Connect REST API directly - annotations := k.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) - } - 
annotations["strimzi.io/use-connector-resources"] = "true" - k.SetAnnotations(annotations) - k.SetOwnerReferences([]metav1.OwnerReference{s.Env.MakeOwnerReference()}) - k.SetName(getConnectClusterName(s.Env)) - k.SetNamespace(getConnectNamespace(s.Env)) - k.SetLabels(providers.Labels{"env": s.Env.Name}) - - return s.Cache.Update(KafkaConnect, k) -} - -func (s *strimziProvider) configureListeners(configs *config.KafkaConfig) error { +func (s *strimziProvider) configureListeners() error { clusterNN := types.NamespacedName{ Namespace: getKafkaNamespace(s.Env), Name: getKafkaName(s.Env), @@ -737,16 +625,16 @@ func (s *strimziProvider) configureListeners(configs *config.KafkaConfig) error kafkaCACert := string(kafkaCASecret.Data["ca.crt"]) - configs.Brokers = []config.BrokerConfig{} + s.Config.Kafka.Brokers = []config.BrokerConfig{} for _, listener := range kafkaResource.Status.Listeners { if listener.Type != nil && *listener.Type == "tls" { - configs.Brokers = append(configs.Brokers, buildTLSBrokerConfig(listener, kafkaCACert)) + s.Config.Kafka.Brokers = append(s.Config.Kafka.Brokers, buildTLSBrokerConfig(listener, kafkaCACert)) } else if listener.Type != nil && (*listener.Type == "plain" || *listener.Type == "tcp") { - configs.Brokers = append(configs.Brokers, buildTCPBrokerConfig(listener)) + s.Config.Kafka.Brokers = append(s.Config.Kafka.Brokers, buildTCPBrokerConfig(listener)) } } - if len(configs.Brokers) < 1 { + if len(s.Config.Kafka.Brokers) < 1 { return fmt.Errorf( "kafka cluster '%s' in ns '%s' has no listeners", clusterNN.Name, clusterNN.Namespace, ) @@ -788,17 +676,23 @@ func (s *strimziProvider) configureBrokers() error { return errors.Wrap("failed to provision kafka cluster", err) } - config := &config.KafkaConfig{} + s.Config = &config.AppConfig{ + Kafka: &config.KafkaConfig{}, + } // Look up Kafka cluster's listeners and configure s.Config.Brokers // (we need to know the bootstrap server addresses before provisioning KafkaConnect) - if err := s.configureListeners(config); err != nil { + if err := s.configureListeners(); err != nil { clowdErr := errors.Wrap("unable to determine kafka broker addresses", err) clowdErr.Requeue = true return clowdErr } - if err := s.configureKafkaConnectCluster(config); err != nil { + if err := s.createKafkaConnectUser(); err != nil { + return err + } + + if err := configureKafkaConnectCluster(s); err != nil { return errors.Wrap("failed to provision kafka connect cluster", err) } @@ -935,7 +829,7 @@ func (s *strimziProvider) createKafkaUser(app *crd.ClowdApp) error { all := strimzi.KafkaUserSpecAuthorizationAclsElemOperationAll for _, topic := range app.Spec.KafkaTopics { - topicName := getTopicName(topic, *s.Env, app.Namespace) + topicName := s.KafkaTopicName(topic, app.Namespace) ku.Spec.Authorization.Acls = append(ku.Spec.Authorization.Acls, strimzi.KafkaUserSpecAuthorizationAclsElem{ Host: &address, @@ -962,166 +856,17 @@ func (s *strimziProvider) createKafkaUser(app *crd.ClowdApp) error { return s.Cache.Update(KafkaUser, ku) } -func (s *strimziProvider) processTopics(app *crd.ClowdApp, c *config.KafkaConfig) error { - topicConfig := []config.TopicConfig{} - - appList, err := s.Env.GetAppsInEnv(s.Ctx, s.Client) - - if err != nil { - return errors.Wrap("Topic creation failed: Error listing apps", err) - } - - for _, topic := range app.Spec.KafkaTopics { - k := &strimzi.KafkaTopic{} - - topicName := getTopicName(topic, *s.Env, app.Namespace) - knn := types.NamespacedName{ - Namespace: getKafkaNamespace(s.Env), - Name: topicName, - } - - if err := 
s.Cache.Create(KafkaTopic, knn, k); err != nil { - return err - } - - labels := providers.Labels{ - "strimzi.io/cluster": getKafkaName(s.Env), - "env": app.Spec.EnvName, - // If we label it with the app name, since app names should be - // unique? can we use for delete selector? - } - - k.SetName(topicName) - k.SetNamespace(getKafkaNamespace(s.Env)) - // the ClowdEnvironment is the owner of this topic - k.SetOwnerReferences([]metav1.OwnerReference{s.Env.MakeOwnerReference()}) - k.SetLabels(labels) - - k.Spec = &strimzi.KafkaTopicSpec{} - - err := processTopicValues(k, s.Env, appList, topic) - - if err != nil { - return err - } - - if err := s.Cache.Update(KafkaTopic, k); err != nil { - return err - } - - topicConfig = append( - topicConfig, - config.TopicConfig{Name: topicName, RequestedName: topic.TopicName}, - ) - } - - c.Topics = topicConfig - - return nil -} - -func getTopicName(topic crd.KafkaTopicSpec, env crd.ClowdEnvironment, namespace string) string { +func (s *strimziProvider) KafkaTopicName(topic crd.KafkaTopicSpec, namespace string) string { if clowderconfig.LoadedConfig.Features.UseComplexStrimziTopicNames { - return fmt.Sprintf("%s-%s-%s", topic.TopicName, env.Name, namespace) + return fmt.Sprintf("%s-%s-%s", topic.TopicName, s.GetEnv().Name, namespace) } return topic.TopicName } -func processTopicValues( - k *strimzi.KafkaTopic, - env *crd.ClowdEnvironment, - appList *crd.ClowdAppList, - topic crd.KafkaTopicSpec, -) error { - - keys := map[string][]string{} - replicaValList := []string{} - partitionValList := []string{} - - for _, iapp := range appList.Items { - if iapp.Spec.KafkaTopics != nil { - for _, itopic := range iapp.Spec.KafkaTopics { - if itopic.TopicName != topic.TopicName { - // Only consider a topic that matches the name - continue - } - replicaValList = append(replicaValList, strconv.Itoa(int(itopic.Replicas))) - partitionValList = append(partitionValList, strconv.Itoa(int(itopic.Partitions))) - for key := range itopic.Config { - if _, ok := keys[key]; !ok { - keys[key] = []string{} - } - keys[key] = append(keys[key], itopic.Config[key]) - } - } - } - } - - jsonData := "{" - - for key, valList := range keys { - f, ok := conversionMap[key] - if ok { - out, _ := f(valList) - jsonData = fmt.Sprintf("%s\"%s\":\"%s\",", jsonData, key, out) - } else { - return errors.NewClowderError(fmt.Sprintf("no conversion type for %s", key)) - } - } - - if len(jsonData) > 1 { - jsonData = jsonData[0 : len(jsonData)-1] - } - jsonData += "}" - - var config apiextensions.JSON - - err := config.UnmarshalJSON([]byte(jsonData)) - - if err != nil { - return err - - } - - k.Spec.Config = &config - - if len(replicaValList) > 0 { - maxReplicas, err := utils.IntMax(replicaValList) - if err != nil { - return errors.NewClowderError(fmt.Sprintf("could not compute max for %v", replicaValList)) - } - maxReplicasInt, err := strconv.ParseUint(maxReplicas, 10, 16) - if err != nil { - return errors.NewClowderError(fmt.Sprintf("could not convert string to int32 for %v", maxReplicas)) - } - k.Spec.Replicas = utils.Int32Ptr(int(maxReplicasInt)) - if *k.Spec.Replicas < int32(1) { - // if unset, default to 3 - k.Spec.Replicas = utils.Int32Ptr(3) - } - } - - if len(partitionValList) > 0 { - maxPartitions, err := utils.IntMax(partitionValList) - if err != nil { - return errors.NewClowderError(fmt.Sprintf("could not compute max for %v", partitionValList)) - } - maxPartitionsInt, err := strconv.ParseUint(maxPartitions, 10, 16) - if err != nil { - return errors.NewClowderError(fmt.Sprintf("could not convert 
to string to int32 for %v", maxPartitions)) - } - k.Spec.Partitions = utils.Int32Ptr(int(maxPartitionsInt)) - if *k.Spec.Partitions < int32(1) { - // if unset, default to 3 - k.Spec.Partitions = utils.Int32Ptr(3) - } - } - - if env.Spec.Providers.Kafka.Cluster.Replicas < int32(1) { - k.Spec.Replicas = utils.Int32Ptr(1) - } else if env.Spec.Providers.Kafka.Cluster.Replicas < *k.Spec.Replicas { - k.Spec.Replicas = &env.Spec.Providers.Kafka.Cluster.Replicas - } +func (s *strimziProvider) KafkaName() string { + return getKafkaName(s.Env) +} - return nil +func (s *strimziProvider) KafkaNamespace() string { + return getKafkaNamespace(s.Env) } diff --git a/controllers/cloud.redhat.com/providers/providers.go b/controllers/cloud.redhat.com/providers/providers.go index 17774fa7c..e4607777b 100644 --- a/controllers/cloud.redhat.com/providers/providers.go +++ b/controllers/cloud.redhat.com/providers/providers.go @@ -116,6 +116,7 @@ type RootProvider interface { GetEnv() *crd.ClowdEnvironment GetCache() *rc.ObjectCache GetLog() logr.Logger + GetConfig() *config.AppConfig } // ClowderProvider is an interface providing a way for a provider to perform its duty. diff --git a/deploy-mutate.yml b/deploy-mutate.yml index 115faf356..f756a402d 100644 --- a/deploy-mutate.yml +++ b/deploy-mutate.yml @@ -6464,6 +6464,10 @@ objects: description: Version. If unset, default is '2.5.0' type: string type: object + clusterAnnotation: + description: Cluster annotation identifier for (*_msk_*) + mode. + type: string clusterName: description: (Deprecated) Defines the cluster name to be used by the Kafka Provider this will be used in some modes @@ -6550,7 +6554,7 @@ objects: is created in the desired cluster namespace and configured to auto-create topics.' enum: - - managed-ephem + - ephem-msk - managed - operator - app-interface @@ -6571,6 +6575,10 @@ objects: suffix: description: (Deprecated) (Unused) type: string + topicNamespace: + description: Namespace that kafkaTopics should be written + to for (*_msk_*) mode. + type: string required: - mode type: object diff --git a/deploy.yml b/deploy.yml index ecf89abe4..997677bf4 100644 --- a/deploy.yml +++ b/deploy.yml @@ -6464,6 +6464,10 @@ objects: description: Version. If unset, default is '2.5.0' type: string type: object + clusterAnnotation: + description: Cluster annotation identifier for (*_msk_*) + mode. + type: string clusterName: description: (Deprecated) Defines the cluster name to be used by the Kafka Provider this will be used in some modes @@ -6550,7 +6554,7 @@ objects: is created in the desired cluster namespace and configured to auto-create topics.' enum: - - managed-ephem + - ephem-msk - managed - operator - app-interface @@ -6571,6 +6575,10 @@ objects: suffix: description: (Deprecated) (Unused) type: string + topicNamespace: + description: Namespace that kafkaTopics should be written + to for (*_msk_*) mode. + type: string required: - mode type: object diff --git a/docs/antora/modules/ROOT/pages/api_reference.adoc b/docs/antora/modules/ROOT/pages/api_reference.adoc index 0aa43299b..2e362b546 100644 --- a/docs/antora/modules/ROOT/pages/api_reference.adoc +++ b/docs/antora/modules/ROOT/pages/api_reference.adoc @@ -1002,6 +1002,8 @@ KafkaConfig configures the Clowder provider controlling the creation of Kafka in | *`connect`* __xref:{anchor_prefix}-github-com-redhatinsights-clowder-apis-cloud-redhat-com-v1alpha1-kafkaconnectclusterconfig[$$KafkaConnectClusterConfig$$]__ | Defines options related to the Kafka Connect cluster for this environment. 
Ignored for (*_local_*) mode. | *`managedSecretRef`* __xref:{anchor_prefix}-github-com-redhatinsights-clowder-apis-cloud-redhat-com-v1alpha1-namespacedname[$$NamespacedName$$]__ | Defines the secret reference for the Managed Kafka mode. Only used in (*_managed_*) mode. | *`managedPrefix`* __string__ | Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. +| *`topicNamespace`* __string__ | Namespace that kafkaTopics should be written to for (*_msk_*) mode. +| *`clusterAnnotation`* __string__ | Cluster annotation identifier for (*_msk_*) mode. | *`clusterName`* __string__ | (Deprecated) Defines the cluster name to be used by the Kafka Provider this will be used in some modes to locate the Kafka instance. | *`namespace`* __string__ | (Deprecated) The Namespace the cluster is expected to reside in. This is only used in (*_app-interface_*) and (*_operator_*) modes. | *`connectNamespace`* __string__ | (Deprecated) The namespace that the Kafka Connect cluster is expected to reside in. This is only used in (*_app-interface_*) and (*_operator_*) modes. diff --git a/tests/kuttl/test-kafka-msk/00-install.yaml b/tests/kuttl/test-kafka-msk/00-install.yaml new file mode 100644 index 000000000..a171ce59b --- /dev/null +++ b/tests/kuttl/test-kafka-msk/00-install.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: test-kafka-msk +spec: + finalizers: + - kubernetes diff --git a/tests/kuttl/test-kafka-msk/01-assert.yaml b/tests/kuttl/test-kafka-msk/01-assert.yaml new file mode 100644 index 000000000..7b8343f5d --- /dev/null +++ b/tests/kuttl/test-kafka-msk/01-assert.yaml @@ -0,0 +1,26 @@ +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: test-kafka-msk + namespace: test-kafka-msk +status: + conditions: + - reason: ZooKeeperStorage + status: "True" + type: Warning + - reason: KafkaStorage + status: "True" + type: Warning + - status: "True" + type: Ready +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaUser +metadata: + labels: + strimzi.io/cluster: test-kafka-msk + name: test-kafka-msk-connect + namespace: test-kafka-msk +status: + username: test-kafka-msk-connect \ No newline at end of file diff --git a/tests/kuttl/test-kafka-msk/01-pods.yaml b/tests/kuttl/test-kafka-msk/01-pods.yaml new file mode 100644 index 000000000..c49a9f794 --- /dev/null +++ b/tests/kuttl/test-kafka-msk/01-pods.yaml @@ -0,0 +1,342 @@ +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: test-kafka-msk + namespace: test-kafka-msk +spec: + entityOperator: + tlsSidecar: + resources: + limits: + cpu: 100m + memory: 100Mi + requests: + cpu: 50m + memory: 50Mi + topicOperator: + resources: + limits: + cpu: 200m + memory: 500Mi + requests: + cpu: 50m + memory: 250Mi + userOperator: + resources: + limits: + cpu: 400m + memory: 500Mi + requests: + cpu: 50m + memory: 250Mi + kafka: + authorization: + type: simple + config: + offsets.topic.replication.factor: 1 + jvmOptions: {} + listeners: + - authentication: + type: scram-sha-512 + name: tls + port: 9093 + tls: true + type: internal + metricsConfig: + type: jmxPrometheusExporter + valueFrom: + configMapKeyRef: + key: metrics + name: test-kafka-msk-metrics + optional: false + replicas: 1 + resources: + limits: + cpu: 500m + memory: 1Gi + requests: + cpu: 250m + memory: 600Mi + storage: + type: ephemeral + version: 3.4.0 + zookeeper: + replicas: 1 + resources: + limits: + cpu: 350m + memory: 800Mi + requests: + cpu: 200m + memory: 400Mi + storage: + type: ephemeral +--- +apiVersion: 
kafka.strimzi.io/v1beta2 +kind: KafkaUser +metadata: + labels: + strimzi.io/cluster: test-kafka-msk + name: test-kafka-msk-connect + namespace: test-kafka-msk +spec: + authentication: + type: scram-sha-512 + authorization: + acls: + - host: '*' + operation: All + resource: + name: '*' + patternType: literal + type: topic + - host: '*' + operation: All + resource: + name: '*' + patternType: literal + type: group + - host: '*' + operation: All + resource: + name: '*' + patternType: literal + type: cluster + type: simple +--- +apiVersion: v1 +data: + metrics: |- + { + "metrics": { + "lowercaseOutputName": true, + "rules": [ + { + "labels": { + "clientId": "$3", + "partition": "$5", + "topic": "$4" + }, + "name": "kafka_server_$1_$2", + "pattern": "kafka.server<>Value", + "type": "GAUGE" + }, + { + "labels": { + "broker": "$4:$5", + "clientId": "$3" + }, + "name": "kafka_server_$1_$2", + "pattern": "kafka.server<>Value", + "type": "GAUGE" + }, + { + "labels": { + "cipher": "$5", + "listener": "$2", + "networkProcessor": "$3", + "protocol": "$4" + }, + "name": "kafka_server_$1_connections_tls_info", + "pattern": "kafka.server<>connections", + "type": "GAUGE" + }, + { + "labels": { + "clientSoftwareName": "$2", + "clientSoftwareVersion": "$3", + "listener": "$4", + "networkProcessor": "$5" + }, + "name": "kafka_server_$1_connections_software", + "pattern": "kafka.server<>connections", + "type": "GAUGE" + }, + { + "labels": { + "listener": "$2", + "networkProcessor": "$3" + }, + "name": "kafka_server_$1_$4", + "pattern": "kafka.server<>(.+):", + "type": "GAUGE" + }, + { + "labels": { + "listener": "$2", + "networkProcessor": "$3" + }, + "name": "kafka_server_$1_$4", + "pattern": "kafka.server<>(.+)", + "type": "GAUGE" + }, + { + "name": "kafka_$1_$2_$3_percent", + "pattern": "kafka.(\\w+)<>MeanRate", + "type": "GAUGE" + }, + { + "name": "kafka_$1_$2_$3_percent", + "pattern": "kafka.(\\w+)<>Value", + "type": "GAUGE" + }, + { + "labels": { + "$4": "$5" + }, + "name": "kafka_$1_$2_$3_percent", + "pattern": "kafka.(\\w+)<>Value", + "type": "GAUGE" + }, + { + "labels": { + "$4": "$5", + "$6": "$7" + }, + "name": "kafka_$1_$2_$3_total", + "pattern": "kafka.(\\w+)<>Count", + "type": "COUNTER" + }, + { + "labels": { + "$4": "$5" + }, + "name": "kafka_$1_$2_$3_total", + "pattern": "kafka.(\\w+)<>Count", + "type": "COUNTER" + }, + { + "name": "kafka_$1_$2_$3_total", + "pattern": "kafka.(\\w+)<>Count", + "type": "COUNTER" + }, + { + "labels": { + "$4": "$5", + "$6": "$7" + }, + "name": "kafka_$1_$2_$3", + "pattern": "kafka.(\\w+)<>Value", + "type": "GAUGE" + }, + { + "labels": { + "$4": "$5" + }, + "name": "kafka_$1_$2_$3", + "pattern": "kafka.(\\w+)<>Value", + "type": "GAUGE" + }, + { + "name": "kafka_$1_$2_$3", + "pattern": "kafka.(\\w+)<>Value", + "type": "GAUGE" + }, + { + "labels": { + "$4": "$5", + "$6": "$7" + }, + "name": "kafka_$1_$2_$3_count", + "pattern": "kafka.(\\w+)<>Count", + "type": "COUNTER" + }, + { + "labels": { + "$4": "$5", + "$6": "$7", + "quantile": "0.$8" + }, + "name": "kafka_$1_$2_$3", + "pattern": "kafka.(\\w+)<>(\\d+)thPercentile", + "type": "GAUGE" + }, + { + "labels": { + "$4": "$5" + }, + "name": "kafka_$1_$2_$3_count", + "pattern": "kafka.(\\w+)<>Count", + "type": "COUNTER" + }, + { + "labels": { + "$4": "$5", + "quantile": "0.$6" + }, + "name": "kafka_$1_$2_$3", + "pattern": "kafka.(\\w+)<>(\\d+)thPercentile", + "type": "GAUGE" + }, + { + "name": "kafka_$1_$2_$3_count", + "pattern": "kafka.(\\w+)<>Count", + "type": "COUNTER" + }, + { + "labels": { + "quantile": "0.$4" + }, + 
"name": "kafka_$1_$2_$3", + "pattern": "kafka.(\\w+)<>(\\d+)thPercentile", + "type": "GAUGE" + } + ] + } + } +kind: ConfigMap +metadata: + name: test-kafka-msk-metrics + namespace: test-kafka-msk +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: strimzi-topic-operator + namespace: test-kafka-msk + labels: + app: strimzi +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: strimzi-topic-operator + namespace: test-kafka-msk + labels: + app: strimzi +rules: + - verbs: + - get + - list + - watch + - create + - patch + - update + - delete + apiGroups: + - kafka.strimzi.io + resources: + - kafkatopics + - kafkatopics/status + - verbs: + - create + apiGroups: + - '' + resources: + - events +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: strimzi-topic-operator + namespace: test-kafka-msk + labels: + app: strimzi +subjects: + - kind: ServiceAccount + name: strimzi-topic-operator +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: strimzi-topic-operator diff --git a/tests/kuttl/test-kafka-msk/02-json-asserts.yaml b/tests/kuttl/test-kafka-msk/02-json-asserts.yaml new file mode 100644 index 000000000..27bd68abd --- /dev/null +++ b/tests/kuttl/test-kafka-msk/02-json-asserts.yaml @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: +- script: sleep 5 +- script: kubectl get secret --namespace=test-kafka-msk test-kafka-msk-connect -o json > /tmp/test-kafka-msk-user +- script: kubectl get secret --namespace=test-kafka-msk test-kafka-msk-cluster-ca-cert -o json > /tmp/test-kafka-msk-cluster-ca-cert + +- script: sh create_json.sh +- script: kubectl apply -f /tmp/managed-secret.yaml -n test-kafka-msk diff --git a/tests/kuttl/test-kafka-msk/03-assert.yaml b/tests/kuttl/test-kafka-msk/03-assert.yaml new file mode 100644 index 000000000..71ec92a8c --- /dev/null +++ b/tests/kuttl/test-kafka-msk/03-assert.yaml @@ -0,0 +1,45 @@ +--- +apiVersion: cloud.redhat.com/v1alpha1 +kind: ClowdEnvironment +metadata: + name: test-kafka-msk +status: + ready: true + conditions: + - status: "True" + type: DeploymentsReady + - status: "False" + type: ReconciliationFailed + - status: "True" + type: ReconciliationSuccessful + +--- +apiVersion: cloud.redhat.com/v1alpha1 +kind: ClowdApp +metadata: + name: puptoo + namespace: test-kafka-msk +status: + conditions: + - status: "True" + type: DeploymentsReady + - status: "False" + type: ReconciliationFailed + - status: "True" + type: ReconciliationSuccessful +--- +kind: Deployment +apiVersion: apps/v1 +metadata: + name: strimzi-topic-operator + namespace: test-kafka-msk + labels: + app: strimzi +status: + conditions: + - reason: MinimumReplicasAvailable + status: "True" + type: Available + - reason: NewReplicaSetAvailable + status: "True" + type: Progressing diff --git a/tests/kuttl/test-kafka-msk/03-pods.yaml b/tests/kuttl/test-kafka-msk/03-pods.yaml new file mode 100644 index 000000000..03ae96741 --- /dev/null +++ b/tests/kuttl/test-kafka-msk/03-pods.yaml @@ -0,0 +1,175 @@ +--- +apiVersion: cloud.redhat.com/v1alpha1 +kind: ClowdEnvironment +metadata: + name: test-kafka-msk +spec: + targetNamespace: test-kafka-msk + providers: + web: + port: 8000 + mode: operator + metrics: + port: 9000 + mode: operator + path: "/metrics" + kafka: + mode: ephem-msk + managedSecretRef: + name: managed-secret + namespace: test-kafka-msk + clusterAnnotation: test-kafka-msk + db: + mode: none + logging: + mode: none + objectStore: + mode: none + inMemoryDb: + mode: none + featureFlags: + 
mode: none + resourceDefaults: + limits: + cpu: 400m + memory: 1024Mi + requests: + cpu: 30m + memory: 512Mi +--- +apiVersion: cloud.redhat.com/v1alpha1 +kind: ClowdApp +metadata: + name: puptoo + namespace: test-kafka-msk +spec: + envName: test-kafka-msk + deployments: + - name: processor + podSpec: + image: quay.io/psav/clowder-hello + kafkaTopics: + - replicas: 3 + partitions: 64 + topicName: topic-one + - replicas: 5 + partitions: 32 + topicName: topic-two +--- +kind: Deployment +apiVersion: apps/v1 +metadata: + name: strimzi-topic-operator + namespace: test-kafka-msk + labels: + app: strimzi +spec: + replicas: 1 + selector: + matchLabels: + name: strimzi-topic-operator + template: + metadata: + creationTimestamp: null + labels: + name: strimzi-topic-operator + spec: + serviceAccountName: strimzi-topic-operator + serviceAccount: strimzi-topic-operator + restartPolicy: Always + schedulerName: default-scheduler + terminationGracePeriodSeconds: 30 + securityContext: {} + containers: + - resources: + limits: + cpu: 500m + memory: 256Mi + requests: + cpu: 100m + memory: 256Mi + readinessProbe: + httpGet: + path: /ready + port: 8080 + scheme: HTTP + initialDelaySeconds: 10 + timeoutSeconds: 1 + periodSeconds: 30 + successThreshold: 1 + failureThreshold: 3 + terminationMessagePath: /dev/termination-log + name: strimzi-topic-operator + livenessProbe: + httpGet: + path: /healthy + port: 8080 + scheme: HTTP + initialDelaySeconds: 10 + timeoutSeconds: 1 + periodSeconds: 30 + successThreshold: 1 + failureThreshold: 3 + env: + - name: STRIMZI_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: STRIMZI_RESOURCE_LABELS + value: strimzi.io/cluster=test-kafka-msk + - name: STRIMZI_KAFKA_BOOTSTRAP_SERVERS + value: test-kafka-msk-kafka-bootstrap.test-kafka-msk.svc:9093 + - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS + value: '120000' + - name: STRIMZI_LOG_LEVEL + value: INFO + - name: STRIMZI_TLS_ENABLED + value: 'false' + - name: STRIMZI_TLS_AUTH_ENABLED + value: 'false' + - name: STRIMZI_JAVA_OPTS + value: '-Xmx512M -Xms256M' + - name: STRIMZI_PUBLIC_CA + value: 'false' + - name: STRIMZI_SASL_ENABLED + value: 'true' + - name: STRIMZI_SASL_USERNAME + value: test-kafka-msk-connect + - name: STRIMZI_SASL_PASSWORD + valueFrom: + secretKeyRef: + name: test-kafka-msk-connect + key: password + - name: STRIMZI_SASL_MECHANISM + value: scram-sha-512 + - name: STRIMZI_SECURITY_PROTOCOL + value: SASL_SSL + - name: STRIMZI_USE_FINALIZERS + value: 'true' + imagePullPolicy: IfNotPresent + volumeMounts: + - name: strimzi-tmp + mountPath: /tmp + - name: ca + mountPath: /etc/tls-sidecar/cluster-ca-certs + terminationMessagePolicy: File + image: >- + quay.io/strimzi/operator:0.37.0 + args: + - /opt/strimzi/bin/topic_operator_run.sh + volumes: + - name: strimzi-tmp + emptyDir: + medium: Memory + sizeLimit: 5Mi + - name: ca + secret: + secretName: test-kafka-msk-cluster-ca-cert + + dnsPolicy: ClusterFirst + strategy: + type: Recreate + revisionHistoryLimit: 10 + progressDeadlineSeconds: 600 +--- diff --git a/tests/kuttl/test-kafka-msk/create_json.sh b/tests/kuttl/test-kafka-msk/create_json.sh new file mode 100755 index 000000000..86338bc70 --- /dev/null +++ b/tests/kuttl/test-kafka-msk/create_json.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Set the file paths +username=$(cat /tmp/test-kafka-msk-user | jq -r '.metadata.name') +password=$(cat /tmp/test-kafka-msk-user | jq -r '.data.password' | base64 -d) +cert=$(cat /tmp/test-kafka-msk-cluster-ca-cert | jq -r '.data["ca.crt"]' | base64 
-d)
+port=9093
+saslMechanism=SCRAM-SHA-512
+hostname=test-kafka-msk-kafka-bootstrap.test-kafka-msk.svc
+
+# Create the Kubernetes Secret YAML
+cat << EOF > /tmp/managed-secret.yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: managed-secret
+type: Opaque
+data:
+  username: $(echo -n "$username" | base64)
+  password: $(echo -n "$password"| base64)
+  saslMechanism: $(echo -n "$saslMechanism" | base64)
+  port: $(echo -n "$port" | base64)
+  hostname: $(echo -n "$hostname" | base64)
+  ca.crt: $(echo -n "$cert" | base64 | tr -d '\n')
+  cacert: $(echo -n "$cert" | base64 | tr -d '\n')
+EOF
diff --git a/tests/kuttl/test-kafka-strimzi-pvc/01-pods.yaml b/tests/kuttl/test-kafka-strimzi-pvc/01-pods.yaml
index d8dad9527..3c2acc7a2 100644
--- a/tests/kuttl/test-kafka-strimzi-pvc/01-pods.yaml
+++ b/tests/kuttl/test-kafka-strimzi-pvc/01-pods.yaml
@@ -17,7 +17,6 @@ spec:
     cluster:
       namespace: test-kafka-strimzi-pvc-kafka
       name: my-pvc-cluster
-      provisionCluster: true
     storageSize: 100Mi
     mode: operator
     pvc: true
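
Usage sketch for the new mode, assuming the managed secret produced by create_json.sh and the test-kafka-msk namespace from the kuttl fixtures above (names are illustrative, taken from the test, not required values):

    kafka:
      mode: ephem-msk
      # secret containing hostname, port, username, password, cacert and saslMechanism
      managedSecretRef:
        name: managed-secret
        namespace: test-kafka-msk
      # namespace KafkaTopic resources are written to; defaults to the env's target namespace when unset
      topicNamespace: test-kafka-msk
      # value applied as the strimzi.io/cluster label on created topics
      clusterAnnotation: test-kafka-msk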