Op stop subcluster (#969)
Add planned subcluster shutdown: setting a subcluster's shutdown field to true drains and stops that subcluster.

---------

Co-authored-by: roypaulin <[email protected]>
HaoYang0000 and roypaulin authored Nov 14, 2024
1 parent 08ae373 commit 68fd1d0
Showing 54 changed files with 1,653 additions and 22 deletions.
5 changes: 5 additions & 0 deletions api/v1/helpers.go
@@ -740,6 +740,11 @@ func (v *VerticaDB) GetCreateDBNodeStartTimeout() int {
return vmeta.GetCreateDBNodeStartTimeout(v.Annotations)
}

// GetShutdownDrainSeconds returns the time, in seconds, to wait for users to disconnect before a subcluster or database is shut down
func (v *VerticaDB) GetShutdownDrainSeconds() int {
return vmeta.GetShutdownDrainSeconds(v.Annotations)
}

// IsNMASideCarDeploymentEnabled returns true if the conditions to run NMA
// in a sidecar are met
func (v *VerticaDB) IsNMASideCarDeploymentEnabled() bool {
5 changes: 5 additions & 0 deletions changes/unreleased/Added-20241112-143404.yaml
@@ -0,0 +1,5 @@
kind: Added
body: Support planned subcluster shutdown.
time: 2024-11-12T14:34:04.504040638+01:00
custom:
Issue: "969"
4 changes: 3 additions & 1 deletion pkg/controllers/sandbox/sandbox_controller.go
@@ -167,9 +167,11 @@ func (r *SandboxConfigMapReconciler) constructActors(vdb *v1.VerticaDB, log logr
vdbcontroller.MakeAnnotateAndLabelPodReconciler(r, log, vdb, pfacts),
// Stop Vertica in the sandbox if the shutdown state is true
vdbcontroller.MakeStopDBReconciler(r, vdb, prunner, pfacts, dispatcher),
// Stop subclusters that have shutdown set to true.
vdbcontroller.MakeSubclusterShutdownReconciler(r, log, vdb, dispatcher, pfacts),
// Restart any down pods
vdbcontroller.MakeRestartReconciler(r, log, vdb, prunner, pfacts, true, dispatcher),
// Update the vdb status including subclusters[].shutdown, after a stopdb
// Update the vdb status including subclusters[].shutdown, after a stop_db, stop_sc
// or a restart
vdbcontroller.MakeStatusReconcilerWithShutdown(r.Client, r.Scheme, log, vdb, pfacts),
// Scale down the subclusters' statefulsets to zero after the subclusters are shut down
11 changes: 11 additions & 0 deletions pkg/controllers/vdb/dbaddnode_reconciler.go
@@ -75,7 +75,15 @@ func (d *DBAddNodeReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (c
return ctrl.Result{Requeue: true}, nil
}

scStatusMap := d.Vdb.GenSubclusterStatusMap()
for i := range d.Vdb.Spec.Subclusters {
sc := &d.Vdb.Spec.Subclusters[i]
scStatus, found := scStatusMap[sc.Name]
if found && scStatus.Shutdown {
// subclusters that have been shut down must
// be ignored.
continue
}
// Recollect pod facts to ensure correct options are used in AddNode()
if err := d.PFacts.Collect(ctx, d.Vdb); err != nil {
return ctrl.Result{}, err
@@ -101,6 +109,9 @@ func (d *DBAddNodeReconciler) findAddNodePods(scName string) ([]*podfacts.PodFac
continue
}
if !v.GetDBExists() {
if v.GetShutdown() {
continue
}
if !v.GetIsPodRunning() || !v.GetIsInstalled() {
// We want to group all of the add nodes in a single admintools call.
// Doing so limits the impact on any running queries. So if there is at
5 changes: 5 additions & 0 deletions pkg/controllers/vdb/depobjcheck_reconciler.go
@@ -135,6 +135,11 @@ func (d *DepObjCheckReconciler) checkSts(ctx context.Context, sc *vapi.Subcluste
}

func (d *DepObjCheckReconciler) checkPods(ctx context.Context, sc *vapi.Subcluster) (ctrl.Result, error) {
scStatus, found := d.Vdb.GenSubclusterStatusMap()[sc.Name]
// Ignore subclusters that are shut down
if sc.Shutdown || (found && scStatus.Shutdown) {
return ctrl.Result{}, nil
}
for i := int32(0); i < sc.Size; i++ {
if res, err := d.checkObj(ctx, "Pod", names.GenPodName(d.Vdb, sc, i), &corev1.Pod{}); verrors.IsReconcileAborted(res, err) {
return res, err
5 changes: 5 additions & 0 deletions pkg/controllers/vdb/resizepv_reconciler.go
@@ -65,7 +65,12 @@ func (r *ResizePVReconcile) Reconcile(ctx context.Context, _ *ctrl.Request) (ctr
}

returnRes := ctrl.Result{}
scStatusMap := r.Vdb.GenSubclusterStatusMap()
for _, pf := range r.PFacts.Detail {
scStatus, found := scStatusMap[pf.GetSubclusterName()]
if pf.GetShutdown() || (found && scStatus.Shutdown) {
continue
}
if res, err := r.reconcilePod(ctx, pf); verrors.IsReconcileAborted(res, err) {
// Errors always abort right away. But if we get a requeue, we
// will remember this and go onto the next pod
25 changes: 13 additions & 12 deletions pkg/controllers/vdb/shutdownspec_reconciler.go
@@ -65,30 +65,31 @@ func (r *ShutdownSpecReconciler) updateSubclustersShutdownStateCallback() (bool,
sb := &r.Vdb.Spec.Sandboxes[i]
for j := range sb.Subclusters {
sc := scMap[sb.Subclusters[j].Name]
// Proceed only if the subcluster's shutdown state differs
// from the sandbox's shutdown state
if sb.Shutdown == sc.Shutdown {
continue
}
if sb.Shutdown {
if sc.Annotations == nil {
sc.Annotations = make(map[string]string, 1)
}
// Add an annotation that indicates the shutdown/restart is
// controlled by the sandbox as opposed to the subcluster. It helps
// differentiate this case from when the user explicitly changes
// the subcluster's shutdown field.
sc.Annotations[vmeta.ShutdownDrivenBySandbox] = "true"
if _, ok := sc.Annotations[vmeta.ShutdownDrivenBySandbox]; !ok {
// Add an annotation that indicates the shutdown/restart is
// controlled by the sandbox as opposed to the subcluster. It helps
// differentiate this case from when the user explicitly changes
// the subcluster's shutdown field.
sc.Annotations[vmeta.ShutdownDrivenBySandbox] = "true"
needUpdate = true
}
} else {
// If the shutdown/restart is not controlled by the sandbox,
// we skip to the next subcluster.
if !vmeta.GetShutdownDrivenBySandbox(sc.Annotations) {
continue
}
delete(sc.Annotations, vmeta.ShutdownDrivenBySandbox)
needUpdate = true
}
if sb.Shutdown != sc.Shutdown {
sc.Shutdown = sb.Shutdown
needUpdate = true
}
sc.Shutdown = sb.Shutdown
needUpdate = true
}
}
return needUpdate, nil
1 change: 1 addition & 0 deletions pkg/controllers/vdb/stopdb_reconciler.go
@@ -116,6 +116,7 @@ func (s *StopDBReconciler) runATCmd(ctx context.Context, initiatorName types.Nam
stopdb.WithInitiator(initiatorName, initiatorIP),
stopdb.WithSandbox(s.PFacts.GetSandboxName()),
stopdb.WithZeroDrain(false),
stopdb.WithDrainSeconds(s.Vdb.GetShutdownDrainSeconds()),
}
start := time.Now()
if err := s.Dispatcher.StopDB(ctx, opts...); err != nil {
181 changes: 181 additions & 0 deletions pkg/controllers/vdb/subclustershutdown_reconciler.go
@@ -0,0 +1,181 @@
/*
(c) Copyright [2021-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vdb

import (
"context"
"fmt"
"strings"

"github.com/go-logr/logr"
vapi "github.com/vertica/vertica-kubernetes/api/v1"
"github.com/vertica/vertica-kubernetes/pkg/controllers"
"github.com/vertica/vertica-kubernetes/pkg/events"
"github.com/vertica/vertica-kubernetes/pkg/podfacts"
"github.com/vertica/vertica-kubernetes/pkg/vadmin"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/stopsubcluster"
config "github.com/vertica/vertica-kubernetes/pkg/vdbconfig"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
)

// SubclusterShutdownReconciler handles the process when subclusters
// need to be shut down or restarted
type SubclusterShutdownReconciler struct {
VRec config.ReconcilerInterface
Log logr.Logger
Vdb *vapi.VerticaDB // Vdb is the CRD we are acting on
Dispatcher vadmin.Dispatcher
PFacts *podfacts.PodFacts
}

// MakeSubclusterShutdownReconciler will build a SubclusterShutdownReconciler object
func MakeSubclusterShutdownReconciler(recon config.ReconcilerInterface, log logr.Logger,
vdb *vapi.VerticaDB, dispatcher vadmin.Dispatcher, pfacts *podfacts.PodFacts) controllers.ReconcileActor {
return &SubclusterShutdownReconciler{
VRec: recon,
Log: log.WithName("SubclusterShutdownReconciler"),
Vdb: vdb,
Dispatcher: dispatcher,
PFacts: pfacts,
}
}

func (s *SubclusterShutdownReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (ctrl.Result, error) {
// no-op for ScheduleOnly init policy or enterprise db
if s.Vdb.Spec.InitPolicy == vapi.CommunalInitPolicyScheduleOnly || !s.Vdb.IsEON() {
return ctrl.Result{}, nil
}

if err := s.PFacts.Collect(ctx, s.Vdb); err != nil {
return ctrl.Result{}, err
}
subclusters, err := s.getSubclustersToShutdown()
if err != nil {
return ctrl.Result{}, err
}
for scName, initIP := range subclusters {
err := s.PFacts.RemoveStartupFileInSubclusterPods(ctx, scName, "removed startup.json before stop_subcluster")
if err != nil {
return ctrl.Result{}, err
}
err = s.runStopSubclusterVclusterAPI(ctx, scName, initIP)
if err != nil {
return ctrl.Result{}, err
}
s.PFacts.Invalidate()
}
return ctrl.Result{}, nil
}

// getSubclustersToShutdown returns the subclusters that need to be
// shut down
func (s *SubclusterShutdownReconciler) getSubclustersToShutdown() (map[string]string, error) {
subclusters := map[string]string{}
primarySubclusters := []string{}
upPrimaryNodes := 0
willLoseQuorum := false
scSbMap := s.Vdb.GenSubclusterSandboxMap()
scStatusMap := s.Vdb.GenSubclusterStatusMap()
sbMap := s.Vdb.GenSandboxMap()
s.Log.Info(fmt.Sprintf("Collecting subclusters to shut down in %s", s.PFacts.GetClusterExtendedName()),
"sandbox", s.PFacts.GetSandboxName())
for i := range s.Vdb.Spec.Subclusters {
sc := &s.Vdb.Spec.Subclusters[i]
sandbox := scSbMap[sc.Name]
if sandbox != s.PFacts.GetSandboxName() {
s.Log.Info(fmt.Sprintf("Skipping subcluster because it is not in %s", s.PFacts.GetClusterExtendedName()),
"subcluster", sc.Name, "sandbox", s.PFacts.GetSandboxName())
continue
}
// no-op if the subcluster is not marked for
// shutdown
if !sc.Shutdown {
continue
}
if sandbox != vapi.MainCluster {
sb := sbMap[sandbox]
// no-op if the subcluster shutdown is driven
// by its sandbox
if sb != nil && sb.Shutdown {
continue
}
}
if s.PFacts.IsDBReadOnly() {
return subclusters, fmt.Errorf("cannot shutdown subcluster because %s is read-only", s.PFacts.GetClusterExtendedName())
}
hostIP, ok := s.PFacts.FindFirstUpPodIP(false, sc.Name)
if !ok {
scStatus := scStatusMap[sc.Name]
if scStatus == nil {
return subclusters, fmt.Errorf("subcluster %q not found in status", sc.Name)
}
if !scStatus.Shutdown {
s.Log.Info("Subcluster nodes are already all down, and were not shutdown gracefully.", "subcluster", sc.Name)
}
continue
}
if sc.IsPrimary() {
primarySubclusters = append(primarySubclusters, sc.Name)
upPrimaryNodes += s.PFacts.GetSubclusterUpNodeCount(sc.Name)
// If stopping a subcluster would cause loss of cluster quorum, we
// abort the operation
if !s.PFacts.DoesDBHaveQuorum(upPrimaryNodes) {
willLoseQuorum = true
break
}
}
subclusters[sc.Name] = hostIP
}
if willLoseQuorum {
// TODO: we may remove this once we find a proper way to handle quorum loss
s.VRec.Eventf(s.Vdb, corev1.EventTypeWarning, events.ClusterWillLoseQuorum,
"Shutting down subclusters %s will cause quorum loss.", strings.Join(primarySubclusters, ","))
return subclusters, fmt.Errorf("cannot shut down primaries %s because it will cause quorum loss. "+
"please revert back", strings.Join(primarySubclusters, ","))
}
return subclusters, nil
}

// runStopSubclusterVclusterAPI performs the actual stop-subcluster call.
// It also logs the necessary events.
func (s *SubclusterShutdownReconciler) runStopSubclusterVclusterAPI(ctx context.Context, scName, host string) error {
opts := s.genStopSubclusterOpts(host, scName)
s.VRec.Eventf(s.Vdb, corev1.EventTypeNormal, events.StopSubclusterStart, "Starting stop subcluster %q.",
scName)

err := s.Dispatcher.StopSubcluster(ctx, opts...)
if err != nil {
// Log a failure event and return the error
s.VRec.Eventf(s.Vdb, corev1.EventTypeWarning, events.StopSubclusterFailed,
"Failed to stop subcluster %q", scName)
return err
}

s.VRec.Eventf(s.Vdb, corev1.EventTypeNormal, events.StopSubclusterSucceeded,
"Successfully stopped subcluster %q.", scName)
return nil
}

// genStopSubclusterOpts returns the options to use with the stop subcluster API
func (s *SubclusterShutdownReconciler) genStopSubclusterOpts(initiatorIP, scName string) []stopsubcluster.Option {
opts := []stopsubcluster.Option{
stopsubcluster.WithInitiator(initiatorIP),
stopsubcluster.WithSCName(scName),
stopsubcluster.WithDrainSeconds(s.Vdb.GetShutdownDrainSeconds()),
}
return opts
}