Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4263 create single actor tests #7187

Merged
merged 9 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions base/log_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
KeyReplicate
KeySync
KeySyncMsg
KeyVV
KeyWebSocket
KeyWebSocketFrame
KeySGTest
Expand Down Expand Up @@ -87,6 +88,7 @@ var (
KeyReplicate: "Replicate",
KeySync: "Sync",
KeySyncMsg: "SyncMsg",
KeyVV: "VV",
KeyWebSocket: "WS",
KeyWebSocketFrame: "WSFrame",
KeySGTest: "TEST",
Expand Down
32 changes: 22 additions & 10 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,14 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context
}

// updateHLV updates the HLV in the sync data appropriately based on what type of document update event we are encountering. mouMatch represents if the _mou.cas == doc.cas
func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocUpdateType, mouMatch bool) (*Document, error) {
func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document, docUpdateEvent DocUpdateType, mouMatch bool) (*Document, error) {

hasHLV := d.HLV != nil
if d.HLV == nil {
d.HLV = &HybridLogicalVector{}
base.DebugfCtx(ctx, base.KeyVV, "No existing HLV for doc %s", base.UD(d.ID))
} else {
base.DebugfCtx(ctx, base.KeyVV, "Existing HLV for doc %s before modification %+v", base.UD(d.ID), d.HLV)
}
switch docUpdateEvent {
case ExistingVersion:
Expand All @@ -922,6 +925,9 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
return nil, err
}
d.HLV.CurrentVersionCAS = d.Cas
base.DebugfCtx(ctx, base.KeyVV, "Adding new version to HLV due to import for doc %s, updated HLV %+v", base.UD(d.ID), d.HLV)
} else {
base.DebugfCtx(ctx, base.KeyVV, "Not updating HLV to _mou.cas == doc.cas for doc %s, extant HLV %+v", base.UD(d.ID), d.HLV)
}
case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
Expand Down Expand Up @@ -1797,7 +1803,7 @@ func (db *DatabaseCollectionWithUser) storeOldBodyInRevTreeAndUpdateCurrent(ctx
// Store the new revision body into the doc:
doc.setRevisionBody(ctx, newRevID, newDoc, db.AllowExternalRevBodyStorage(), newDocHasAttachments)
doc.SyncData.Attachments = newDoc.DocAttachments
doc.metadataOnlyUpdate = newDoc.metadataOnlyUpdate
doc.MetadataOnlyUpdate = newDoc.MetadataOnlyUpdate

if doc.CurrentRev == newRevID {
doc.NewestRev = ""
Expand All @@ -1808,7 +1814,7 @@ func (db *DatabaseCollectionWithUser) storeOldBodyInRevTreeAndUpdateCurrent(ctx
if doc.CurrentRev != prevCurrentRev {
doc.promoteNonWinningRevisionBody(ctx, doc.CurrentRev, db.RevisionBodyLoader)
// If the update resulted in promoting a previous non-winning revision body to winning, this isn't a metadata only update.
doc.metadataOnlyUpdate = nil
doc.MetadataOnlyUpdate = nil
}
}
}
Expand Down Expand Up @@ -2088,8 +2094,14 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(
return
}

// compute mouMatch before the callback modifies doc.metadataOnlyUpdate
mouMatch := doc.metadataOnlyUpdate != nil && base.HexCasToUint64(doc.metadataOnlyUpdate.CAS) == doc.Cas
// compute mouMatch before the callback modifies doc.MetadataOnlyUpdate
mouMatch := false
if doc.MetadataOnlyUpdate != nil && base.HexCasToUint64(doc.MetadataOnlyUpdate.CAS) == doc.Cas {
mouMatch = base.HexCasToUint64(doc.MetadataOnlyUpdate.CAS) == doc.Cas
base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): _mou:%+v Metadata-only update match:%t", base.UD(doc.ID), doc.MetadataOnlyUpdate, mouMatch)
} else {
base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): has no _mou", base.UD(doc.ID))
}
// Invoke the callback to update the document and with a new revision body to be used by the Sync Function:
newDoc, newAttachments, createNewRevIDSkipped, updatedExpiry, err := callback(doc)
if err != nil {
Expand Down Expand Up @@ -2149,7 +2161,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(
// The callback has updated the HLV for mutations coming from CBL. Update the HLV so that the current version is set before
// we call updateChannels, which needs to set the current version for removals
// update the HLV values
doc, err = col.updateHLV(doc, docUpdateEvent, mouMatch)
doc, err = col.updateHLV(ctx, doc, docUpdateEvent, mouMatch)
if err != nil {
return
}
Expand Down Expand Up @@ -2322,8 +2334,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
updatedDoc.Spec = appendRevocationMacroExpansions(updatedDoc.Spec, revokedChannelsRequiringExpansion)

updatedDoc.IsTombstone = currentRevFromHistory.Deleted
if doc.metadataOnlyUpdate != nil {
if doc.metadataOnlyUpdate.CAS != "" {
if doc.MetadataOnlyUpdate != nil {
if doc.MetadataOnlyUpdate.CAS != "" {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
} else {
Expand Down Expand Up @@ -2386,8 +2398,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
} else if doc != nil {
// Update the in-memory CAS values to match macro-expanded values
doc.Cas = casOut
if doc.metadataOnlyUpdate != nil && doc.metadataOnlyUpdate.CAS == expandMacroCASValueString {
doc.metadataOnlyUpdate.CAS = base.CasToString(casOut)
if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.CAS == expandMacroCASValueString {
doc.MetadataOnlyUpdate.CAS = base.CasToString(casOut)
}
// update the doc's HLV defined post macro expansion
doc = postWriteUpdateHLV(doc, casOut)
Expand Down
4 changes: 2 additions & 2 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1865,9 +1865,9 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
}
doc.SetCrc32cUserXattrHash()

// Update metadataOnlyUpdate based on previous Cas, metadataOnlyUpdate
// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
if db.useMou() {
doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.metadataOnlyUpdate)
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
}

_, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
Expand Down
8 changes: 4 additions & 4 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type Document struct {
ID string `json:"-"` // Doc id. (We're already using a custom MarshalJSON for *document that's based on body, so the json:"-" probably isn't needed here)
Cas uint64 // Document cas
rawUserXattr []byte // Raw user xattr as retrieved from the bucket
metadataOnlyUpdate *MetadataOnlyUpdate // Contents of _mou xattr, marshalled/unmarshalled with document from xattrs
MetadataOnlyUpdate *MetadataOnlyUpdate // Contents of _mou xattr, marshalled/unmarshalled with document from xattrs

Deleted bool
DocExpiry uint32
Expand Down Expand Up @@ -433,7 +433,7 @@ func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXa
}

if len(mouXattrData) > 0 {
if err := base.JSONUnmarshal(mouXattrData, &doc.metadataOnlyUpdate); err != nil {
if err := base.JSONUnmarshal(mouXattrData, &doc.MetadataOnlyUpdate); err != nil {
base.WarnfCtx(ctx, "Failed to unmarshal mouXattr for key %v, mou will be ignored. Err: %v mou:%s", base.UD(docid), err, mouXattrData)
}
}
Expand Down Expand Up @@ -1272,8 +1272,8 @@ func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, gl
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
}

if doc.metadataOnlyUpdate != nil {
mouXattr, err = base.JSONMarshal(doc.metadataOnlyUpdate)
if doc.MetadataOnlyUpdate != nil {
mouXattr, err = base.JSONMarshal(doc.MetadataOnlyUpdate)
if err != nil {
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err))
}
Expand Down
6 changes: 3 additions & 3 deletions db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin
} else {
if existingDoc.Deleted {
existingBucketDoc.Xattrs[base.SyncXattrName], err = base.JSONMarshal(existingDoc.SyncData)
if err == nil && existingDoc.metadataOnlyUpdate != nil && db.useMou() {
existingBucketDoc.Xattrs[base.MouXattrName], err = base.JSONMarshal(existingDoc.metadataOnlyUpdate)
if err == nil && existingDoc.MetadataOnlyUpdate != nil && db.useMou() {
existingBucketDoc.Xattrs[base.MouXattrName], err = base.JSONMarshal(existingDoc.MetadataOnlyUpdate)
}
} else {
existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], existingBucketDoc.Xattrs[base.GlobalXattrName], err = existingDoc.MarshalWithXattrs()
Expand Down Expand Up @@ -337,7 +337,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin

// If this is a metadata-only update, set metadataOnlyUpdate based on old doc's cas and mou
if metadataOnlyUpdate && db.useMou() {
newDoc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, revNo, doc.metadataOnlyUpdate)
newDoc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, revNo, doc.MetadataOnlyUpdate)
}

return newDoc, nil, !shouldGenerateNewRev, updatedExpiry, nil
Expand Down
8 changes: 4 additions & 4 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestOnDemandImportMou(t *testing.T) {
require.NoError(t, err)

if db.UseMou() {
require.NotNil(t, doc.metadataOnlyUpdate)
require.Equal(t, base.CasToString(writeCas), doc.metadataOnlyUpdate.PreviousCAS)
require.Equal(t, base.CasToString(doc.Cas), doc.metadataOnlyUpdate.CAS)
require.NotNil(t, doc.MetadataOnlyUpdate)
require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousCAS)
require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.CAS)
} else {
require.Nil(t, doc.metadataOnlyUpdate)
require.Nil(t, doc.MetadataOnlyUpdate)
}
})

Expand Down
37 changes: 24 additions & 13 deletions topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (p *CouchbaseLiteMockPeer) String() string {
}

// GetDocument returns the latest version of a document. The test will fail the document does not exist.
func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID string) (rest.DocVersion, db.Body) {
func (p *CouchbaseLiteMockPeer) GetDocument(_ sgbucket.DataStoreName, _ string) (DocMetadata, db.Body) {
// this isn't yet collection aware, using single default collection
return rest.EmptyDocVersion(), nil
return DocMetadata{}, nil
}

// getSingleBlipClient returns the single blip client for the peer. If there are multiple clients, or not clients it will fail the test. This is temporary to stub support for multiple Sync Gateway peers.
Expand All @@ -62,27 +62,28 @@ func (p *CouchbaseLiteMockPeer) getSingleBlipClient() *PeerBlipTesterClient {
}

// CreateDocument creates a document on the peer. The test will fail if the document already exists.
func (p *CouchbaseLiteMockPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) rest.DocVersion {
return rest.EmptyDocVersion()
func (p *CouchbaseLiteMockPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) DocMetadata {
p.t.Logf("%s: Creating document %s", p, docID)
return p.WriteDocument(dsName, docID, body)
}

// WriteDocument writes a document to the peer. The test will fail if the write does not succeed.
func (p *CouchbaseLiteMockPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) rest.DocVersion {
func (p *CouchbaseLiteMockPeer) WriteDocument(_ sgbucket.DataStoreName, docID string, body []byte) DocMetadata {
// this isn't yet collection aware, using single default collection
client := p.getSingleBlipClient()
// set an HLV here.
docVersion, err := client.btcRunner.PushRev(client.ID(), docID, rest.EmptyDocVersion(), body)
require.NoError(client.btcRunner.TB(), err)
return docVersion
return DocMetadataFromDocVersion(docID, docVersion)
}

// DeleteDocument deletes a document on the peer. The test will fail if the document does not exist.
func (p *CouchbaseLiteMockPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID string) rest.DocVersion {
return rest.EmptyDocVersion()
func (p *CouchbaseLiteMockPeer) DeleteDocument(sgbucket.DataStoreName, string) DocMetadata {
return DocMetadata{}
}

// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected rest.DocVersion) db.Body {
func (p *CouchbaseLiteMockPeer) WaitForDocVersion(_ sgbucket.DataStoreName, docID string, _ DocMetadata) db.Body {
// this isn't yet collection aware, using single default collection
client := p.getSingleBlipClient()
// FIXME: waiting for a specific version isn't working yet.
Expand All @@ -92,8 +93,18 @@ func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName,
return body
}

// WaitForDeletion waits for a document to be deleted. This document must be a tombstone. The test will fail if the document still exists after 20s.
func (p *CouchbaseLiteMockPeer) WaitForDeletion(_ sgbucket.DataStoreName, _ string) {
require.Fail(p.TB(), "WaitForDeletion not yet implemented CBG-4257")
}

// WaitForTombstoneVersion waits for a document to reach a specific version, this must be a tombstone. The test will fail if the document does not reach the expected version in 20s.
func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(_ sgbucket.DataStoreName, _ string, _ DocMetadata) {
require.Fail(p.TB(), "WaitForTombstoneVersion not yet implemented CBG-4257")
}

// RequireDocNotFound asserts that a document does not exist on the peer.
func (p *CouchbaseLiteMockPeer) RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) {
func (p *CouchbaseLiteMockPeer) RequireDocNotFound(sgbucket.DataStoreName, string) {
// not implemented yet in blip client tester
// _, err := p.btcRunner.GetDoc(p.btc.id, docID)
// base.RequireDocNotFoundError(p.btcRunner.TB(), err)
Expand All @@ -107,7 +118,7 @@ func (p *CouchbaseLiteMockPeer) Close() {
}

// CreateReplication creates a replication instance
func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicationConfig) PeerReplication {
func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, _ PeerReplicationConfig) PeerReplication {
sg, ok := peer.(*SyncGatewayPeer)
if !ok {
require.Fail(p.t, fmt.Sprintf("unsupported peer type %T for pull replication", peer))
Expand All @@ -133,8 +144,8 @@ func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicat
}

// SourceID returns the source ID for the peer used in <val>@<sourceID>.
func (r *CouchbaseLiteMockPeer) SourceID() string {
return r.name
func (p *CouchbaseLiteMockPeer) SourceID() string {
return p.name
}

// Context returns the context for the peer.
Expand Down
Loading
Loading