From b90ca44eb9203bdf48f461283b87100c925ff09d Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Wed, 8 May 2024 14:48:59 -0500 Subject: [PATCH 1/6] Add NonderterministicFastCommit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NonderterministicFastCommit commits changes in nondeterministic order. It can be used by migration program when ordering isn't required. │ before.txt │ after.txt │ │ sec/op │ sec/op vs base │ StorageFastCommit/10-12 89.72µ ± 4% 57.50µ ± 3% -35.92% (p=0.000 n=10) StorageFastCommit/100-12 118.9µ ± 1% 116.0µ ± 4% ~ (p=0.436 n=10) StorageFastCommit/1000-12 4.086m ± 5% 2.397m ± 25% -41.35% (p=0.000 n=10) StorageFastCommit/10000-12 12.629m ± 4% 9.857m ± 3% -21.95% (p=0.000 n=10) StorageFastCommit/100000-12 102.73m ± 0% 72.26m ± 1% -29.66% (p=0.000 n=10) StorageFastCommit/1000000-12 1.544 ± 2% 1.141 ± 2% -26.09% (p=0.000 n=10) geomean 6.661m 4.848m -27.21% │ before.txt │ after.txt │ │ B/op │ B/op vs base │ StorageFastCommit/10-12 28.92Ki ± 0% 28.05Ki ± 0% -3.00% (p=0.000 n=10) StorageFastCommit/100-12 286.4Ki ± 0% 278.6Ki ± 0% -2.71% (p=0.000 n=10) StorageFastCommit/1000-12 3.009Mi ± 0% 2.901Mi ± 0% -3.58% (p=0.000 n=10) StorageFastCommit/10000-12 28.65Mi ± 0% 27.79Mi ± 0% -2.98% (p=0.000 n=10) StorageFastCommit/100000-12 278.8Mi ± 0% 271.1Mi ± 0% -2.75% (p=0.000 n=10) StorageFastCommit/1000000-12 2.923Gi ± 0% 2.821Gi ± 0% -3.49% (p=0.000 n=10) geomean 9.101Mi 8.820Mi -3.09% │ before.txt │ after.txt │ │ allocs/op │ allocs/op vs base │ StorageFastCommit/10-12 219.0 ± 0% 205.0 ± 0% -6.39% (p=0.000 n=10) StorageFastCommit/100-12 1.980k ± 0% 1.875k ± 0% -5.30% (p=0.000 n=10) StorageFastCommit/1000-12 19.23k ± 0% 18.23k ± 0% -5.22% (p=0.000 n=10) StorageFastCommit/10000-12 191.1k ± 0% 181.1k ± 0% -5.24% (p=0.000 n=10) StorageFastCommit/100000-12 1.918M ± 0% 1.816M ± 0% -5.30% (p=0.000 n=10) StorageFastCommit/1000000-12 19.15M ± 0% 18.15M ± 0% -5.22% (p=0.000 n=10) geomean 62.31k 58.91k -5.45% --- storage.go | 207 +++++++++++++++++++++++++++++++++++++++++- storage_bench_test.go | 134 +++++++++++++++++++++++++++ storage_test.go | 150 +++++++++++++++++++++++++++--- 3 files changed, 474 insertions(+), 17 deletions(-) create mode 100644 storage_bench_test.go diff --git a/storage.go b/storage.go index 53319ad..42faeef 100644 --- a/storage.go +++ b/storage.go @@ -726,12 +726,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []StorageID { } func (s *PersistentSlabStorage) Commit() error { - var err error // this part ensures the keys are sorted so commit operation is deterministic keysWithOwners := s.sortedOwnedDeltaKeys() - for _, id := range keysWithOwners { + return s.commit(keysWithOwners) +} + +func (s *PersistentSlabStorage) commit(keys []StorageID) error { + var err error + + for _, id := range keys { slab := s.deltas[id] // deleted slabs @@ -913,6 +918,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error { return nil } +// NonderterministicFastCommit commits changes in nondeterministic order. +// This is used by migration program when ordering isn't required. +func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error { + // No changes + if len(s.deltas) == 0 { + return nil + } + + type slabToBeEncoded struct { + slabID StorageID + slab Slab + } + + type encodedSlab struct { + slabID StorageID + data []byte + err error + } + + // Define encoder (worker) to encode slabs in parallel + encoder := func( + wg *sync.WaitGroup, + done <-chan struct{}, + jobs <-chan slabToBeEncoded, + results chan<- encodedSlab, + ) { + defer wg.Done() + + for job := range jobs { + // Check if goroutine is signaled to stop before proceeding. + select { + case <-done: + return + default: + } + + id := job.slabID + slab := job.slab + + if slab == nil { + results <- encodedSlab{ + slabID: id, + data: nil, + err: nil, + } + continue + } + + // Serialize + data, err := Encode(slab, s.cborEncMode) + results <- encodedSlab{ + slabID: id, + data: data, + err: err, + } + } + } + + // Modified slabs need to be encoded (in parallel) and stored in underlying storage. + modifiedSlabCount := 0 + // Deleted slabs need to be removed from underlying storage. + deletedSlabCount := 0 + for k, v := range s.deltas { + // Ignore slabs not owned by accounts + if k.Address == AddressUndefined { + continue + } + if v == nil { + deletedSlabCount++ + } else { + modifiedSlabCount++ + } + } + + if modifiedSlabCount == 0 && deletedSlabCount == 0 { + return nil + } + + if modifiedSlabCount < 2 { + // Avoid goroutine overhead + ids := make([]StorageID, 0, modifiedSlabCount+deletedSlabCount) + for k := range s.deltas { + // Ignore slabs not owned by accounts + if k.Address == AddressUndefined { + continue + } + ids = append(ids, k) + } + + return s.commit(ids) + } + + if numWorkers > modifiedSlabCount { + numWorkers = modifiedSlabCount + } + + var wg sync.WaitGroup + + // Create done signal channel + done := make(chan struct{}) + + // Create job queue + jobs := make(chan slabToBeEncoded, modifiedSlabCount) + + // Create result queue + results := make(chan encodedSlab, modifiedSlabCount) + + defer func() { + // This ensures that all goroutines are stopped before output channel is closed. + + // Wait for all goroutines to finish + wg.Wait() + + // Close output channel + close(results) + }() + + // Launch workers to encode slabs + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go encoder(&wg, done, jobs, results) + } + + // Send jobs + deletedSlabIDs := make([]StorageID, 0, deletedSlabCount) + for k, v := range s.deltas { + // ignore the ones that are not owned by accounts + if k.Address == AddressUndefined { + continue + } + if v == nil { + deletedSlabIDs = append(deletedSlabIDs, k) + } else { + jobs <- slabToBeEncoded{k, v} + } + } + close(jobs) + + // Remove deleted slabs from underlying storage. + for _, id := range deletedSlabIDs { + + err := s.baseStorage.Remove(id) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id)) + } + + // Deleted slabs are removed from deltas and added to read cache so that: + // 1. next read is from in-memory read cache + // 2. deleted slabs are not re-committed in next commit + s.cache[id] = nil + delete(s.deltas, id) + } + + // Process encoded slabs + for i := 0; i < modifiedSlabCount; i++ { + result := <-results + + if result.err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // result.err is already categorized by Encode(). + return result.err + } + + id := result.slabID + data := result.data + + if data == nil { + // Closing done channel signals goroutines to stop. + close(done) + // This is unexpected because deleted slabs are processed separately. + return NewEncodingErrorf("unexpectd encoded empty data") + } + + // Store + err := s.baseStorage.Store(id, data) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id)) + } + + s.cache[id] = s.deltas[id] + // It's safe to remove slab from deltas because + // iteration is on non-temp slabs and temp slabs + // are still in deltas. + delete(s.deltas, id) + } + + // Do NOT reset deltas because slabs with empty address are not saved. + + return nil +} + func (s *PersistentSlabStorage) DropDeltas() { s.deltas = make(map[StorageID]Slab) } diff --git a/storage_bench_test.go b/storage_bench_test.go new file mode 100644 index 0000000..9acee84 --- /dev/null +++ b/storage_bench_test.go @@ -0,0 +1,134 @@ +/* + * Atree - Scalable Arrays and Ordered Maps + * + * Copyright 2024 Dapper Labs, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package atree + +import ( + "encoding/binary" + "math/rand" + "runtime" + "strconv" + "testing" + + "github.com/fxamacker/cbor/v2" + "github.com/stretchr/testify/require" +) + +func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) { + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + slabs := make([]Slab, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index StorageIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := StorageID{addr, index} + + slabs[i] = generateLargeSlab(id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + for _, slab := range slabs { + err = storage.Store(slab.ID(), slab) + require.NoError(b, err) + } + + b.StartTimer() + + err := storage.FastCommit(runtime.NumCPU()) + require.NoError(b, err) + } + }) +} + +func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) { + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + slabs := make([]Slab, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index StorageIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := StorageID{addr, index} + + slabs[i] = generateLargeSlab(id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + for _, slab := range slabs { + err = storage.Store(slab.ID(), slab) + require.NoError(b, err) + } + + b.StartTimer() + + err := storage.NonderterministicFastCommit(runtime.NumCPU()) + require.NoError(b, err) + } + }) +} + +func BenchmarkStorageFastCommit(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkFastCommit(b, fixedSeed, 10) + benchmarkFastCommit(b, fixedSeed, 100) + benchmarkFastCommit(b, fixedSeed, 1_000) + benchmarkFastCommit(b, fixedSeed, 10_000) + benchmarkFastCommit(b, fixedSeed, 100_000) + benchmarkFastCommit(b, fixedSeed, 1_000_000) +} + +func BenchmarkStorageNondeterministicFastCommit(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkNondeterministicFastCommit(b, fixedSeed, 10) + benchmarkNondeterministicFastCommit(b, fixedSeed, 100) + benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 10_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000) +} diff --git a/storage_test.go b/storage_test.go index 6d08735..0b6298a 100644 --- a/storage_test.go +++ b/storage_test.go @@ -409,8 +409,8 @@ func TestBasicSlabStorageStore(t *testing.T) { r := newRand(t) address := Address{1} slabs := map[StorageID]Slab{ - {address, StorageIndex{1}}: generateRandomSlab(address, r), - {address, StorageIndex{2}}: generateRandomSlab(address, r), + {address, StorageIndex{1}}: generateRandomSlab(StorageID{address, StorageIndex{1}}, r), + {address, StorageIndex{2}}: generateRandomSlab(StorageID{address, StorageIndex{2}}, r), } // Store values @@ -421,7 +421,7 @@ func TestBasicSlabStorageStore(t *testing.T) { // Overwrite stored values for id := range slabs { - slab := generateRandomSlab(id.Address, r) + slab := generateRandomSlab(id, r) slabs[id] = slab err := storage.Store(id, slab) require.NoError(t, err) @@ -443,7 +443,7 @@ func TestBasicSlabStorageRetrieve(t *testing.T) { r := newRand(t) id := StorageID{Address{1}, StorageIndex{1}} - slab := generateRandomSlab(id.Address, r) + slab := generateRandomSlab(id, r) // Retrieve value from empty storage retrievedSlab, found, err := storage.Retrieve(id) @@ -473,7 +473,7 @@ func TestBasicSlabStorageRemove(t *testing.T) { r := newRand(t) id := StorageID{Address{1}, StorageIndex{1}} - slab := generateRandomSlab(id.Address, r) + slab := generateRandomSlab(id, r) // Remove value from empty storage err := storage.Remove(id) @@ -543,7 +543,7 @@ func TestBasicSlabStorageStorageIDs(t *testing.T) { // Store values for id := range wantIDs { - err := storage.Store(id, generateRandomSlab(id.Address, r)) + err := storage.Store(id, generateRandomSlab(id, r)) require.NoError(t, err) } @@ -566,9 +566,9 @@ func TestBasicSlabStorageSlabIterat(t *testing.T) { id3 := StorageID{Address: address, Index: index.Next()} want := map[StorageID]Slab{ - id1: generateRandomSlab(id1.Address, r), - id2: generateRandomSlab(id2.Address, r), - id3: generateRandomSlab(id3.Address, r), + id1: generateRandomSlab(id1, r), + id2: generateRandomSlab(id2, r), + id3: generateRandomSlab(id3, r), } storage := NewBasicSlabStorage(nil, nil, nil, nil) @@ -639,8 +639,8 @@ func TestPersistentStorage(t *testing.T) { permStorageID, err := NewStorageIDFromRawBytes([]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) require.NoError(t, err) - slab1 := generateRandomSlab(tempStorageID.Address, r) - slab2 := generateRandomSlab(permStorageID.Address, r) + slab1 := generateRandomSlab(tempStorageID, r) + slab2 := generateRandomSlab(permStorageID, r) // no temp ids should be in the base storage err = storage.Store(tempStorageID, slab1) @@ -721,8 +721,10 @@ func TestPersistentStorage(t *testing.T) { numberOfSlabsPerAccount := 10 r := newRand(t) + baseStorage := newAccessOrderTrackerBaseStorage() storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + baseStorage2 := newAccessOrderTrackerBaseStorage() storageWithFastCommit := NewPersistentSlabStorage(baseStorage2, encMode, decMode, nil, nil) @@ -732,16 +734,19 @@ func TestPersistentStorage(t *testing.T) { for i := 0; i < numberOfAccounts; i++ { for j := 0; j < numberOfSlabsPerAccount; j++ { addr := generateRandomAddress(r) - slab := generateRandomSlab(addr, r) - slabSize += uint64(slab.ByteSize()) storageID, err := storage.GenerateStorageID(addr) require.NoError(t, err) + + slab := generateRandomSlab(storageID, r) + slabSize += uint64(slab.ByteSize()) + err = storage.Store(storageID, slab) require.NoError(t, err) storageID2, err := storageWithFastCommit.GenerateStorageID(addr) require.NoError(t, err) + err = storageWithFastCommit.Store(storageID2, slab) require.NoError(t, err) @@ -1072,12 +1077,12 @@ func TestPersistentStorageGenerateStorageID(t *testing.T) { }) } -func generateRandomSlab(address Address, r *rand.Rand) Slab { +func generateRandomSlab(id StorageID, r *rand.Rand) Slab { storable := Uint64Value(r.Uint64()) return &ArrayDataSlab{ header: ArraySlabHeader{ - id: NewStorageID(address, StorageIndex{1}), + id: id, size: arrayRootDataSlabPrefixSize + storable.ByteSize(), count: 1, }, @@ -1085,6 +1090,28 @@ func generateRandomSlab(address Address, r *rand.Rand) Slab { } } +func generateLargeSlab(id StorageID) Slab { + + const elementCount = 100 + + storables := make([]Storable, elementCount) + size := uint32(0) + for i := 0; i < elementCount; i++ { + storable := Uint64Value(uint64(i)) + size += storable.ByteSize() + storables[i] = storable + } + + return &ArrayDataSlab{ + header: ArraySlabHeader{ + id: id, + size: arrayRootDataSlabPrefixSize + size, + count: elementCount, + }, + elements: storables, + } +} + func generateRandomAddress(r *rand.Rand) Address { address := Address{} r.Read(address[:]) @@ -4794,3 +4821,96 @@ func testGetAllChildReferences( require.Equal(t, len(expectedBrokenRefIDs), len(brokenRefs)) require.ElementsMatch(t, expectedBrokenRefIDs, brokenRefs) } + +func TestStorageNondeterministicFastCommit(t *testing.T) { + numberOfAccounts := 10 + + t.Run("small", func(t *testing.T) { + numberOfSlabsPerAccount := 10 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("large", func(t *testing.T) { + numberOfSlabsPerAccount := 1_000 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) +} + +func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, numberOfSlabsPerAccount int) { + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + encodedSlabs := make(map[StorageID][]byte) + slabSize := uint64(0) + + // Storage slabs + for i := 0; i < numberOfAccounts; i++ { + + addr := generateRandomAddress(r) + + for j := 0; j < numberOfSlabsPerAccount; j++ { + + slabID, err := storage.GenerateStorageID(addr) + require.NoError(t, err) + + slab := generateRandomSlab(slabID, r) + slabSize += uint64(slab.ByteSize()) + + err = storage.Store(slabID, slab) + require.NoError(t, err) + + // capture data for accuracy testing + encodedSlabs[slabID], err = Encode(slab, encMode) + require.NoError(t, err) + } + } + + require.Equal(t, uint(len(encodedSlabs)), storage.DeltasWithoutTempAddresses()) + require.Equal(t, slabSize, storage.DeltasSizeWithoutTempAddresses()) + + // Commit deltas + err = storage.NonderterministicFastCommit(10) + require.NoError(t, err) + + require.Equal(t, uint(0), storage.DeltasWithoutTempAddresses()) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + require.Equal(t, len(encodedSlabs), storage.Count()) + + // Compare encoded data + for sid, value := range encodedSlabs { + storedValue, found, err := baseStorage.Retrieve(sid) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, value, storedValue) + } + + // Remove all slabs from storage + for sid := range encodedSlabs { + err = storage.Remove(sid) + require.NoError(t, err) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + } + + // Commit deltas + err = storage.NonderterministicFastCommit(10) + require.NoError(t, err) + + require.Equal(t, 0, storage.Count()) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + + // Check remove functionality + for sid := range encodedSlabs { + storedValue, found, err := storage.Retrieve(sid) + require.NoError(t, err) + require.False(t, found) + require.Nil(t, storedValue) + } +} From a0bcce211557a7bdf48e23aa0032ca5900393390 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Thu, 9 May 2024 10:01:55 -0500 Subject: [PATCH 2/6] Add more tests --- storage_test.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/storage_test.go b/storage_test.go index 0b6298a..e7c9388 100644 --- a/storage_test.go +++ b/storage_test.go @@ -4823,14 +4823,32 @@ func testGetAllChildReferences( } func TestStorageNondeterministicFastCommit(t *testing.T) { - numberOfAccounts := 10 + t.Run("0 slabs", func(t *testing.T) { + numberOfAccounts := 0 + numberOfSlabsPerAccount := 0 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("1 slabs", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 1 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("10 slabs", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 10 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) - t.Run("small", func(t *testing.T) { + t.Run("100 slabs", func(t *testing.T) { + numberOfAccounts := 10 numberOfSlabsPerAccount := 10 testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) }) - t.Run("large", func(t *testing.T) { + t.Run("10_000 slabs", func(t *testing.T) { + numberOfAccounts := 10 numberOfSlabsPerAccount := 1_000 testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) }) From 94d407f1bfad7f758e9d8e50d2df383b493f3c60 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 14:18:29 -0500 Subject: [PATCH 3/6] Refactor to iterate deltas once in NonderterministicFastCommit This change reduces number of lines in the function but is not expected to yield significant speed improvements. --- storage.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/storage.go b/storage.go index 42faeef..b0babef 100644 --- a/storage.go +++ b/storage.go @@ -976,6 +976,12 @@ func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) erro } } + // slabIDsWithOwner contains slab IDs with owner: + // - modified slab IDs are stored from front to back + // - deleted slab IDs are stored from back to front + // This is to avoid extra allocations. + slabIDsWithOwner := make([]StorageID, len(s.deltas)) + // Modified slabs need to be encoded (in parallel) and stored in underlying storage. modifiedSlabCount := 0 // Deleted slabs need to be removed from underlying storage. @@ -986,27 +992,26 @@ func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) erro continue } if v == nil { + index := len(slabIDsWithOwner) - 1 - deletedSlabCount + slabIDsWithOwner[index] = k deletedSlabCount++ } else { + slabIDsWithOwner[modifiedSlabCount] = k modifiedSlabCount++ } } + modifiedSlabIDs := slabIDsWithOwner[:modifiedSlabCount] + + deletedSlabIDs := slabIDsWithOwner[len(slabIDsWithOwner)-deletedSlabCount:] + if modifiedSlabCount == 0 && deletedSlabCount == 0 { return nil } if modifiedSlabCount < 2 { // Avoid goroutine overhead - ids := make([]StorageID, 0, modifiedSlabCount+deletedSlabCount) - for k := range s.deltas { - // Ignore slabs not owned by accounts - if k.Address == AddressUndefined { - continue - } - ids = append(ids, k) - } - + ids := append(modifiedSlabIDs, deletedSlabIDs...) return s.commit(ids) } @@ -1042,17 +1047,8 @@ func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) erro } // Send jobs - deletedSlabIDs := make([]StorageID, 0, deletedSlabCount) - for k, v := range s.deltas { - // ignore the ones that are not owned by accounts - if k.Address == AddressUndefined { - continue - } - if v == nil { - deletedSlabIDs = append(deletedSlabIDs, k) - } else { - jobs <- slabToBeEncoded{k, v} - } + for _, id := range modifiedSlabIDs { + jobs <- slabToBeEncoded{id, s.deltas[id]} } close(jobs) From 75e4960cac10440e2f61175e86c8cb26208c7747 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 15:14:19 -0500 Subject: [PATCH 4/6] Lint --- storage.go | 4 ++-- storage_bench_test.go | 2 +- storage_test.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/storage.go b/storage.go index b0babef..a25117f 100644 --- a/storage.go +++ b/storage.go @@ -918,9 +918,9 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error { return nil } -// NonderterministicFastCommit commits changes in nondeterministic order. +// NondeterministicFastCommit commits changes in nondeterministic order. // This is used by migration program when ordering isn't required. -func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error { +func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error { // No changes if len(s.deltas) == 0 { return nil diff --git a/storage_bench_test.go b/storage_bench_test.go index 9acee84..910126c 100644 --- a/storage_bench_test.go +++ b/storage_bench_test.go @@ -105,7 +105,7 @@ func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs b.StartTimer() - err := storage.NonderterministicFastCommit(runtime.NumCPU()) + err := storage.NondeterministicFastCommit(runtime.NumCPU()) require.NoError(b, err) } }) diff --git a/storage_test.go b/storage_test.go index e7c9388..bfee615 100644 --- a/storage_test.go +++ b/storage_test.go @@ -4895,7 +4895,7 @@ func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, n require.Equal(t, slabSize, storage.DeltasSizeWithoutTempAddresses()) // Commit deltas - err = storage.NonderterministicFastCommit(10) + err = storage.NondeterministicFastCommit(10) require.NoError(t, err) require.Equal(t, uint(0), storage.DeltasWithoutTempAddresses()) @@ -4918,7 +4918,7 @@ func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, n } // Commit deltas - err = storage.NonderterministicFastCommit(10) + err = storage.NondeterministicFastCommit(10) require.NoError(t, err) require.Equal(t, 0, storage.Count()) From 562f2fcde9250ac3e3bfa93ed4470f3d57063d30 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 15:33:12 -0500 Subject: [PATCH 5/6] Lint --- storage.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/storage.go b/storage.go index a25117f..e9584b7 100644 --- a/storage.go +++ b/storage.go @@ -1010,8 +1010,10 @@ func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error } if modifiedSlabCount < 2 { - // Avoid goroutine overhead - ids := append(modifiedSlabIDs, deletedSlabIDs...) + // Avoid goroutine overhead. + // Return after committing modified and deleted slabs. + ids := modifiedSlabIDs + ids = append(ids, deletedSlabIDs...) return s.commit(ids) } From 7b4efdbabe954e0a1f2d597f1b2bfb065987fda9 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 14 May 2024 07:20:30 -0500 Subject: [PATCH 6/6] Improve code readability by adding comments, etc. --- storage.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/storage.go b/storage.go index e9584b7..6f20076 100644 --- a/storage.go +++ b/storage.go @@ -986,17 +986,19 @@ func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error modifiedSlabCount := 0 // Deleted slabs need to be removed from underlying storage. deletedSlabCount := 0 - for k, v := range s.deltas { + for id, slab := range s.deltas { // Ignore slabs not owned by accounts - if k.Address == AddressUndefined { + if id.Address == AddressUndefined { continue } - if v == nil { + if slab == nil { + // Set deleted slab ID from the end of slabIDsWithOwner. index := len(slabIDsWithOwner) - 1 - deletedSlabCount - slabIDsWithOwner[index] = k + slabIDsWithOwner[index] = id deletedSlabCount++ } else { - slabIDsWithOwner[modifiedSlabCount] = k + // Set modified slab ID from the start of slabIDsWithOwner. + slabIDsWithOwner[modifiedSlabCount] = id modifiedSlabCount++ } }