From 9326dbd9e04aa85eaddecef9130ccb3e7b042bd0 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Mon, 21 Oct 2024 19:11:10 -0700 Subject: [PATCH] CBG-4174 Force disconnection of blip clients on database close (#7166) --- db/active_replicator.go | 3 ++- db/active_replicator_test.go | 2 +- db/blip.go | 7 ++++--- db/database.go | 20 ++++++++++++++++---- go.mod | 2 +- go.sum | 4 ++-- rest/blip_api_crud_test.go | 34 ++++++++++++++++++++++++++++++++++ rest/blip_sync.go | 2 +- rest/utilities_testing.go | 5 +++-- 9 files changed, 64 insertions(+), 15 deletions(-) diff --git a/db/active_replicator.go b/db/active_replicator.go index e3be11165f..7cae4bfa2c 100644 --- a/db/active_replicator.go +++ b/db/active_replicator.go @@ -208,7 +208,8 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen arc.replicationStats.NumConnectAttempts.Add(1) var originPatterns []string // no origin headers for ISGR - blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns) + // NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently + blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil) if err != nil { return nil, nil, err } diff --git a/db/active_replicator_test.go b/db/active_replicator_test.go index 93e5eb4808..98f772575e 100644 --- a/db/active_replicator_test.go +++ b/db/active_replicator_test.go @@ -65,7 +65,7 @@ func TestBlipSyncErrorUserinfo(t *testing.T) { srvURL.Path = "/db1" t.Logf("srvURL: %v", srvURL.String()) - blipContext, err := NewSGBlipContext(base.TestCtx(t), t.Name(), nil) + blipContext, err := NewSGBlipContext(base.TestCtx(t), t.Name(), nil, nil) require.NoError(t, err) _, err = blipSync(*srvURL, blipContext, false) diff --git a/db/blip.go b/db/blip.go index ca54f1f36a..fe8bedae8e 100644 --- a/db/blip.go +++ b/db/blip.go @@ -36,14 +36,15 @@ var ( ) // NewSGBlipContext returns a go-blip context with the given ID, initialized for use in Sync Gateway. -func NewSGBlipContext(ctx context.Context, id string, origin []string) (bc *blip.Context, err error) { - return NewSGBlipContextWithProtocols(ctx, id, origin, supportedSubprotocols()) +func NewSGBlipContext(ctx context.Context, id string, origin []string, cancelCtx context.Context) (bc *blip.Context, err error) { + return NewSGBlipContextWithProtocols(ctx, id, origin, supportedSubprotocols(), cancelCtx) } -func NewSGBlipContextWithProtocols(ctx context.Context, id string, origin []string, protocols []string) (bc *blip.Context, err error) { +func NewSGBlipContextWithProtocols(ctx context.Context, id string, origin []string, protocols []string, cancelCtx context.Context) (bc *blip.Context, err error) { opts := blip.ContextOptions{ Origin: origin, ProtocolIds: protocols, + CancelCtx: cancelCtx, } if id == "" { bc, err = blip.NewContext(opts) diff --git a/db/database.go b/db/database.go index 11b2e42254..c1f3c1b2df 100644 --- a/db/database.go +++ b/db/database.go @@ -115,10 +115,11 @@ type DatabaseContext struct { LocalJWTProviders auth.LocalJWTProviderMap ServerUUID string // UUID of the server, if available - DbStats *base.DbStats // stats that correspond to this database context - CompactState uint32 // Status of database compaction - terminator chan bool // Signal termination of background goroutines - + DbStats *base.DbStats // stats that correspond to this database context + CompactState uint32 // Status of database compaction + terminator chan bool // Signal termination of background goroutines + CancelContext context.Context // Cancelled when the database is closed - used to notify associated processes (e.g. blipContext) + cancelContextFunc context.CancelFunc // Cancel function for cancelContext backgroundTasks []BackgroundTask // List of background tasks that are initiated. activeChannels *channels.ActiveChannels // Tracks active replications by channel CfgSG cbgt.Cfg // Sync Gateway cluster shared config @@ -417,6 +418,14 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, UserFunctionTimeout: defaultUserFunctionTimeout, } + // set up cancellable context based on the background context (context lifecycle for the database + // must be distinct from the request context associated with the db create/update). Used to trigger + // teardown of connected replications on database close. + dbContext.CancelContext, dbContext.cancelContextFunc = context.WithCancel(context.Background()) + cleanupFunctions = append(cleanupFunctions, func() { + dbContext.cancelContextFunc() + }) + // Check if server version supports multi-xattr operations, required for mou handling dbContext.EnableMou = bucket.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations) @@ -591,6 +600,9 @@ func (context *DatabaseContext) Close(ctx context.Context) { context.OIDCProviders.Stop() close(context.terminator) + if context.cancelContextFunc != nil { + context.cancelContextFunc() + } // Stop All background processors bgManagers := context.stopBackgroundManagers() diff --git a/go.mod b/go.mod index 0e02434f5f..d1b976abca 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/couchbase/cbgt v1.3.9 github.com/couchbase/clog v0.1.0 - github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3 + github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949 github.com/couchbase/gocb/v2 v2.9.1 github.com/couchbase/gocbcore/v10 v10.5.1 github.com/couchbase/gomemcached v0.2.1 diff --git a/go.sum b/go.sum index 4138b620bf..ff240fe925 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE= github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= -github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3 h1:MeikDkvUMHZLpS57pfzhu2E+disqUVulUTb/r3aqUck= -github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3/go.mod h1:Dz8Keu17/4cjF7hvKYqOjH4pRXOh1CCnzsKlBOJaoJE= +github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949 h1:jwFj/GtyaoACmwnGfan/XW38TBTG1kYboXLZfAqd2VE= +github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949/go.mod h1:Dz8Keu17/4cjF7hvKYqOjH4pRXOh1CCnzsKlBOJaoJE= github.com/couchbase/go-couchbase v0.1.1 h1:ClFXELcKj/ojyoTYbsY34QUrrYCBi/1G749sXSCkdhk= github.com/couchbase/go-couchbase v0.1.1/go.mod h1:+/bddYDxXsf9qt0xpDUtRR47A2GjaXmGGAqQ/k3GJ8A= github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ= diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index bf228bd2b6..04e7008d66 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -3167,3 +3167,37 @@ func TestOnDemandImportBlipFailure(t *testing.T) { } }) } + +// TestBlipDatabaseClose verifies that the client connection is closed when the database is closed. +// Starts a continuous pull replication then updates the db to trigger a close. +func TestBlipDatabaseClose(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache) + btcRunner := NewBlipTesterClientRunner(t) + btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { + rt := NewRestTesterPersistentConfig(t) + defer rt.Close() + const username = "alice" + rt.CreateUser(username, []string{"*"}) + btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: username}) + var blipContextClosed atomic.Bool + btcRunner.clients[btc.id].pullReplication.bt.blipContext.OnExitCallback = func() { + log.Printf("on exit callback invoked") + blipContextClosed.Store(true) + } + + // put a doc, and make sure blip connection is established + markerDoc := "markerDoc" + markerDocVersion := rt.CreateTestDoc(markerDoc) + require.NoError(t, rt.WaitForPendingChanges()) + require.NoError(t, btcRunner.StartPull(btc.id)) + + btcRunner.WaitForVersion(btc.id, markerDoc, markerDocVersion) + + RequireStatus(t, rt.SendAdminRequest(http.MethodDelete, "/{{.db}}/", ""), http.StatusOK) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.True(c, blipContextClosed.Load()) + }, time.Second*10, time.Millisecond*100) + }) +} diff --git a/rest/blip_sync.go b/rest/blip_sync.go index 602c588f5e..cf4fb0a2a6 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -52,7 +52,7 @@ func (h *handler) handleBLIPSync() error { originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin) // Create a BLIP context: - blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns) + blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, h.db.DatabaseContext.CancelContext) if err != nil { return err } diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 9ab25eebb0..85dae6e2b8 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -1522,8 +1522,9 @@ func createBlipTesterWithSpec(tb testing.TB, spec BlipTesterSpec, rt *RestTester if err != nil { return nil, err } - // Make BLIP/Websocket connection - bt.blipContext, err = db.NewSGBlipContextWithProtocols(base.TestCtx(tb), "", origin, protocols) + // Make BLIP/Websocket connection. Not specifying cancellation context here as this is a + // client blip context that doesn't require cancellation-based close + bt.blipContext, err = db.NewSGBlipContextWithProtocols(base.TestCtx(tb), "", origin, protocols, nil) if err != nil { return nil, err }