Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.2.1 Backport] CBG-4089: Do not remove loaded database on transient fetch error #7132

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,12 @@ func IsTemporaryKvError(err error) bool {
return false
}
// define list of temporary errors
temporaryKVError := []error{ErrTimeout, gocb.ErrAmbiguousTimeout, gocb.ErrUnambiguousTimeout,
gocb.ErrOverload, gocb.ErrTemporaryFailure, gocb.ErrCircuitBreakerOpen}
temporaryKVError := []error{
ErrTimeout, // Sync Gateway client-side timeout
gocb.ErrTimeout, // SDK op timeout. Wrapped by gocb.ErrAmbiguousTimeout, gocb.ErrUnambiguousTimeout,
gocb.ErrOverload, // SDK client-side pipeline queue full, request was not submitted to server
gocb.ErrTemporaryFailure, // Couchbase Server returned temporary failure error
gocb.ErrCircuitBreakerOpen} // SDK client-side circuit breaker blocked request

// iterate through to check incoming error is one of them
for _, tempKVErr := range temporaryKVError {
Expand Down
85 changes: 85 additions & 0 deletions rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,91 @@ func TestMultipleBucketWithBadDbConfigScenario3(t *testing.T) {

}

// TestConfigPollingRemoveDatabase checks that a loaded database is unloaded once background config
// polling finds that its persisted config is no longer present in the bucket. The scenario runs
// twice, once for each value of the UseXattrConfig setting.
func TestConfigPollingRemoveDatabase(t *testing.T) {

	base.SetUpTestLogging(t, base.LevelInfo, base.KeyConfig)

	for _, useXattrConfig := range []bool{false, true} {
		t.Run(fmt.Sprintf("xattrConfig_%v", useXattrConfig), func(t *testing.T) {

			// Poll the bucket for config changes every 50 milliseconds so the removal is noticed quickly.
			pollEvery50ms := func(config *rest.StartupConfig) {
				config.Bootstrap.ConfigUpdateFrequency = base.NewConfigDuration(50 * time.Millisecond)
			}

			rt := rest.NewRestTester(t, &rest.RestTesterConfig{
				CustomTestBucket:    base.GetTestBucket(t),
				PersistentConfig:    true,
				MutateStartupConfig: pollEvery50ms,
				DatabaseConfig:      nil,
				UseXattrConfig:      useXattrConfig,
			})
			defer rt.Close()

			ctx := base.TestCtx(t)

			// Create a database backed by the test bucket.
			dbName := "db1"
			dbConfig := rt.NewDbConfig()
			dbConfig.Name = dbName
			dbConfig.BucketConfig.Bucket = base.StringPtr(rt.CustomTestBucket.GetName())
			createResp := rt.CreateDatabase(dbName, dbConfig)
			rest.RequireStatus(t, createResp, http.StatusCreated)

			// The database must be loaded before we start removing things.
			_, err := rt.ServerContext().GetDatabase(ctx, dbName)
			require.NoError(t, err)

			// dbUnloaded reports whether looking up the active database now yields base.ErrNotFound.
			dbUnloaded := func() bool {
				_, lookupErr := rt.ServerContext().GetActiveDatabase(dbName)
				return errors.Is(lookupErr, base.ErrNotFound)
			}

			// Forced timeouts: a dev-time only enhancement of this test for validating CBG-3947. It needs
			// manual "leaky bootstrap" handling. To enable:
			//   - Add "var ForceTimeouts bool" to bootstrap.go
			//   - In CouchbaseCluster.GetMetadataDocument, add the following after loadConfig:
			//         if ForceTimeouts {
			//             return 0, gocb.ErrTimeout
			//         }
			//   - enable the code block below
			/*
				base.ForceTimeouts = true

				// Wait to ensure database doesn't disappear
				err = rt.WaitForConditionWithOptions(dbUnloaded, 200, 50)
				require.Error(t, err)

				base.ForceTimeouts = false
			*/

			// Remove the persisted config directly from the bucket, bypassing the REST API.
			rt.RemoveDbConfigFromBucket("db1", rt.CustomTestBucket.GetName())

			// Polling should notice the missing config and unload the database.
			err = rt.WaitForConditionWithOptions(dbUnloaded, 200, 1000)
			require.NoError(t, err)

			// A request against the removed database must fail with the expected status and message.
			configResp := rt.SendAdminRequest(http.MethodGet, "/db1/_config", "")
			rest.RequireStatus(t, configResp, http.StatusNotFound)
			assert.Contains(t, configResp.Body.String(), "no such database")
		})
	}
}

func TestResyncStopUsingDCPStream(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
// This test requires a gocb bucket
Expand Down
111 changes: 62 additions & 49 deletions rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,17 +1635,26 @@ func (sc *ServerContext) fetchAndLoadConfigs(ctx context.Context, isInitialStart
sc.lock.Lock()
defer sc.lock.Unlock()
for _, dbName := range deletedDatabases {
dbc, ok := sc.databases_[dbName]
if !ok {
base.DebugfCtx(ctx, base.KeyConfig, "Database %q already removed from server context after acquiring write lock - do not need to remove not removing database", base.MD(dbName))
continue
}
// It's possible that the "deleted" database was not written to the server until after sc.FetchConfigs had returned...
// we'll need to pay for the cost of getting the config again now that we've got the write lock to double-check this db is definitely ok to remove...
found, _, err := sc._fetchDatabase(ctx, dbName)
if err != nil {
base.InfofCtx(ctx, base.KeyConfig, "Error fetching config for database %q to check whether we need to remove it: %v", dbName, err)
found, _, getConfigErr := sc._fetchDatabaseFromBucket(ctx, dbc.Bucket.GetName(), dbName)
if found && getConfigErr == nil {
base.DebugfCtx(ctx, base.KeyConfig, "Found config for database %q after acquiring write lock - not removing database", base.MD(dbName))
continue
}
if base.IsTemporaryKvError(getConfigErr) {
base.InfofCtx(ctx, base.KeyConfig, "Transient error fetching config for database %q to check whether we need to remove it, will not be removed: %v", base.MD(dbName), getConfigErr)
continue
}

if !found {
base.InfofCtx(ctx, base.KeyConfig, "Database %q was running on this node, but config was not found on the server - removing database", base.MD(dbName))
base.InfofCtx(ctx, base.KeyConfig, "Database %q was running on this node, but config was not found on the server - removing database (%v)", base.MD(dbName), getConfigErr)
sc._removeDatabase(ctx, dbName)
} else {
base.DebugfCtx(ctx, base.KeyConfig, "Found config for database %q after acquiring write lock - not removing database", base.MD(dbName))
}
}

Expand Down Expand Up @@ -1753,56 +1762,60 @@ func (sc *ServerContext) fetchDatabase(ctx context.Context, dbName string) (foun
return sc._fetchDatabase(ctx, dbName)
}

func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) {
// loop code moved to foreachDbConfig
var cnf DatabaseConfig
callback := func(bucket string) (exit bool, err error) {
cas, err := sc.BootstrapContext.GetConfig(ctx, bucket, sc.Config.Bootstrap.ConfigGroupID, dbName, &cnf)
if err == base.ErrNotFound {
base.DebugfCtx(ctx, base.KeyConfig, "%q did not contain config in group %q", bucket, sc.Config.Bootstrap.ConfigGroupID)
return false, err
}
if err != nil {
base.DebugfCtx(ctx, base.KeyConfig, "unable to fetch config in group %q from bucket %q: %v", sc.Config.Bootstrap.ConfigGroupID, bucket, err)
return false, err
}
func (sc *ServerContext) _fetchDatabaseFromBucket(ctx context.Context, bucket string, dbName string) (found bool, cnf DatabaseConfig, err error) {

if cnf.Name == "" {
cnf.Name = bucket
}
cas, err := sc.BootstrapContext.GetConfig(ctx, bucket, sc.Config.Bootstrap.ConfigGroupID, dbName, &cnf)
if errors.Is(err, base.ErrNotFound) {
base.DebugfCtx(ctx, base.KeyConfig, "%q did not contain config in group %q", bucket, sc.Config.Bootstrap.ConfigGroupID)
return false, cnf, err
}
if err != nil {
base.DebugfCtx(ctx, base.KeyConfig, "unable to fetch config in group %q from bucket %q: %v", sc.Config.Bootstrap.ConfigGroupID, bucket, err)
return false, cnf, err
}

if cnf.Name != dbName {
base.TracefCtx(ctx, base.KeyConfig, "%q did not contain config in group %q for db %q", bucket, sc.Config.Bootstrap.ConfigGroupID, dbName)
return false, err
}
if cnf.Name == "" {
cnf.Name = bucket
}

cnf.cfgCas = cas
if cnf.Name != dbName {
base.TracefCtx(ctx, base.KeyConfig, "%q did not contain config in group %q for db %q", bucket, sc.Config.Bootstrap.ConfigGroupID, dbName)
return false, cnf, err
}

// TODO: This code is mostly copied from FetchConfigs, move into shared function with DbConfig REST API work?
cnf.cfgCas = cas

// inherit properties from the bootstrap config
cnf.CACertPath = sc.Config.Bootstrap.CACertPath
// inherit properties from the bootstrap config
cnf.CACertPath = sc.Config.Bootstrap.CACertPath

// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to mark this db context as corrupt. Then remove the context and
// in-memory representation on the server context.
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
bucketCopy := bucket
// no corruption detected carry on as usual
cnf.Bucket = &bucketCopy
// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to mark this db context as corrupt. Then remove the context and
// in-memory representation on the server context.
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, cnf, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
bucketCopy := bucket
// no corruption detected carry on as usual
cnf.Bucket = &bucketCopy

// any authentication fields defined on the dbconfig take precedence over any in the bootstrap config
if cnf.Username == "" && cnf.Password == "" && cnf.CertPath == "" && cnf.KeyPath == "" {
cnf.Username = sc.Config.Bootstrap.Username
cnf.Password = sc.Config.Bootstrap.Password
cnf.CertPath = sc.Config.Bootstrap.X509CertPath
cnf.KeyPath = sc.Config.Bootstrap.X509KeyPath
}
base.TracefCtx(ctx, base.KeyConfig, "Got database config %s for bucket %q with cas %d and groupID %q", base.MD(dbName), base.MD(bucket), cas, base.MD(sc.Config.Bootstrap.ConfigGroupID))
return true, nil
// any authentication fields defined on the dbconfig take precedence over any in the bootstrap config
if cnf.Username == "" && cnf.Password == "" && cnf.CertPath == "" && cnf.KeyPath == "" {
cnf.Username = sc.Config.Bootstrap.Username
cnf.Password = sc.Config.Bootstrap.Password
cnf.CertPath = sc.Config.Bootstrap.X509CertPath
cnf.KeyPath = sc.Config.Bootstrap.X509KeyPath
}
base.TracefCtx(ctx, base.KeyConfig, "Got database config %s for bucket %q with cas %d and groupID %q", base.MD(dbName), base.MD(bucket), cas, base.MD(sc.Config.Bootstrap.ConfigGroupID))
return true, cnf, nil
}

func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) {
var cnf DatabaseConfig
callback := func(bucket string) (exit bool, callbackErr error) {
var foundInBucket bool
foundInBucket, cnf, callbackErr = sc._fetchDatabaseFromBucket(ctx, bucket, dbName)
return foundInBucket, callbackErr
}

err = sc.findBucketWithCallback(callback)
Expand Down
2 changes: 2 additions & 0 deletions rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type RestTesterConfig struct {
syncGatewayVersion *base.ComparableBuildVersion // alternate version of Sync Gateway to use on startup
allowDbConfigEnvVars *bool
maxConcurrentRevs *int
UseXattrConfig bool
}

type collectionConfiguration uint8
Expand Down Expand Up @@ -234,6 +235,7 @@ func (rt *RestTester) Bucket() base.Bucket {
sc.Bootstrap.ServerTLSSkipVerify = base.BoolPtr(base.TestTLSSkipVerify())
sc.Unsupported.Serverless.Enabled = &rt.serverless
sc.Unsupported.AllowDbConfigEnvVars = rt.RestTesterConfig.allowDbConfigEnvVars
sc.Unsupported.UseXattrConfig = &rt.UseXattrConfig
sc.Replicator.MaxConcurrentRevs = rt.RestTesterConfig.maxConcurrentRevs
if rt.serverless {
if !rt.PersistentConfig {
Expand Down
Loading