Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tenant emergency move to SansShell fdbcli modules #285

Merged
merged 36 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4e1240e
add tenant emergency move to fdbcli modules
sfc-gh-jfu Aug 2, 2023
491e24a
add status command
sfc-gh-jfu Aug 2, 2023
dfb88f0
partial implementation of FDB data move
sfc-gh-jfu Aug 3, 2023
8b16274
fix incorrect copy/paste
sfc-gh-jfu Aug 3, 2023
4800877
add server for fdbdatamove
sfc-gh-jfu Aug 3, 2023
33e29fb
add numProcs argument
sfc-gh-jfu Aug 3, 2023
e28bf33
register package
sfc-gh-jfu Aug 3, 2023
577ca24
WIP for env variables
sfc-gh-jfu Aug 3, 2023
3548e76
WIP implementation of start and wait
sfc-gh-jfu Aug 4, 2023
50a8ed8
look for error on cmd.Wait()
sfc-gh-jfu Aug 4, 2023
8ccbe66
WIP implementation of streaming stdout/stderr
sfc-gh-jfu Aug 7, 2023
791d814
move mu unlock location and add link to script location
sfc-gh-jfu Aug 7, 2023
098139b
try to release mutex deliberately and fix string message
sfc-gh-jfu Aug 7, 2023
c0a4391
fix streaming stdout/stderr to client
sfc-gh-jfu Aug 8, 2023
dc468e3
add test file
sfc-gh-jfu Aug 9, 2023
b62f6a4
adjust test case
sfc-gh-jfu Aug 9, 2023
cf46c36
Merge branch 'main' of github.com:Snowflake-Labs/sansshell into jfu-t…
sfc-gh-jfu Aug 9, 2023
fd6a242
wrap clear operations with mutex
sfc-gh-jfu Aug 9, 2023
ff5c452
Merge branch 'main' of github.com:Snowflake-Labs/sansshell into jfu-t…
sfc-gh-jfu Aug 9, 2023
fb175d2
defer unlock and reorder test to be easier to follow
sfc-gh-jfu Aug 9, 2023
738e2c0
avoid directly overwriting pipes
sfc-gh-jfu Aug 9, 2023
268d6f7
copy stdout/stderr before passing to anonymous goroutine
sfc-gh-jfu Aug 9, 2023
daf085e
Merge branch 'main' of github.com:Snowflake-Labs/sansshell into jfu-t…
sfc-gh-jfu Aug 9, 2023
d912137
avoid duplicate error message
sfc-gh-jfu Aug 9, 2023
e4a533b
account for EOF test result in races
sfc-gh-jfu Aug 9, 2023
56edee7
revert test change and use sync.WaitGroup
sfc-gh-jfu Aug 9, 2023
2d562b4
move Wait ahead of command Wait
sfc-gh-jfu Aug 10, 2023
bffa5b4
address code review comments
sfc-gh-jfu Aug 10, 2023
c518725
Merge branch 'main' of github.com:Snowflake-Labs/sansshell into jfu-t…
sfc-gh-jfu Aug 10, 2023
33a6c87
update unit test
sfc-gh-jfu Aug 10, 2023
6979511
Merge branch 'main' of github.com:Snowflake-Labs/sansshell into jfu-t…
sfc-gh-jfu Aug 10, 2023
29605f1
update module with finalized paths
sfc-gh-jfu Aug 10, 2023
143c33b
add more log lines
sfc-gh-jfu Aug 11, 2023
d6050fc
add more logging
sfc-gh-jfu Aug 11, 2023
eeec0ca
Merge branch 'main' of github.com:Snowflake-Labs/sansshell into jfu-t…
sfc-gh-jfu Aug 11, 2023
e3b8a7e
merge log lines
sfc-gh-jfu Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/sansshell-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func init() {
fdbCLIEnvList.Target = &fdbserver.FDBCLIEnvList
*fdbCLIEnvList.Target = append(*fdbCLIEnvList.Target, "") // To set a default
flag.Var(&fdbCLIEnvList, "fdbcli-env-list", "List of environment variable names (separated by comma) to retain before fork/exec'ing fdbcli")
flag.StringVar(&fdbserver.FDBMoveOrchestrator, "fdb-move-orchestrator", "/usr/bin/fdb_move_orchestrator.py", "Path to python data movement script.")

flag.StringVar(&mtlsFlags.ClientCertFile, "client-cert", mtlsFlags.ClientCertFile, "Path to this client's x509 cert, PEM format")
flag.StringVar(&mtlsFlags.ClientKeyFile, "client-key", mtlsFlags.ClientKeyFile, "Path to this client's key")
Expand Down
295 changes: 295 additions & 0 deletions services/fdb/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/google/subcommands"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -100,6 +101,7 @@ func (*fdbCmd) GetSubpackage(f *flag.FlagSet) *subcommands.Commander {
c.Register(&fdbCLICmd{}, "")
c.Register(&fdbConfCmd{}, "")
c.Register(&fdbServerCmd{}, "")
c.Register(&fdbMoveDataCmd{}, "")

return c
}
Expand Down Expand Up @@ -163,6 +165,7 @@ func (*fdbCLICmd) GetSubpackage(f *flag.FlagSet) *subcommands.Commander {
c.Register(&fdbCLISnapshotCmd{}, "")
c.Register(&fdbCLIStatusCmd{}, "")
c.Register(&fdbCLISuspendCmd{}, "")
c.Register(&fdbCLITenantEmergencyMoveCmd{}, "")
c.Register(&fdbCLIThrottleCmd{}, "")
c.Register(&fdbCLITriggerddteaminfologCmd{}, "")
c.Register(&fdbCLITssqCmd{}, "")
Expand Down Expand Up @@ -2610,6 +2613,108 @@ func (r *fdbCLISuspendCmd) Execute(ctx context.Context, f *flag.FlagSet, args ..
return subcommands.ExitSuccess
}

type fdbCLITenantEmergencyMoveCmd struct {
req *pb.FDBCLITenantEmergencyMove
}

func (*fdbCLITenantEmergencyMoveCmd) Name() string { return "tenant_emergency_move" }
func (*fdbCLITenantEmergencyMoveCmd) Synopsis() string {
return "Utility commands for handling offline emergency tenant movement."
}
func (p *fdbCLITenantEmergencyMoveCmd) Usage() string {
return `tenant_emergency_move start <capacityGroupIdentifier> <sourceClusterName> <destinationClusterName>
tenant_emergency_move switch <capacityGroupIdentifier> <sourceClusterName> <destinationClusterName>
tenant_emergency_move stop <capacityGroupIdentifier> <sourceClusterName> <destinationClusterName>
tenant_emergency_move abort <capacityGroupIdentifier> <sourceClusterName> <destinationClusterName>
tenant_emergency_move status <capacityGroupIdentifier>
`
}

func (r *fdbCLITenantEmergencyMoveCmd) SetFlags(f *flag.FlagSet) {
r.req = &pb.FDBCLITenantEmergencyMove{}
}

func (r *fdbCLITenantEmergencyMoveCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
req := args[1].(*pb.FDBCLIRequest)

if f.NArg() < 2 || f.NArg() > 4 || anyEmpty(f.Args()) {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
switch f.Arg(0) {
case "start":
if f.NArg() != 4 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
r.req.Request = &pb.FDBCLITenantEmergencyMove_Start{
Start: &pb.FDBCLITenantEmergencyMoveStart{
TenantGroup: f.Arg(1),
SourceCluster: f.Arg(2),
DestinationCluster: f.Arg(3),
},
}
case "switch":
if f.NArg() != 4 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
r.req.Request = &pb.FDBCLITenantEmergencyMove_Switch{
Switch: &pb.FDBCLITenantEmergencyMoveSwitch{
TenantGroup: f.Arg(1),
SourceCluster: f.Arg(2),
DestinationCluster: f.Arg(3),
},
}
case "finish":
if f.NArg() != 4 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
r.req.Request = &pb.FDBCLITenantEmergencyMove_Finish{
Finish: &pb.FDBCLITenantEmergencyMoveFinish{
TenantGroup: f.Arg(1),
SourceCluster: f.Arg(2),
DestinationCluster: f.Arg(3),
},
}
case "abort":
if f.NArg() != 4 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
r.req.Request = &pb.FDBCLITenantEmergencyMove_Abort{
Abort: &pb.FDBCLITenantEmergencyMoveAbort{
TenantGroup: f.Arg(1),
SourceCluster: f.Arg(2),
DestinationCluster: f.Arg(3),
},
}
case "status":
if f.NArg() != 2 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
r.req.Request = &pb.FDBCLITenantEmergencyMove_Status{
Status: &pb.FDBCLITenantEmergencyMoveStatus{
TenantGroup: f.Arg(1),
},
}
default:
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}

req.Commands = append(req.Commands,
&pb.FDBCLICommand{
Command: &pb.FDBCLICommand_TenantEmergencyMove{
TenantEmergencyMove: r.req,
},
})

return subcommands.ExitSuccess
}

type fdbCLIThrottleCmd struct {
req *pb.FDBCLIThrottle
}
Expand Down Expand Up @@ -3461,3 +3566,193 @@ func (p *fdbServerVersionCmd) Execute(ctx context.Context, f *flag.FlagSet, args

return retCode
}

const fdbMoveDataCLIPackage = "fdbmovedata"

func GetSubpackage(f *flag.FlagSet) *subcommands.Commander {
sfc-gh-jfu marked this conversation as resolved.
Show resolved Hide resolved
c := client.SetupSubpackage(fdbMoveDataCLIPackage, f)
c.Register(&fdbMoveDataCopyCmd{}, "")
c.Register(&fdbMoveDataWaitCmd{}, "")
return c
}

// This context detachment is temporary until we use go1.21 and context.WithoutCancel is available.
sfc-gh-jfu marked this conversation as resolved.
Show resolved Hide resolved
type noCancel struct {
ctx context.Context
}

func (c noCancel) Deadline() (time.Time, bool) { return time.Time{}, false }
func (c noCancel) Done() <-chan struct{} { return nil }
func (c noCancel) Err() error { return nil }
func (c noCancel) Value(key interface{}) interface{} { return c.ctx.Value(key) }

// WithoutCancel returns a context that is never canceled.
func WithoutCancel(ctx context.Context) context.Context {
return noCancel{ctx: ctx}
}

type fdbMoveDataCmd struct{}

func (*fdbMoveDataCmd) Name() string { return fdbMoveDataCLIPackage }
func (*fdbMoveDataCmd) SetFlags(_ *flag.FlagSet) {}
func (*fdbMoveDataCmd) Synopsis() string {
return "Copy data across two tenant groups in a metacluster.\n" + client.GenerateSynopsis(GetSubpackage(flag.NewFlagSet("", flag.ContinueOnError)), 4)
}
func (r *fdbMoveDataCmd) Usage() string {
return client.GenerateUsage(fdbMoveDataCLIPackage, r.Synopsis())
}

func (r *fdbMoveDataCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
c := GetSubpackage(f)
return c.Execute(ctx, args...)
}

type fdbMoveDataCopyCmd struct {
req *pb.FDBMoveDataCopyRequest
}

func (*fdbMoveDataCopyCmd) Name() string { return "copy" }
func (*fdbMoveDataCopyCmd) Synopsis() string {
return "Initiate data copy across two tenant groups in a metacluster. Starts a long-running command on the server."
}
func (r *fdbMoveDataCopyCmd) Usage() string {
return "fdbmovedata copy <clusterFile> <capacityGroupIdentifier> <sourceClusterName> <destinationClusterName> [numProcs]"
sfc-gh-jfu marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *fdbMoveDataCopyCmd) SetFlags(f *flag.FlagSet) {
r.req = &pb.FDBMoveDataCopyRequest{}
}

func (r *fdbMoveDataCopyCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
state := args[0].(*util.ExecuteState)
c := pb.NewFDBMoveClientProxy(state.Conn)

if f.NArg() < 4 || f.NArg() > 5 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}

clusterFile, tenantGroup, sourceCluster, destinationCluster := f.Arg(0), f.Arg(1), f.Arg(2), f.Arg(3)
// default number of processes is 10
numProcs := int64(10)
var s int64
var err error
if f.NArg() == 5 {
s, err = strconv.ParseInt(f.Arg(4), 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "can't parse number of processes: %v\n", err)
return subcommands.ExitFailure
}
numProcs = s
}

resp, err := c.FDBMoveDataCopyOneMany(ctx, &pb.FDBMoveDataCopyRequest{
ClusterFile: clusterFile,
TenantGroup: tenantGroup,
SourceCluster: sourceCluster,
DestinationCluster: destinationCluster,
NumProcs: numProcs,
})
if err != nil {
// Emit this to every error file as it's not specific to a given target.
for _, e := range state.Err {
fmt.Fprintf(e, "fdb move data copy error: %v\n", err)
}

return subcommands.ExitFailure
}

retCode := subcommands.ExitSuccess
for r := range resp {
if r.Error != nil {
fmt.Fprintf(state.Err[r.Index], "fdb move data copy error: %v\n", r.Error)
retCode = subcommands.ExitFailure
}
fmt.Fprintf(state.Out[r.Index], "Command ID to wait on: %d\n", r.Resp.Id)
sfc-gh-jfu marked this conversation as resolved.
Show resolved Hide resolved
}

return retCode
}

