diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml index ffc0ad1b..e318d100 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml @@ -1203,6 +1203,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster. + type: string scalePodName: description: The name of pod where the metadata from type: string diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml index fab41e0c..e0394f3e 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml @@ -7880,6 +7880,9 @@ spec: nameServers: description: NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876 type: string + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string required: - consoleDeployment type: object diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml index 4f98506b..60d8864b 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml @@ -1178,6 +1178,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml index 315d85e9..a8dbaf93 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml @@ -1082,6 +1082,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml index 4a1f1dd8..f6e4c62a 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml @@ -36,6 +36,9 @@ spec: spec: description: TopicTransferSpec defines the desired state of TopicTransfer properties: + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string sourceCluster: description: The cluster where the transferred topic from type: string diff --git a/charts/rocketmq-operator/templates/role_binding.yaml b/charts/rocketmq-operator/templates/role_binding.yaml index fc8ce958..f330555b 100644 --- a/charts/rocketmq-operator/templates/role_binding.yaml +++ b/charts/rocketmq-operator/templates/role_binding.yaml @@ -13,17 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: {{ include "rocketmq-operator.fullname" . }} - labels: - {{- include "rocketmq-operator.labels" . | nindent 4 }} + name: rocketmq-operator +subjects: +- kind: ServiceAccount + name: rocketmq-operator + namespace: default roleRef: - apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: {{ template "rocketmq-operator.fullname" . }} -subjects: - - name: {{ template "rocketmq-operator.serviceAccountName" . }} - namespace: {{ .Release.Namespace | quote }} - kind: ServiceAccount \ No newline at end of file + name: rocketmq-operator + apiGroup: rbac.authorization.k8s.io diff --git a/charts/rocketmq-operator/templates/service_account.yaml b/charts/rocketmq-operator/templates/service_account.yaml index 8f1a58ff..2dde8f9e 100644 --- a/charts/rocketmq-operator/templates/service_account.yaml +++ b/charts/rocketmq-operator/templates/service_account.yaml @@ -16,4 +16,4 @@ apiVersion: v1 kind: ServiceAccount metadata: - name: {{ template "rocketmq-operator.serviceAccountName" . }} + name: rocketmq-operator diff --git a/deploy/crds/rocketmq.apache.org_brokers.yaml b/deploy/crds/rocketmq.apache.org_brokers.yaml index ffc0ad1b..e318d100 100644 --- a/deploy/crds/rocketmq.apache.org_brokers.yaml +++ b/deploy/crds/rocketmq.apache.org_brokers.yaml @@ -1203,6 +1203,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster. + type: string scalePodName: description: The name of pod where the metadata from type: string diff --git a/deploy/crds/rocketmq.apache.org_consoles.yaml b/deploy/crds/rocketmq.apache.org_consoles.yaml index fab41e0c..e0394f3e 100644 --- a/deploy/crds/rocketmq.apache.org_consoles.yaml +++ b/deploy/crds/rocketmq.apache.org_consoles.yaml @@ -7880,6 +7880,9 @@ spec: nameServers: description: NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876 type: string + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string required: - consoleDeployment type: object diff --git a/deploy/crds/rocketmq.apache.org_controllers.yaml b/deploy/crds/rocketmq.apache.org_controllers.yaml index 4f98506b..60d8864b 100644 --- a/deploy/crds/rocketmq.apache.org_controllers.yaml +++ b/deploy/crds/rocketmq.apache.org_controllers.yaml @@ -1178,6 +1178,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/deploy/crds/rocketmq.apache.org_nameservices.yaml b/deploy/crds/rocketmq.apache.org_nameservices.yaml index 315d85e9..a8dbaf93 100644 --- a/deploy/crds/rocketmq.apache.org_nameservices.yaml +++ b/deploy/crds/rocketmq.apache.org_nameservices.yaml @@ -1082,6 +1082,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/deploy/crds/rocketmq.apache.org_topictransfers.yaml b/deploy/crds/rocketmq.apache.org_topictransfers.yaml index 4a1f1dd8..f6e4c62a 100644 --- a/deploy/crds/rocketmq.apache.org_topictransfers.yaml +++ b/deploy/crds/rocketmq.apache.org_topictransfers.yaml @@ -36,6 +36,9 @@ spec: spec: description: TopicTransferSpec defines the desired state of TopicTransfer properties: + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string sourceCluster: description: The cluster where the transferred topic from type: string diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go index 1647f9a9..0dfb7a36 100644 --- a/pkg/apis/rocketmq/v1alpha1/broker_types.go +++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go @@ -28,6 +28,9 @@ import ( // BrokerSpec defines the desired state of Broker // +k8s:openapi-gen=true type BrokerSpec struct { + // RocketMqName is the name of the RocketMQ cluster. + RocketMqName string `json:"rocketMqName,omitempty"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/console_types.go b/pkg/apis/rocketmq/v1alpha1/console_types.go index 00feac54..f8104c97 100644 --- a/pkg/apis/rocketmq/v1alpha1/console_types.go +++ b/pkg/apis/rocketmq/v1alpha1/console_types.go @@ -28,6 +28,9 @@ import ( // ConsoleSpec defines the desired state of Console // +k8s:openapi-gen=true type ConsoleSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName,omitempty"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/controller_types.go b/pkg/apis/rocketmq/v1alpha1/controller_types.go index 5e8152c8..50b8b04e 100644 --- a/pkg/apis/rocketmq/v1alpha1/controller_types.go +++ b/pkg/apis/rocketmq/v1alpha1/controller_types.go @@ -22,12 +22,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + ControllerRocketMqNameIndexKey = "spec.rocketMqNameNamespaced" +) + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // ControllerSpec defines the desired state of Controller // +k8s:openapi-gen=true type ControllerSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName,omitempty"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go index a6f4ace5..72263640 100644 --- a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go +++ b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go @@ -22,12 +22,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + NameServiceRocketMqNameIndexKey = ".spec.rocketMqNameNamespaced" +) + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // NameServiceSpec defines the desired state of NameService // +k8s:openapi-gen=true type NameServiceSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName,omitempty"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go index 82a7e37e..63e89c1e 100644 --- a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go +++ b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go @@ -27,6 +27,9 @@ import ( // TopicTransferSpec defines the desired state of TopicTransfer // +k8s:openapi-gen=true type TopicTransferSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName,omitempty"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 653b3d7a..4af520fd 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -55,6 +55,9 @@ const ( // UpdateBrokerConfig is update broker config command UpdateBrokerConfig = "updateBrokerConfig" + // ClusterList is the command of cluster list + ClusterList = "clusterList" + // ParamNameServiceAddress is the name of name server list parameter ParamNameServiceAddress = "namesrvAddr" diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index 030a3d65..8c32f4bf 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -136,16 +136,18 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque return reconcile.Result{}, err } + var groupNum int if broker.Status.Size == 0 { - share.GroupNum = broker.Spec.Size + groupNum = broker.Spec.Size } else { - share.GroupNum = broker.Status.Size + groupNum = broker.Status.Size } + var nameServersStr string if broker.Spec.NameServers == "" { // wait for name server ready when create broker cluster if nameServers is omitted for { - if share.IsNameServersStrInitialized { + if nameServersStr = share.GetNameServersStr(r.client, broker.Namespace, broker.Spec.RocketMqName); nameServersStr != "" { break } else { log.Info("Broker Waiting for name server ready...") @@ -153,24 +155,24 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } } } else { - share.NameServersStr = broker.Spec.NameServers + nameServersStr = broker.Spec.NameServers } if broker.Spec.ClusterMode == "" { broker.Spec.ClusterMode = "STATIC" } - if broker.Spec.ClusterMode == "CONTROLLER" && share.ControllerAccessPoint == "" { + controllerAccessPoint := r.getControllerAccessPoint(broker.Namespace, broker.Spec.RocketMqName) + if broker.Spec.ClusterMode == "CONTROLLER" && controllerAccessPoint == "" { log.Info("Broker Waiting for Controller ready...") return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } - share.BrokerClusterName = broker.Name replicaPerGroup := broker.Spec.ReplicaPerGroup - reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) - for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { - reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum)) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) + reqLogger.Info("brokerGroupNum=" + strconv.Itoa(groupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) + for brokerGroupIndex := 0; brokerGroupIndex < groupNum; brokerGroupIndex++ { + reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(groupNum)) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint, nameServersStr) // Check if the statefulSet already exists, if not create a new one found := &appsv1.StatefulSet{} err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) @@ -186,7 +188,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex) + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint, nameServersStr) err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found) if err != nil && errors.IsNotFound(err) { reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) @@ -202,19 +204,18 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque // Check for name server scaling if broker.Spec.AllowRestart { - // The following code will restart all brokers to update NAMESRV_ADDR env - if share.IsNameServersStrUpdated { - for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ { - brokerName := getBrokerName(broker, brokerGroupIndex) - // Update master broker - reqLogger.Info("Update Master Broker NAMESRV_ADDR of " + brokerName) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) - found := &appsv1.StatefulSet{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) - if err != nil { - reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName) - } else { - found.Spec.Template.Spec.Containers[0].Env[0].Value = share.NameServersStr + // The following code will restart all brokers to update NAMESRV_ADDR env if name server list is updated + for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ { + brokerName := getBrokerName(broker, brokerGroupIndex) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint, nameServersStr) + found := &appsv1.StatefulSet{} + err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) + if err != nil { + reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName) + } else { + // update if name server list is updated + if found.Spec.Template.Spec.Containers[0].Env[0].Value != nameServersStr { + found.Spec.Template.Spec.Containers[0].Env[0].Value = nameServersStr err = r.client.Update(context.TODO(), found) if err != nil { reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name) @@ -223,18 +224,19 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } time.Sleep(time.Duration(cons.RestartBrokerPodIntervalInSecond) * time.Second) } - // Update replicas brokers - for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { - reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex) - replicaFound := &appsv1.StatefulSet{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound) - if err != nil { - reqLogger.Error(err, "Failed to get broker replica StatefulSet of "+brokerName) - } else { + } + // Update replicas brokers + for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint, nameServersStr) + replicaFound := &appsv1.StatefulSet{} + err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound) + if err != nil { + reqLogger.Error(err, "Failed to get broker replica StatefulSet of "+brokerName) + } else { + if replicaFound.Spec.Template.Spec.Containers[0].Env[0].Value != nameServersStr { for index := range replicaFound.Spec.Template.Spec.Containers[0].Env { if cons.EnvNameServiceAddress == replicaFound.Spec.Template.Spec.Containers[0].Env[index].Name { - replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = share.NameServersStr + replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = nameServersStr break } } @@ -249,7 +251,6 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } } } - share.IsNameServersStrUpdated = false } // List the pods for this broker's statefulSet @@ -343,6 +344,27 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } +func (r *ReconcileBroker) getControllerAccessPoint(namespace string, rocketMqName string) string { + controllerList := &rocketmqv1alpha1.ControllerList{} + err := r.client.List(context.TODO(), controllerList, &client.MatchingFields{ + rocketmqv1alpha1.ControllerRocketMqNameIndexKey: rocketMqName + "-" + namespace, + }) + if err != nil { + log.Error(err, "Failed to list controller.", "Controller.Namespace", namespace, "Controller.Name", rocketMqName) + return "" + } + if len(controllerList.Items) != 1 { + return "" + } + + controller := controllerList.Items[0] + + if controller.Status.Size != controller.Spec.Size { + return "" + } + return tool.BuildSvcResourceName(controller.Name) + ":9878" +} + func getCopyMetadataJsonCommand(dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) string { cmdOpts := buildInputCommand(dir) topicsJsonStr, err := exec(cmdOpts, sourcePodName, k8s, namespace) @@ -399,7 +421,7 @@ func getBrokerName(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) string } // getBrokerStatefulSet returns a broker StatefulSet object -func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int) *appsv1.StatefulSet { +func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int, controllerAccessPoint, nameServersStr string) *appsv1.StatefulSet { ls := labelsForBroker(broker.Name) var a int32 = 1 var c = &a @@ -460,7 +482,7 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, }, SecurityContext: getContainerSecurityContext(broker), ImagePullPolicy: broker.Spec.ImagePullPolicy, - Env: getENV(broker, replicaIndex, brokerGroupIndex), + Env: getENV(broker, replicaIndex, brokerGroupIndex, controllerAccessPoint, nameServersStr), Ports: []corev1.ContainerPort{{ ContainerPort: cons.BrokerVipContainerPort, Name: cons.BrokerVipContainerPortName, @@ -499,10 +521,10 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, } -func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int) []corev1.EnvVar { +func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int, controllerAccessPoint, nameServersStr string) []corev1.EnvVar { envs := []corev1.EnvVar{{ Name: cons.EnvNameServiceAddress, - Value: share.NameServersStr, + Value: nameServersStr, }, { Name: cons.EnvBrokerId, Value: strconv.Itoa(replicaIndex), @@ -515,7 +537,7 @@ func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex }} if broker.Spec.ClusterMode == "CONTROLLER" { envs = append(envs, corev1.EnvVar{Name: cons.EnvEnableControllerMode, Value: "true"}) - envs = append(envs, corev1.EnvVar{Name: cons.EnvControllerAddr, Value: share.ControllerAccessPoint}) + envs = append(envs, corev1.EnvVar{Name: cons.EnvControllerAddr, Value: controllerAccessPoint}) } envs = append(envs, broker.Spec.Env...) return envs diff --git a/pkg/controller/console/console_controller.go b/pkg/controller/console/console_controller.go index 63c3fe30..251c45b8 100644 --- a/pkg/controller/console/console_controller.go +++ b/pkg/controller/console/console_controller.go @@ -124,10 +124,11 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ return reconcile.Result{}, err } + var nameserverStr string if instance.Spec.NameServers == "" { // wait for name server ready if nameServers is omitted for { - if share.IsNameServersStrInitialized { + if nameserverStr = share.GetNameServersStr(r.client, instance.Namespace, instance.Spec.RocketMqName); nameserverStr != "" { break } else { log.Info("Waiting for name server ready...") @@ -135,10 +136,10 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ } } } else { - share.NameServersStr = instance.Spec.NameServers + nameserverStr = instance.Spec.NameServers } - consoleDeployment := newDeploymentForCR(instance) + consoleDeployment := newDeploymentForCR(instance, nameserverStr) // Set Console instance as the owner and controller if err := controllerutil.SetControllerReference(instance, consoleDeployment, r.scheme); err != nil { @@ -180,10 +181,10 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ } // newDeploymentForCR returns a deployment pod with modifying the ENV -func newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { +func newDeploymentForCR(cr *rocketmqv1alpha1.Console, nameServersStr string) *appsv1.Deployment { env := corev1.EnvVar{ Name: "JAVA_OPTS", - Value: fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", share.NameServersStr), + Value: fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", nameServersStr), } dep := &appsv1.Deployment{ diff --git a/pkg/controller/controller/dledger_controller.go b/pkg/controller/controller/dledger_controller.go index 1056f244..bc6ed2e7 100644 --- a/pkg/controller/controller/dledger_controller.go +++ b/pkg/controller/controller/dledger_controller.go @@ -29,7 +29,6 @@ import ( rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" cons "github.com/apache/rocketmq-operator/pkg/constants" - "github.com/apache/rocketmq-operator/pkg/share" "github.com/apache/rocketmq-operator/pkg/tool" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -64,6 +63,19 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { + err := mgr.GetCache().IndexField(context.TODO(), &rocketmqv1alpha1.Controller{}, rocketmqv1alpha1.ControllerRocketMqNameIndexKey, + func(rawObj client.Object) []string { + c, ok := rawObj.(*rocketmqv1alpha1.Controller) + if !ok { + return nil + } + return []string{c.Spec.RocketMqName + "-" + c.Namespace} + }, + ) + if err != nil { + return err + } + // Create a new controller c, err := controller.New("dledger-controller", mgr, controller.Options{Reconciler: r}) if err != nil { @@ -224,8 +236,6 @@ func (r *ReconcileController) Reconcile(ctx context.Context, request reconcile.R return reconcile.Result{}, err } } - share.ControllerAccessPoint = controllerSvcName + ":9878" - return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go index ece34de1..f4f997ca 100644 --- a/pkg/controller/nameservice/nameservice_controller.go +++ b/pkg/controller/nameservice/nameservice_controller.go @@ -30,7 +30,6 @@ import ( rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" cons "github.com/apache/rocketmq-operator/pkg/constants" - "github.com/apache/rocketmq-operator/pkg/share" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -68,6 +67,19 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { + err := mgr.GetFieldIndexer().IndexField(context.TODO(), &rocketmqv1alpha1.NameService{}, rocketmqv1alpha1.NameServiceRocketMqNameIndexKey, + func(rawObj client.Object) []string { + n, ok := rawObj.(*rocketmqv1alpha1.NameService) + if !ok { + return nil + } + return []string{n.Spec.RocketMqName + "-" + n.Namespace} + }, + ) + if err != nil { + return err + } + // Create a new controller c, err := controller.New("nameservice-controller", mgr, controller.Options{Reconciler: r}) if err != nil { @@ -187,7 +199,7 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha for _, value := range hostIps { nameServerListStr = nameServerListStr + value + ":9876;" } - + newNameServerListStr := "" // Update status.NameServers if needed if !reflect.DeepEqual(hostIps, instance.Status.NameServers) { oldNameServerListStr := "" @@ -195,14 +207,15 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha oldNameServerListStr = oldNameServerListStr + value + ":9876;" } - share.NameServersStr = nameServerListStr[:len(nameServerListStr)-1] - reqLogger.Info("share.NameServersStr:" + share.NameServersStr) + newNameServerListStr = nameServerListStr[:len(nameServerListStr)-1] + reqLogger.Info("newNameServersStr:" + newNameServerListStr) + var isNameServersStrUpdated bool if len(oldNameServerListStr) <= cons.MinIpListLength { - oldNameServerListStr = share.NameServersStr - } else if len(share.NameServersStr) > cons.MinIpListLength { + oldNameServerListStr = newNameServerListStr + } else if len(newNameServerListStr) > cons.MinIpListLength { oldNameServerListStr = oldNameServerListStr[:len(oldNameServerListStr)-1] - share.IsNameServersStrUpdated = true + isNameServersStrUpdated = true } reqLogger.Info("oldNameServerListStr:" + oldNameServerListStr) @@ -216,17 +229,40 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha } // use admin tool to update broker config - if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) { + if isNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(newNameServerListStr) > cons.MinIpListLength) { + // bash-4.4$ ./mqadmin clusterList -n 192.168.180.36:9876 + // #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE + // broker broker-0 0 192.168.180.40:10911 V4_5_0 0.00(0,0ms) 0.00(0,0ms) 0 471030.34 -1.0000 + // broker broker-0 1 192.168.137.89:10911 V4_5_0 0.00(0,0ms) 0.00(0,0ms) 0 471030.34 0.2673 + clusterListCmd := exec.Command("sh", cons.AdminToolDir, cons.ClusterList, "-n", oldNameServerListStr) + clusterListOutput, err := clusterListCmd.Output() + if err != nil { + reqLogger.Error(err, "Get cluster list failed, command: "+cons.AdminToolDir+" "+cons.ClusterList+" -n "+oldNameServerListStr) + return reconcile.Result{Requeue: true}, err + } + // get cluster of output + clusterName := "" + for _, line := range strings.Split(string(clusterListOutput), "\n") { + if strings.HasPrefix(line, "#Cluster Name") { + continue + } + for _, f := range strings.Fields(line) { + clusterName = f + break + } + } + if clusterName == "" { + reqLogger.Error(err, "Get empty cluster name, command: "+cons.AdminToolDir+" "+cons.ClusterList+" -n "+oldNameServerListStr) + return reconcile.Result{Requeue: true}, err + } + mqAdmin := cons.AdminToolDir subCmd := cons.UpdateBrokerConfig key := cons.ParamNameServiceAddress - reqLogger.Info("share.GroupNum=broker.Spec.Size=" + strconv.Itoa(share.GroupNum)) - - clusterName := share.BrokerClusterName reqLogger.Info("Updating config " + key + " of cluster" + clusterName) - command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + share.NameServersStr - cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", share.NameServersStr) + command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + newNameServerListStr + cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", newNameServerListStr) output, err := cmd.Output() if err != nil { reqLogger.Error(err, "Update Broker config "+key+" failed of cluster "+clusterName+", command: "+command) @@ -241,16 +277,6 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha reqLogger.Info("NameServers IP " + strconv.Itoa(i) + ": " + value) } - runningNameServerNum := getRunningNameServersNum(podList.Items) - if runningNameServerNum == instance.Spec.Size { - share.IsNameServersStrInitialized = true - share.NameServersStr = nameServerListStr // reassign if operator restarts - } - - reqLogger.Info("Share variables", "GroupNum", share.GroupNum, - "NameServersStr", share.NameServersStr, "IsNameServersStrUpdated", share.IsNameServersStrUpdated, - "IsNameServersStrInitialized", share.IsNameServersStrInitialized, "BrokerClusterName", share.BrokerClusterName) - if requeue { return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go index 5f212a47..8856b846 100644 --- a/pkg/controller/topictransfer/topictransfer_controller.go +++ b/pkg/controller/topictransfer/topictransfer_controller.go @@ -128,7 +128,7 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil targetCluster := topicTransfer.Spec.TargetCluster sourceCluster := topicTransfer.Spec.SourceCluster - nameServer := strings.Split(share.NameServersStr, ";")[0] + nameServer := strings.Split(share.GetNameServersStr(r.client, topicTransfer.Namespace, topicTransfer.Spec.RocketMqName), ";")[0] if len(nameServer) < cons.MinIpListLength { reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.") // terminate the transfer process diff --git a/pkg/share/share.go b/pkg/share/share.go index 0656bc28..5ff86b52 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -18,22 +18,61 @@ // Package share defines some variables shared by different packages package share -var ( - // GroupNum is the number of broker group - GroupNum = 0 +import ( + "context" + "sort" + "strings" - // NameServersStr is the name server list - NameServersStr = "" + rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" + "github.com/apache/rocketmq-operator/pkg/tool" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" +) - // IsNameServersStrUpdated is whether the name server list is updated - IsNameServersStrUpdated = false +func GetNameServersStr(r client.Reader, namespace, rocketMqName string) string { + nameserviceList := &rocketmqv1alpha1.NameServiceList{} + err := r.List(context.TODO(), nameserviceList, &client.MatchingFields{ + rocketmqv1alpha1.NameServiceRocketMqNameIndexKey: rocketMqName + "-" + namespace, + }) + if err != nil { + return "" + } + if len(nameserviceList.Items) != 1 { + return "" + } - // IsNameServersStrInitialized is whether the name server list is initialized - IsNameServersStrInitialized = false + nameservice := nameserviceList.Items[0] + labelSelector := labels.SelectorFromSet(tool.LabelsForNameService(nameservice.Name)) + listOps := &client.ListOptions{ + Namespace: nameservice.Namespace, + LabelSelector: labelSelector, + } + podList := &corev1.PodList{} + err = r.List(context.Background(), podList, listOps) + if err != nil { + return "" + } + if len(podList.Items) == 0 { + return "" + } - // BrokerClusterName is the broker cluster name - BrokerClusterName = "" + var hostIps []string + for _, pod := range podList.Items { + if pod.Status.Phase == corev1.PodRunning && !strings.EqualFold(pod.Status.PodIP, "") { + hostIps = append(hostIps, pod.Status.PodIP) + } + } + if len(hostIps) == 0 { + return "" + } - // svc of controller for brokers - ControllerAccessPoint = "" -) + sort.Strings(hostIps) + + nameServerListStr := "" + for _, value := range hostIps { + nameServerListStr = nameServerListStr + value + ":9876;" + } + + return nameServerListStr[:len(nameServerListStr)-1] +} diff --git a/pkg/tool/resource_name.go b/pkg/tool/resource_name.go index 7862e620..193f47e4 100644 --- a/pkg/tool/resource_name.go +++ b/pkg/tool/resource_name.go @@ -26,3 +26,7 @@ func BuildHeadlessSvcResourceName(name string) string { func BuildSvcResourceName(name string) string { return fmt.Sprintf("%s-svc", name) } + +func LabelsForNameService(name string) map[string]string { + return map[string]string{"app": "name_service", "name_service_cr": name} +}