diff --git a/db/change_cache.go b/db/change_cache.go index daa92b15c1..bbeecdcbee 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -989,13 +989,7 @@ func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq u base.InfofCtx(ctx, base.KeyCache, "cannot push negative skipped sequence range to skipped list: %d %d", startSeq, endSeq) return } - numSeqs := (endSeq - startSeq) + 1 - if numSeqs > MinSequencesForRange { - c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq)) - } else { - // push sequences as separate entries - c.skippedSeqs.PushSkippedSequenceEntries(startSeq, endSeq, int64(numSeqs)) - } + c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq)) } // waitForSequence blocks up to maxWaitTime until the given sequence has been received. diff --git a/db/change_cache_test.go b/db/change_cache_test.go index ecc35d05fb..0cd1ad5dd1 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -2056,7 +2056,7 @@ func TestProcessSkippedEntry(t *testing.T) { // assert this pushes an entry on the skipped sequence slice require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list)) + assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list)) }, time.Second*10, time.Millisecond*100) // process some sequences over cache @@ -2130,13 +2130,13 @@ func TestProcessSkippedEntryStats(t *testing.T) { // assert this pushes an entry on the skipped sequence slice require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list)) + assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list)) }, time.Second*10, time.Millisecond*100) // expected values for stats on skipped slice arrivingSeqs := []uint64{3, 15, 18, 2, 1} - expSliceLen := []int64{18, 17, 16, 15, 14} - expSliceCap := []int64{32, 32, 32, 32, 32} + expSliceLen := []int64{2, 3, 4, 4, 3} + expSliceCap := []int64{2, 4, 4, 4, 4} numSeqsInList := dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value() for j := 0; j < len(arrivingSeqs); j++ { @@ -2181,7 +2181,7 @@ func TestSkippedSequenceCompact(t *testing.T) { testChangeCache := &changeCache{} if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{ CachePendingSeqMaxWait: 5 * time.Millisecond, - CacheSkippedSeqMaxWait: 1 * time.Second, + CacheSkippedSeqMaxWait: 2 * time.Second, }, dbContext.MetadataKeys); err != nil { log.Printf("Init failed for testChangeCache: %v", err) t.Fail() @@ -2206,12 +2206,9 @@ func TestSkippedSequenceCompact(t *testing.T) { // assert this pushes an entry on the skipped sequence slice require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list)) + assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list)) }, time.Second*10, time.Millisecond*100) - // manually call clean skipped sequence queue, avoiding race with actual wait time omn skipped - require.NoError(t, testChangeCache.CleanSkippedSequenceQueue(ctx)) - // assert that compaction empties the skipped slice and we have correct value for abandoned sequences require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) @@ -2267,7 +2264,7 @@ func TestReleasedSequenceRangeHandlingEverythingSkipped(t *testing.T) { // assert that skipped list is filled and next seq at cache is updated require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) - assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, uint64(21), testChangeCache.nextSequence) }, time.Second*10, time.Millisecond*100) @@ -2715,7 +2712,7 @@ func TestReleasedSequenceRangeHandlingEdgeCase2(t *testing.T) { // assert that the skipped list is filled require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) - assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) dbContext.UpdateCalculatedStats(ctx) @@ -2797,7 +2794,7 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(16), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(16), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, uint64(19), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) @@ -2805,14 +2802,14 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) }, time.Second*10, time.Millisecond*100) // process unusedSeq range with range containing duplicate sipped sequences - // Skipped should contain: (1,2,3,4,5,6,7,8,9,10,11,12,13), (15,16,17) before processing this range + // Skipped should contain: (1-13), (15-17) before processing this range testChangeCache.releaseUnusedSequenceRange(ctx, 10, 17, time.Now()) // assert skipped list altered to reflect the above range is processed require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(9), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(9), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, uint64(19), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) @@ -2855,104 +2852,6 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) }, time.Second*10, time.Millisecond*100) } -// TestRangeInSkippedSplit: -// - Test skipped handling when skipped list has ranges -// - Purpose to ensure range handling isn't broken now changes are to have more single entries -func TestRangeInSkippedSplit(t *testing.T) { - base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache) - - ctx := base.TestCtx(t) - bucket := base.GetTestBucket(t) - dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{ - Scopes: GetScopesOptions(t, bucket, 1), - }) - require.NoError(t, err) - defer dbContext.Close(ctx) - - ctx = dbContext.AddDatabaseLogContext(ctx) - err = dbContext.StartOnlineProcesses(ctx) - require.NoError(t, err) - - testChangeCache := &changeCache{} - if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{ - CachePendingSeqMaxWait: 20 * time.Minute, - CacheSkippedSeqMaxWait: 20 * time.Minute, - CachePendingSeqMaxNum: 0, - }, dbContext.MetadataKeys); err != nil { - log.Printf("Init failed for testChangeCache: %v", err) - t.Fail() - } - - if err := testChangeCache.Start(0); err != nil { - log.Printf("Start error for testChangeCache: %v", err) - t.Fail() - } - defer testChangeCache.Stop(ctx) - require.NoError(t, err) - - // entry that will create a skipped range entry on the list - entry := &LogEntry{ - Sequence: uint64(MinSequencesForRange + 10), - DocID: fmt.Sprintf("doc_%d", 50), - RevID: "1-abcdefabcdefabcdef", - TimeReceived: time.Now(), - TimeSaved: time.Now(), - } - _ = testChangeCache.processEntry(ctx, entry) - - // process unusedSeq range with range in middle of skipped entry - testChangeCache.releaseUnusedSequenceRange(ctx, 10, 17, time.Now()) - - // assert on stats - require.EventuallyWithT(t, func(c *assert.CollectT) { - testChangeCache.updateStats(ctx) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) - assert.Equal(c, int64(31), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(c, uint64(41), testChangeCache.nextSequence) - dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(40), dbContext.DbStats.CacheStats.HighSeqCached.Value()) - }, time.Second*10, time.Millisecond*100) - - // expected values for stats on skipped slice - arrivingSeqs := []uint64{3, 30, 25, 27} - expSliceLen := []int64{3, 4, 5, 6} - expSliceCap := []int64{4, 4, 8, 8} - - testChangeCache.updateStats(ctx) - numSeqsInList := dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value() - for j := 0; j < len(arrivingSeqs); j++ { - newEntry := &LogEntry{ - DocID: fmt.Sprintf("doc_%d", arrivingSeqs), - RevID: "1-abcdefabcdefabcdef", - Sequence: arrivingSeqs[j], - } - - _ = testChangeCache.processEntry(ctx, newEntry) - // assert on skipped sequence slice stats - testChangeCache.updateStats(ctx) - assert.Equal(t, numSeqsInList-1, dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(t, expSliceLen[j], dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) - assert.Equal(t, int64(39), dbContext.DbStats.CacheStats.NumSkippedSeqs.Value()) - assert.Equal(t, expSliceCap[j], dbContext.DbStats.CacheStats.SkippedSeqCap.Value()) - numSeqsInList = dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value() - } - - // empty list - testChangeCache.releaseUnusedSequenceRange(ctx, 1, 40, time.Now()) - - // assert on stats - require.EventuallyWithT(t, func(c *assert.CollectT) { - testChangeCache.updateStats(ctx) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) - assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(c, uint64(41), testChangeCache.nextSequence) - dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(40), dbContext.DbStats.CacheStats.HighSeqCached.Value()) - }, time.Second*10, time.Millisecond*100) -} - // getChanges is a synchronous convenience function that returns all changes as a simple array. This will fail the test if an error is returned. func getChanges(t *testing.T, collection *DatabaseCollectionWithUser, channels base.Set, options ChangesOptions) []*ChangeEntry { require.NotNil(t, options.ChangesCtx) @@ -3043,19 +2942,19 @@ func TestAddPendingLogs(t *testing.T) { // overlapping ranges, low range arrives first incoming: []sequenceRange{{4, 8}, {6, 10}}, expectedNextSequence: 11, - expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, + expectedSkipped: []sequenceRange{{1, 3}}, }, { // completely overlapping ranges, larger range arrives first incoming: []sequenceRange{{4, 8}, {6, 8}}, expectedNextSequence: 9, - expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, + expectedSkipped: []sequenceRange{{1, 3}}, }, { // range arrives then partly overlapping range arrives incoming: []sequenceRange{{4, 8}, {6, 10}}, expectedNextSequence: 11, - expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, + expectedSkipped: []sequenceRange{{1, 3}}, }, { // range arrives, partly overlapping range arrives the single overlapping range @@ -3063,7 +2962,7 @@ func TestAddPendingLogs(t *testing.T) { channelName: "B", expectedNextSequence: 10, expectedCached: []uint64{9}, - expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, + expectedSkipped: []sequenceRange{{1, 3}}, }, { // single range arrives, left side overlapping range arrives @@ -3077,13 +2976,13 @@ func TestAddPendingLogs(t *testing.T) { incoming: []sequenceRange{{4, 0}, {4, 5}}, channelName: "C", expectedNextSequence: 6, - expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, + expectedSkipped: []sequenceRange{{1, 3}}, }, { // range arrives, lower end overlapping range arrives incoming: []sequenceRange{{6, 10}, {4, 8}}, expectedNextSequence: 11, - expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, + expectedSkipped: []sequenceRange{{1, 3}}, }, } diff --git a/db/skipped_sequence.go b/db/skipped_sequence.go index a26eb83153..687e9cf382 100644 --- a/db/skipped_sequence.go +++ b/db/skipped_sequence.go @@ -23,7 +23,6 @@ import ( const ( DefaultClipCapacityHeadroom = 1000 - MinSequencesForRange = 30 // minimum number of sequences required to store entry as range ) // SkippedSequenceSlice stores the set of skipped sequences as an ordered slice of single skipped sequences @@ -232,7 +231,7 @@ func (s *SkippedSequenceSlice) _removeSeqRange(ctx context.Context, startSeq, en // put this below a check for !found to avoid out of bound error rangeElem := s.list[startIndex] if endSeq > rangeElem.getLastSeq() { - base.DebugfCtx(ctx, base.KeyCache, "sequence range %d to %d specified has sequences in that are not present in skipped list, or sequence range spans multiple skipped entries", startSeq, endSeq) + base.DebugfCtx(ctx, base.KeyCache, "sequence range %d to %d specified has sequences in that are not present in skipped list", startSeq, endSeq) return base.ErrSkippedSequencesMissing } @@ -328,35 +327,6 @@ func (s *SkippedSequenceSlice) _insert(index int, entry *SkippedSequenceListEntr s.list[index] = entry } -// PushSkippedSequenceEntries will push seq range to end of slice as separate single sequence entries unless the range -// being pushed in contiguous with end of the slice and that contiguous range is grater than the min range threshold -func (s *SkippedSequenceSlice) PushSkippedSequenceEntries(startSeq, endSeq uint64, numSeqsIncoming int64) { - s.lock.Lock() - defer s.lock.Unlock() - - // update num current skipped sequences count + the cumulative count of skipped sequences - s.NumCurrentSkippedSequences += numSeqsIncoming - s.NumCumulativeSkippedSequences += numSeqsIncoming - - // check if we should be extending the last entry on the slice - if len(s.list) != 0 { - index := len(s.list) - 1 - lastEntryLastSeq := s.list[index].getLastSeq() - totalSeqs := s.list[index].getNumSequencesInEntry() + numSeqsIncoming - if (lastEntryLastSeq+1) == startSeq && totalSeqs > MinSequencesForRange { - // adding contiguous sequence, and we are above the min threshold to hold a range in list - // set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp - s.list[index].extendRange(endSeq, time.Now().Unix()) - return - } - } - - // push all items separate to end of slice - for i := startSeq; i <= endSeq; i++ { - s.list = append(s.list, NewSingleSkippedSequenceEntry(i)) - } -} - // PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous // sequence function will expand the last entry of the slice to reflect this func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceListEntry) { @@ -375,13 +345,11 @@ func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceLi // get index of last entry + last seq of entry index := len(s.list) - 1 lastEntryLastSeq := s.list[index].getLastSeq() - totalSeqs := s.list[index].getNumSequencesInEntry() + entry.getNumSequencesInEntry() - if (lastEntryLastSeq+1) == entry.getStartSeq() && totalSeqs > MinSequencesForRange { - // adding contiguous sequence, and we are above the min threshold to hold a range in list + if (lastEntryLastSeq + 1) == entry.getStartSeq() { + // adding contiguous sequence // set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp s.list[index].extendRange(entry.getLastSeq(), entry.getTimestamp()) } else { - // add new entry as separate item s.list = append(s.list, entry) } @@ -429,7 +397,7 @@ func (s *SkippedSequenceSlice) processUnusedSequenceRangeAtSkipped(ctx context.C } } else if err != nil { // if we get here then the skipped list must be empty - base.InfofCtx(ctx, base.KeyCache, "error attempting to remove unused sequence range from skipped: %v", err) + base.InfofCtx(ctx, base.KeyCache, "error attempting to remove unused sequence range form skipped: %v", err) } } diff --git a/db/skipped_sequence_test.go b/db/skipped_sequence_test.go index 828d494e35..58c671adee 100644 --- a/db/skipped_sequence_test.go +++ b/db/skipped_sequence_test.go @@ -23,7 +23,6 @@ import ( // - Populate 10 single skipped sequence items in the slice // - Assert that each one is added in the correct order // - Assert that timestamp is increasing from the last entry (or equal to) -// - Add contiguous sequence to slice and assert that it is added as single range // - Add contiguous sequence to slice and assert that it extends the last element with a range func TestPushSingleSkippedSequence(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) @@ -38,28 +37,17 @@ func TestPushSingleSkippedSequence(t *testing.T) { assert.GreaterOrEqual(t, skippedSlice.list[j].getTimestamp(), prevTime) prevTime = skippedSlice.list[j].getTimestamp() } - // add a new single entry that is contiguous with end of the slice which is results in thee range being less than - // MinRangeThreshold so wil be added as single entry + // add a new single entry that is contiguous with end of the slice which should replace last + // single entry with a range skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(19)) // grab last entry in list index := len(skippedSlice.list) - 1 entry := skippedSlice.list[index] - // assert last entry is single entry and start + end sequence on range is as expected - assert.True(t, entry.singleEntry()) - assert.Equal(t, uint64(19), entry.getStartSeq()) - assert.Equal(t, uint64(19), entry.getLastSeq()) - - // add a new single entry that is contiguous with end of the slice which is results in the range being greater than - // MinRangeThreshold so wil be added as range entry - skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(20, 50)) - // grab last entry in list - index = len(skippedSlice.list) - 1 - entry = skippedSlice.list[index] // assert last entry is range entry and start + end sequence on range is as expected assert.False(t, entry.singleEntry()) - assert.Equal(t, uint64(19), entry.getStartSeq()) - assert.Equal(t, uint64(50), entry.getLastSeq()) + assert.Equal(t, uint64(18), entry.getStartSeq()) + assert.Equal(t, uint64(19), entry.getLastSeq()) } // TestPushSkippedSequenceRange: @@ -84,9 +72,8 @@ func TestPushSkippedSequenceRange(t *testing.T) { prevTime = skippedSlice.list[j].getTimestamp() } - // add a new range entry that is contiguous with end of the slice which is greater then range threshold so should - // extend the existing last range - skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(96, 142)) + // add a new range entry that is contiguous with end of the slice which should alter range last element in list + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(96, 110)) // grab last entry in list index := len(skippedSlice.list) - 1 entry := skippedSlice.list[index] @@ -94,13 +81,13 @@ func TestPushSkippedSequenceRange(t *testing.T) { // assert last entry is range entry and start + end sequence on range is as expected assert.False(t, entry.singleEntry()) assert.Equal(t, uint64(90), entry.getStartSeq()) - assert.Equal(t, uint64(142), entry.getLastSeq()) + assert.Equal(t, uint64(110), entry.getLastSeq()) // add new single entry that is not contiguous with last element on slice skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(500)) // add new range that is contiguous with the single entry on the last element of the slice + garbage timestamp - // for later assertion. This new range will not be greater than min threshold so wil be added a separate range + // for later assertion newTimeStamp := time.Now().Unix() + 10000 skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(501, 510, newTimeStamp)) @@ -109,7 +96,7 @@ func TestPushSkippedSequenceRange(t *testing.T) { // assert that last element in list is a range and holds sequences we expect + timestamp // is what the new pushed range above holds assert.False(t, entry.singleEntry()) - assert.Equal(t, uint64(501), entry.getStartSeq()) + assert.Equal(t, uint64(500), entry.getStartSeq()) assert.Equal(t, uint64(510), entry.getLastSeq()) assert.Equal(t, newTimeStamp, entry.getTimestamp()) } diff --git a/rest/changes_test.go b/rest/changes_test.go index 56ae42c7c8..20525f7b71 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -315,7 +315,7 @@ func TestJumpInSequencesAtAllocatorSkippedSequenceFill(t *testing.T) { // wait for value to move from pending to cache and skipped list to fill require.EventuallyWithT(t, func(c *assert.CollectT) { rt.GetDatabase().UpdateCalculatedStats(ctx) - assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(1), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) }, time.Second*10, time.Millisecond*100) docVrs := rt.UpdateDoc("doc", vrs, `{"prob": "lol"}`)