type fdbMoveDataWaitCmd struct {
req *pb.FDBMoveDataWaitRequest

// returnCode internally keeps track of the final status to return
returnCode subcommands.ExitStatus
}

func (*fdbMoveDataWaitCmd) Name() string { return "wait" }
func (*fdbMoveDataWaitCmd) Synopsis() string {
return "Wait for data copy across two tenant groups in a metacluster to complete"
}
func (r *fdbMoveDataWaitCmd) Usage() string {
return "fdbmovedata wait <ID>"
}

func (r *fdbMoveDataWaitCmd) SetFlags(f *flag.FlagSet) {
r.req = &pb.FDBMoveDataWaitRequest{}
}

func (r *fdbMoveDataWaitCmd) printCommandOutput(state *util.ExecuteState, idx int, resp *pb.FDBMoveDataWaitResponse, err error) {
if err == io.EOF {
// Streaming commands may return EOF
return
}
if err != nil {
fmt.Fprintf(state.Err[idx], "Command execution failure - %v\n", err)
// If any target had errors it needs to be reported for that target but we still
// need to process responses off the channel. Final return code though should
// indicate something failed.
r.returnCode = subcommands.ExitFailure
return
}
if len(resp.Stderr) > 0 {
fmt.Fprintf(state.Err[idx], "%s", resp.Stderr)
}
fmt.Fprintf(state.Out[idx], "%s", resp.Stdout)
if resp.RetCode != 0 {
r.returnCode = subcommands.ExitFailure
}
}

func (r *fdbMoveDataWaitCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
// Ignore the parent context timeout because we don't want to time out here.
ctx = WithoutCancel(ctx)
state := args[0].(*util.ExecuteState)
c := pb.NewFDBMoveClientProxy(state.Conn)

if f.NArg() != 1 {
fmt.Fprintln(os.Stderr, "usage: ", r.Usage())
return subcommands.ExitFailure
}
id, err := strconv.ParseInt(f.Arg(0), 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "can't parse ID: %v\n", err)
return subcommands.ExitFailure
}

resp, err := c.FDBMoveDataWaitOneMany(ctx, &pb.FDBMoveDataWaitRequest{
Id: id,
})
if err != nil {
// Emit this to every error file as it's not specific to a given target.
for _, e := range state.Err {
fmt.Fprintf(e, "fdb move data wait error: %v\n", err)
}
return subcommands.ExitFailure
}

for {
rs, err := resp.Recv()
if err != nil {
if err == io.EOF {
return r.returnCode
}
fmt.Fprintf(os.Stderr, "Stream failure: %v\n", err)
return subcommands.ExitFailure
}
for _, res := range rs {
r.printCommandOutput(state, res.Index, res.Resp, res.Error)
}
}
}
Loading