Skip to content

Commit

Permalink
Combine MutationStore2, Collection, DataStoreName into DataStore (#6752)
Browse files Browse the repository at this point in the history
* Combine MutationStore2, Collection, DataStoreName into DataStore

This is cleanup from the introduction to rosmar.

- rosmar.Collection and Collection already satisified the DataStoreName
interface, so putting it as part of DataStore removes need for
AsDataStoreName checks.
- remove GetFeedType() checks since it will always return DCP

- Remove StartTapFeed from leaky bucket because it was not being used.
  TestStopChangeCache was only running with xattrs disabled, but the
  test would pass even if the events arrived out of order, so the leaky
  bucket tap code was not being executed.
- Reflect in documentation that DCP feed is always used.

* Replace leaky bucket code for DCPMissingDocs
  • Loading branch information
torcolvin authored Apr 4, 2024
1 parent 4316e75 commit a9811cd
Show file tree
Hide file tree
Showing 24 changed files with 75 additions and 484 deletions.
29 changes: 0 additions & 29 deletions base/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ import (
pkgerrors "github.com/pkg/errors"
)

const (
TapFeedType = "tap"
DcpFeedType = "dcp"
)

const (
DefaultPool = "default"
)
Expand Down Expand Up @@ -92,12 +87,6 @@ func GetBaseDataStore(ds DataStore) DataStore {
return ds
}

// AsDataStoreName is a temporary thing until DataStoreName is implemented on wrappers (pending further design work on FQName...)
func AsDataStoreName(ds DataStore) (sgbucket.DataStoreName, bool) {
dsn, ok := GetBaseDataStore(ds).(sgbucket.DataStoreName)
return dsn, ok
}

func init() {
// Increase max memcached request size to 20M bytes, to support large docs (attachments!)
// arriving in a tap feed. (see issues #210, #333, #342)
Expand Down Expand Up @@ -372,24 +361,6 @@ func IsCasMismatch(err error) bool {
return false
}

// Returns mutation feed type for bucket. Will first return the feed type from the spec, when present. If not found, returns default feed type for bucket
// (DCP for any couchbase bucket, TAP otherwise)
func GetFeedType(bucket Bucket) (feedType string) {
switch typedBucket := bucket.(type) {
case *GocbV2Bucket:
return DcpFeedType
case sgbucket.MutationFeedStore2:
return string(typedBucket.GetFeedType())
case *LeakyBucket:
return GetFeedType(typedBucket.bucket)
case *TestBucket:
return GetFeedType(typedBucket.Bucket)
default:
// unknown bucket type?
return TapFeedType
}
}

// Gets the bucket max TTL, or 0 if no TTL was set. Sync gateway should fail to bring the DB online if this is non-zero,
// since it's not meant to operate against buckets that auto-delete data.
func getMaxTTL(ctx context.Context, store CouchbaseBucketStore) (int, error) {
Expand Down
15 changes: 0 additions & 15 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,6 @@ func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArgum
return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
}

func (b *GocbV2Bucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
return nil, errors.New("StartTapFeed not implemented")
}

func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {

agent, agentErr := b.getGoCBAgent()
Expand Down Expand Up @@ -721,14 +717,3 @@ func (b *GocbV2Bucket) ServerMetrics(ctx context.Context) (map[string]*dto.Metri

return mf, nil
}

func GetCollectionID(dataStore DataStore) uint32 {
switch c := dataStore.(type) {
case WrappingDatastore:
return GetCollectionID(c.GetUnderlyingDataStore())
case sgbucket.Collection:
return c.GetCollectionID()
default:
return DefaultCollectionID
}
}
213 changes: 7 additions & 206 deletions base/leaky_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"context"
"expvar"
"fmt"
"math"
"time"

sgbucket "github.com/couchbase/sg-bucket"
)
Expand Down Expand Up @@ -123,12 +121,7 @@ type LeakyBucketConfig struct {
DDocDeleteErrorCount int
DDocGetErrorCount int

// Emulate TAP/DCP feed de-dupliation behavior, such that within a
// window of # of mutations or a timeout, mutations for a given document
// will be filtered such that only the _latest_ mutation will make it through.
TapFeedDeDuplication bool
TapFeedVbuckets bool // Emulate vbucket numbers on feed
TapFeedMissingDocs []string // Emulate entry not appearing on tap feed
DCPFeedMissingDocs []string // Emulate entry not appearing on DCP feed

ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error

Expand Down Expand Up @@ -161,209 +154,17 @@ type LeakyBucketConfig struct {
IgnoreClose bool
}

func (b *LeakyBucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {

if b.config.TapFeedDeDuplication {
return b.wrapFeedForDeduplication(args, dbStats)
} else if len(b.config.TapFeedMissingDocs) > 0 {
callback := func(event *sgbucket.FeedEvent) bool {
for _, key := range b.config.TapFeedMissingDocs {
func (b *LeakyBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
if len(b.config.DCPFeedMissingDocs) > 0 {
wrappedCallback := func(event sgbucket.FeedEvent) bool {
for _, key := range b.config.DCPFeedMissingDocs {
if string(event.Key) == key {
return false
}
}
return true
}
return b.wrapFeed(args, callback, dbStats)
} else if b.config.TapFeedVbuckets {
// kick off the wrapped sgbucket tap feed
walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats)
if err != nil {
return walrusTapFeed, err
}
// this is the sgbucket.MutationFeed impl we'll return to callers, which
// will add vbucket information
channel := make(chan sgbucket.FeedEvent, 10)
vbTapFeed := &wrappedTapFeedImpl{
channel: channel,
wrappedTapFeed: walrusTapFeed,
return callback(event)
}
go func() {
for event := range walrusTapFeed.Events() {
key := string(event.Key)
event.VbNo = uint16(sgbucket.VBHash(key, 1024))
vbTapFeed.channel <- event
}
close(vbTapFeed.channel)
}()
return vbTapFeed, nil

} else {
return b.bucket.StartTapFeed(args, dbStats)
return b.bucket.StartDCPFeed(ctx, args, wrappedCallback, dbStats)
}

}

func (b *LeakyBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
return b.bucket.StartDCPFeed(ctx, args, callback, dbStats)
}

type EventUpdateFunc func(event *sgbucket.FeedEvent) bool

func (b *LeakyBucket) wrapFeed(args sgbucket.FeedArguments, callback EventUpdateFunc, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {

// kick off the wrapped sgbucket tap feed
walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats)
if err != nil {
return walrusTapFeed, err
}

// create an output channel
channel := make(chan sgbucket.FeedEvent, 10)

// this is the sgbucket.MutationFeed impl we'll return to callers, which
// will have missing entries
wrapperFeed := &wrappedTapFeedImpl{
channel: channel,
wrappedTapFeed: walrusTapFeed,
}

go func() {
for event := range walrusTapFeed.Events() {
// Callback returns false if the event should be skipped
if callback(&event) {
wrapperFeed.channel <- event
}
}
close(wrapperFeed.channel)
}()
return wrapperFeed, nil
}

func (b *LeakyBucket) wrapFeedForDeduplication(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
// create an output channel
// start a goroutine which reads off the sgbucket tap feed
// - de-duplicate certain events
// - puts them to output channel

// the number of changes that it will buffer up before de-duplicating
deDuplicationWindowSize := 5

// the timeout window in milliseconds after which it will flush to output, even if
// the dedupe buffer has not filled up yet.
deDuplicationTimeoutMs := time.Millisecond * 1000

// kick off the wrapped sgbucket tap feed
walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats)
if err != nil {
return walrusTapFeed, err
}

// create an output channel for de-duplicated events
channel := make(chan sgbucket.FeedEvent, 10)

// this is the sgbucket.MutationFeed impl we'll return to callers, which
// will reead from the de-duplicated events channel
dupeTapFeed := &wrappedTapFeedImpl{
channel: channel,
wrappedTapFeed: walrusTapFeed,
}

go func() {
defer close(dupeTapFeed.channel)
// the buffer to hold tap events that are candidates for de-duplication
deDupeBuffer := []sgbucket.FeedEvent{}

timer := time.NewTimer(math.MaxInt64)
for {
select {
case tapEvent, ok := <-walrusTapFeed.Events():
if !ok {
// channel closed, goroutine is done
// dedupe and send what we currently have
dedupeAndForward(deDupeBuffer, channel)
return
}
deDupeBuffer = append(deDupeBuffer, tapEvent)

// if we've collected enough, dedeupe and send what we have,
// and reset buffer.
if len(deDupeBuffer) >= deDuplicationWindowSize {
dedupeAndForward(deDupeBuffer, channel)
deDupeBuffer = []sgbucket.FeedEvent{}
} else {
_ = timer.Reset(deDuplicationTimeoutMs)
}

case <-timer.C:
if len(deDupeBuffer) > 0 {
// give up on waiting for the buffer to fill up,
// de-dupe and send what we currently have
dedupeAndForward(deDupeBuffer, channel)
deDupeBuffer = []sgbucket.FeedEvent{}
}
}
}

}()
return dupeTapFeed, nil
}

// An implementation of a sgbucket tap feed that wraps
// tap events on the upstream tap feed to better emulate real world
// TAP/DCP behavior.
type wrappedTapFeedImpl struct {
channel chan sgbucket.FeedEvent
wrappedTapFeed sgbucket.MutationFeed
}

func (feed *wrappedTapFeedImpl) Close() error {
return feed.wrappedTapFeed.Close()
}

func (feed *wrappedTapFeedImpl) Events() <-chan sgbucket.FeedEvent {
return feed.channel
}

func (feed *wrappedTapFeedImpl) WriteEvents() chan<- sgbucket.FeedEvent {
return feed.channel
}

func dedupeAndForward(tapEvents []sgbucket.FeedEvent, destChannel chan<- sgbucket.FeedEvent) {

deduped := dedupeTapEvents(tapEvents)

for _, tapEvent := range deduped {
destChannel <- tapEvent
}

}

func dedupeTapEvents(tapEvents []sgbucket.FeedEvent) []sgbucket.FeedEvent {

// For each document key, keep track of the latest seen tapEvent
// doc1 -> tapEvent with Seq=1
// doc2 -> tapEvent with Seq=5
// (if tapEvent with Seq=7 comes in for doc1, it will clobber existing)
latestTapEventPerKey := map[string]sgbucket.FeedEvent{}

for _, tapEvent := range tapEvents {
key := string(tapEvent.Key)
latestTapEventPerKey[key] = tapEvent
}

// Iterate over the original tapEvents, and only keep what
// is in latestTapEventPerKey, and discard all previous mutations
// of that doc. This will preserve the original
// sequence order as read off the feed.
deduped := []sgbucket.FeedEvent{}
for _, tapEvent := range tapEvents {
latestTapEventForKey := latestTapEventPerKey[string(tapEvent.Key)]
if tapEvent.Cas == latestTapEventForKey.Cas {
deduped = append(deduped, tapEvent)
}
}

return deduped

}
70 changes: 0 additions & 70 deletions base/leaky_bucket_test.go

This file was deleted.

Loading

0 comments on commit a9811cd

Please sign in to comment.