Skip to content

Commit

Permalink
Added new ephem-msk mode (#873)
Browse files Browse the repository at this point in the history
* Added new ephem-msk mode
  • Loading branch information
psav authored Nov 3, 2023
1 parent 702169f commit a30e839
Show file tree
Hide file tree
Showing 19 changed files with 1,280 additions and 395 deletions.
8 changes: 7 additions & 1 deletion apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type MetricsConfig struct {
}

// KafkaMode details the mode of operation of the Clowder Kafka Provider
// +kubebuilder:validation:Enum=managed-ephem;managed;operator;app-interface;local;none
// +kubebuilder:validation:Enum=ephem-msk;managed;operator;app-interface;local;none
type KafkaMode string

// KafkaClusterConfig defines options related to the Kafka cluster managed/monitored by Clowder
Expand Down Expand Up @@ -252,6 +252,12 @@ type KafkaConfig struct {
// Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode.
ManagedPrefix string `json:"managedPrefix,omitempty"`

// Namespace that kafkaTopics should be written to for (*_msk_*) mode.
TopicNamespace string `json:"topicNamespace,omitempty"`

// Cluster annotation identifier for (*_msk_*) mode.
ClusterAnnotation string `json:"clusterAnnotation,omitempty"`

// (Deprecated) Defines the cluster name to be used by the Kafka Provider this will
// be used in some modes to locate the Kafka instance.
ClusterName string `json:"clusterName,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion build/kube_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ function install_keda_operator {
fi

echo "*** Applying keda-operator manifest ..."
${KUBECTL_CMD} apply -f https://github.com/kedacore/keda/releases/download/v2.10.1/keda-2.10.1.yaml
${KUBECTL_CMD} apply -f https://github.com/kedacore/keda/releases/download/v2.12.0/keda-2.12.0.yaml --server-side

echo "*** Will wait for keda-operator to come up in background"
${KUBECTL_CMD} rollout status deployment/$DEPLOYMENT -n $OPERATOR_NS | sed "s/^/[keda-operator] /" &
Expand Down
9 changes: 8 additions & 1 deletion config/crd/bases/cloud.redhat.com_clowdenvironments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ spec:
description: Version. If unset, default is '2.5.0'
type: string
type: object
clusterAnnotation:
description: Cluster annotation identifier for (*_msk_*) mode.
type: string
clusterName:
description: (Deprecated) Defines the cluster name to be used
by the Kafka Provider this will be used in some modes to
Expand Down Expand Up @@ -342,7 +345,7 @@ spec:
a small instance of Kafka is created in the desired cluster
namespace and configured to auto-create topics.'
enum:
- managed-ephem
- ephem-msk
- managed
- operator
- app-interface
Expand All @@ -362,6 +365,10 @@ spec:
suffix:
description: (Deprecated) (Unused)
type: string
topicNamespace:
description: Namespace that kafkaTopics should be written
to for (*_msk_*) mode.
type: string
required:
- mode
type: object
Expand Down
81 changes: 2 additions & 79 deletions controllers/cloud.redhat.com/providers/kafka/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@ package kafka

import (
"fmt"
"strconv"

crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
"github.com/RedHatInsights/rhc-osdk-utils/utils"

core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

type managedKafkaProvider struct {
Expand All @@ -36,12 +32,12 @@ func (k *managedKafkaProvider) Provide(app *crd.ClowdApp) error {
var secret *core.Secret
var broker config.BrokerConfig

secret, err = k.getSecret()
secret, err = getSecret(k)
if err != nil {
return err
}

broker, err = k.getBrokerConfig(secret)
broker, err = getBrokerConfig(secret)
if err != nil {
return err
}
Expand All @@ -68,52 +64,6 @@ func (k *managedKafkaProvider) appendTopic(topic crd.KafkaTopicSpec, kafkaConfig
)
}

// destructureSecret pulls the individual connection fields out of the
// managed Kafka secret. It returns port, password, username, hostname,
// cacert (may be empty) and saslMechanism (defaults to "PLAIN").
func (k *managedKafkaProvider) destructureSecret(secret *core.Secret) (int, string, string, string, string, string, error) {
	parsedPort, err := strconv.ParseUint(string(secret.Data["port"]), 10, 16)
	if err != nil {
		return 0, "", "", "", "", "", err
	}

	password := string(secret.Data["password"])
	username := string(secret.Data["username"])
	hostname := string(secret.Data["hostname"])

	// Optional fields: the CA cert has no default; the SASL mechanism
	// falls back to PLAIN when the secret does not specify one.
	cacert := ""
	if raw, ok := secret.Data["cacert"]; ok {
		cacert = string(raw)
	}
	saslMechanism := "PLAIN"
	if raw, ok := secret.Data["saslMechanism"]; ok {
		saslMechanism = string(raw)
	}

	return int(parsedPort), password, username, hostname, cacert, saslMechanism, nil
}

// getBrokerConfig builds a SASL_SSL BrokerConfig from the fields stored
// in the managed Kafka secret.
func (k *managedKafkaProvider) getBrokerConfig(secret *core.Secret) (config.BrokerConfig, error) {
	port, password, username, hostname, cacert, saslMechanism, err := k.destructureSecret(secret)
	if err != nil {
		return config.BrokerConfig{}, err
	}

	saslType := config.BrokerConfigAuthtypeSasl
	broker := config.BrokerConfig{
		Hostname:         hostname,
		Port:             &port,
		Authtype:         &saslType,
		SecurityProtocol: utils.StringPtr("SASL_SSL"),
		Sasl: &config.KafkaSASLConfig{
			Password:         &password,
			Username:         &username,
			SecurityProtocol: utils.StringPtr("SASL_SSL"),
			SaslMechanism:    utils.StringPtr(saslMechanism),
		},
	}
	// The CA cert is only set when the secret actually provides one.
	if cacert != "" {
		broker.Cacert = &cacert
	}

	return broker, nil
}

func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *crd.ClowdApp) *config.KafkaConfig {
kafkaConfig := &config.KafkaConfig{}
kafkaConfig.Brokers = []config.BrokerConfig{broker}
Expand All @@ -126,30 +76,3 @@ func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *c
return kafkaConfig

}

// getSecret fetches the managed Kafka credentials secret referenced by
// the environment spec.
func (k *managedKafkaProvider) getSecret() (*core.Secret, error) {
	ref, err := k.getSecretRef()
	if err != nil {
		return nil, err
	}

	secret := &core.Secret{}
	if err := k.Client.Get(k.Ctx, ref, secret); err != nil {
		return nil, err
	}

	return secret, nil
}

// getSecretRef returns the namespaced name of the managed Kafka secret,
// erroring when the environment does not define one.
func (k *managedKafkaProvider) getSecretRef() (types.NamespacedName, error) {
	ref := types.NamespacedName{
		Name:      k.Env.Spec.Providers.Kafka.ManagedSecretRef.Name,
		Namespace: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace,
	}
	// An all-zero ref means no secret was configured at all.
	if ref == (types.NamespacedName{}) {
		return types.NamespacedName{}, errors.NewClowderError("no secret ref defined for managed Kafka")
	}
	return ref, nil
}
176 changes: 176 additions & 0 deletions controllers/cloud.redhat.com/providers/kafka/msk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package kafka

import (
"encoding/json"
"fmt"
"strings"

crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"

"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/clowderconfig"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
core "k8s.io/api/core/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
)

type mskProvider struct {
providers.Provider
}

// NewMSK returns a new MSK (ephem-msk mode) kafka provider object.
// (The original comment said "NewStrimzi" — a copy-paste leftover.)
func NewMSK(p *providers.Provider) (providers.ClowderProvider, error) {
	// Register the GVKs this provider may create so the resource cache
	// can track them.
	p.Cache.AddPossibleGVKFromIdent(
		CyndiPipeline,
		CyndiAppSecret,
		CyndiHostInventoryAppSecret,
		CyndiConfigMap,
		KafkaTopic,
		KafkaConnect,
	)
	return &mskProvider{Provider: *p}, nil
}

// EnvProvide sets up the environment-level Kafka state for MSK mode: it
// seeds an empty Kafka app config and then provisions the brokers and
// the Kafka Connect cluster.
func (s *mskProvider) EnvProvide() error {
	appConfig := &config.AppConfig{
		Kafka: &config.KafkaConfig{},
	}
	s.Config = appConfig
	return s.configureBrokers()
}

// Provide populates the Kafka section of the app config for a single
// ClowdApp: broker/listener details plus topic configuration. Apps that
// declare no Kafka topics are skipped entirely.
func (s *mskProvider) Provide(app *crd.ClowdApp) error {
	if len(app.Spec.KafkaTopics) == 0 {
		return nil
	}

	// NOTE(review): these three assignments look redundant —
	// configureListeners() below replaces s.Config.Kafka wholesale via
	// getKafkaConfig(). Confirm before removing.
	s.Config.Kafka = &config.KafkaConfig{}
	s.Config.Kafka.Brokers = []config.BrokerConfig{}
	s.Config.Kafka.Topics = []config.TopicConfig{}

	// Resolve broker addresses/credentials from the MSK secret.
	if err := s.configureListeners(); err != nil {
		return err
	}

	// Create the KafkaTopic resources for this app.
	if err := processTopics(s, app); err != nil {
		return err
	}

	// The Cyndi pipeline is only provisioned for apps that opt in.
	if app.Spec.Cyndi.Enabled {
		err := createCyndiPipeline(s, app, getConnectNamespace(s.Env), getConnectClusterName(s.Env))
		if err != nil {
			return err
		}
	}

	return nil
}

// getBootstrapServersString renders the configured brokers as a
// comma-separated "host:port" list; the port is omitted when unset.
func (s *mskProvider) getBootstrapServersString() string {
	servers := make([]string, 0, len(s.Config.Kafka.Brokers))
	for _, broker := range s.Config.Kafka.Brokers {
		address := broker.Hostname
		if broker.Port != nil {
			address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
		}
		servers = append(servers, address)
	}
	return strings.Join(servers, ",")
}

// genericConfig is a flat string->string map used to render Kafka
// Connect cluster settings as JSON.
type genericConfig map[string]string

// connectConfig fills target with the default Kafka Connect cluster
// settings for this environment: single-replica storage topics named
// after the env, client config overrides allowed, group "connect-cluster".
//
// Fixes vs. original: uses a pointer receiver for consistency with every
// other mskProvider method, and the parameter is no longer named
// "config", so it cannot shadow the imported config package.
func (s *mskProvider) connectConfig(target *apiextensions.JSON) error {
	settings := genericConfig{
		"config.storage.replication.factor":       "1",
		"config.storage.topic":                    fmt.Sprintf("%v-connect-cluster-configs", s.Env.Name),
		"connector.client.config.override.policy": "All",
		"group.id":                                "connect-cluster",
		"offset.storage.replication.factor":       "1",
		"offset.storage.topic":                    fmt.Sprintf("%v-connect-cluster-offsets", s.Env.Name),
		"status.storage.replication.factor":       "1",
		"status.storage.topic":                    fmt.Sprintf("%v-connect-cluster-status", s.Env.Name),
	}

	byteData, err := json.Marshal(settings)
	if err != nil {
		return err
	}
	return target.UnmarshalJSON(byteData)
}

// getKafkaConfig wraps a single broker in a fresh KafkaConfig with an
// empty (non-nil) topic list.
func (s *mskProvider) getKafkaConfig(broker config.BrokerConfig) *config.KafkaConfig {
	return &config.KafkaConfig{
		Brokers: []config.BrokerConfig{broker},
		Topics:  []config.TopicConfig{},
	}
}

// configureListeners reads the MSK credentials secret, derives the
// broker configuration from it, and installs the result as the
// provider's Kafka config.
func (s *mskProvider) configureListeners() error {
	secret, err := getSecret(s)
	if err != nil {
		return err
	}

	broker, err := getBrokerConfig(secret)
	if err != nil {
		return err
	}

	s.Config.Kafka = s.getKafkaConfig(broker)
	return nil
}

// configureBrokers resolves the Kafka broker addresses and then
// provisions the Kafka Connect cluster that depends on them.
func (s *mskProvider) configureBrokers() error {
	// Bootstrap server addresses must be known before KafkaConnect can
	// be provisioned; a failure here is marked for requeue.
	if listenErr := s.configureListeners(); listenErr != nil {
		wrapped := errors.Wrap("unable to determine kafka broker addresses", listenErr)
		wrapped.Requeue = true
		return wrapped
	}

	if connectErr := configureKafkaConnectCluster(s); connectErr != nil {
		return errors.Wrap("failed to provision kafka connect cluster", connectErr)
	}

	return nil
}

// getKafkaConnectTrustedCertSecretName returns the name of the secret
// holding the trusted certs for Kafka Connect (the MSK secret ref).
func (s *mskProvider) getKafkaConnectTrustedCertSecretName() (string, error) {
	ref, err := getSecretRef(s)
	if err != nil {
		return "", err
	}
	return ref.Name, nil
}

// getConnectClusterUserName returns the SASL username of the first
// configured broker, used as the Kafka Connect cluster user.
// NOTE(review): this panics if Brokers is empty or Brokers[0].Sasl is
// nil — presumably configureListeners has always run first; confirm the
// call-order guarantee.
func (s *mskProvider) getConnectClusterUserName() string {
	return *s.Config.Kafka.Brokers[0].Sasl.Username
}

// KafkaTopicName computes the name for a KafkaTopic resource: the plain
// topic name, or "<topic>-<env>-<namespace>" when complex Strimzi topic
// names are enabled in the Clowder config.
func (s *mskProvider) KafkaTopicName(topic crd.KafkaTopicSpec, namespace string) string {
	if !clowderconfig.LoadedConfig.Features.UseComplexStrimziTopicNames {
		return topic.TopicName
	}
	return fmt.Sprintf("%s-%s-%s", topic.TopicName, s.Env.Name, namespace)
}

// KafkaName returns the identifier of the Kafka cluster to use, taken
// from the environment's ClusterAnnotation field ((*_msk_*) mode).
func (s *mskProvider) KafkaName() string {
	return s.Env.Spec.Providers.Kafka.ClusterAnnotation
}

// KafkaNamespace returns the namespace KafkaTopic resources should be
// written to, falling back to the environment's target namespace when
// TopicNamespace is unset.
func (s *mskProvider) KafkaNamespace() string {
	if ns := s.Env.Spec.Providers.Kafka.TopicNamespace; ns != "" {
		return ns
	}
	return s.Env.Status.TargetNamespace
}
Loading

0 comments on commit a30e839

Please sign in to comment.