diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel index 2d186a74fd5e..fe1b4fdac51a 100644 --- a/beacon-chain/db/filesystem/BUILD.bazel +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "blob.go", "ephemeral.go", "metrics.go", + "pruner.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem", visibility = ["//visibility:public"], @@ -19,7 +20,6 @@ go_library( "//proto/prysm/v1alpha1:go_default_library", "//runtime/logging:go_default_library", "//time/slots:go_default_library", - "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", @@ -30,7 +30,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["blob_test.go"], + srcs = [ + "blob_test.go", + "pruner_test.go", + ], embed = [":go_default_library"], deps = [ "//beacon-chain/verification:go_default_library", @@ -40,7 +43,6 @@ go_test( "//proto/prysm/v1alpha1:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", - "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_fastssz//:go_default_library", "@com_github_spf13_afero//:go_default_library", diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 808b787409fc..17ad605cc08b 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -1,28 +1,21 @@ package filesystem import ( - "encoding/binary" "fmt" - "io" "os" "path" - "path/filepath" "strconv" "strings" - "sync/atomic" "time" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" - "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/io/file" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime/logging" - "github.com/prysmaticlabs/prysm/v4/time/slots" log "github.com/sirupsen/logrus" "github.com/spf13/afero" ) @@ -35,7 +28,6 @@ const ( sszExt = "ssz" partExt = "part" - bufferEpochs = 2 directoryPermissions = 0700 ) @@ -45,11 +37,11 @@ type BlobStorageOption func(*BlobStorage) error // WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted. func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption { return func(b *BlobStorage) error { - s, err := slots.EpochStart(e + bufferEpochs) + pruner, err := newBlobPruner(b.fs, e) if err != nil { - return errors.Wrap(err, "could not set retentionSlots") + return err } - b.retentionSlots = s + b.pruner = pruner return nil } } @@ -71,14 +63,29 @@ func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err) } } + if b.pruner == nil { + log.Warn("Initializing blob filesystem storage with pruning disabled") + } return b, nil } // BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars. 
 type BlobStorage struct {
-	fs             afero.Fs
-	retentionSlots primitives.Slot
-	prunedBefore   atomic.Uint64
+	fs     afero.Fs
+	pruner *blobPruner
+}
+
+// WarmCache runs the prune routine with an expiration slot of 0, so nothing will be pruned, but the pruner's cache
+// will be populated at node startup, avoiding a costly cold prune (~4s in syscalls) during syncing.
+func (bs *BlobStorage) WarmCache() {
+	if bs.pruner == nil {
+		return
+	}
+	go func() {
+		if err := bs.pruner.prune(0); err != nil {
+			log.WithError(err).Error("Error encountered while warming up blob pruner cache.")
+		}
+	}()
 }
 
 // Save saves blobs given a list of sidecars.
@@ -94,7 +101,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
 		log.WithFields(logging.BlobFields(sidecar.ROBlob)).Debug("ignoring a duplicate blob sidecar Save attempt")
 		return nil
 	}
-	bs.tryPrune(sidecar.Slot())
+	if bs.pruner != nil {
+		bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot())
+	}
 
 	// Serialize the ethpb.BlobSidecar to binary data using SSZ.
 	sidecarData, err := sidecar.MarshalSSZ()
@@ -106,8 +115,12 @@
 	}
 	partPath := fname.partPath()
 
+	partialMoved := false
 	// Ensure the partial file is deleted.
 	defer func() {
+		if partialMoved {
+			return
+		}
 		// It's expected to error if the save is successful.
 		err = bs.fs.Remove(partPath)
 		if err == nil {
@@ -141,7 +154,8 @@
 	if err != nil {
 		return errors.Wrap(err, "failed to rename partial file to final name")
 	}
-	blobsTotalGauge.Inc()
+	partialMoved = true
+	blobsWrittenCounter.Inc()
 	blobSaveLatency.Observe(time.Since(startTime).Seconds())
 	return nil
 }
@@ -218,7 +232,7 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
 }
 
 func (p blobNamer) dir() string {
-	return fmt.Sprintf("%#x", p.root)
+	return rootString(p.root)
 }
 
 func (p blobNamer) fname(ext string) string {
@@ -233,133 +247,6 @@ func (p blobNamer) path() string {
 	return p.fname(sszExt)
 }
 
-// Prune prunes blobs in the base directory based on the retention epoch.
-// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
-// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
-func (bs *BlobStorage) Prune(pruneBefore primitives.Slot) error {
-	t := time.Now()
-
-	var dirs []string
-	err := afero.Walk(bs.fs, ".", func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			return errors.Wrap(err, "failed to walk blob storage directory")
-		}
-		if info.IsDir() && path != "." {
-			dirs = append(dirs, path)
-		}
-		return nil
-	})
-	if err != nil {
-		return errors.Wrap(err, "failed to build directories list")
-	}
-
-	var totalPruned int
-	for _, dir := range dirs {
-		num, err := bs.processFolder(dir, pruneBefore)
-		if err != nil {
-			return errors.Wrapf(err, "failed to process folder %s", dir)
-		}
-		blobsPrunedCounter.Add(float64(num))
-		blobsTotalGauge.Add(-float64(num))
-		totalPruned += num
-	}
-
-	if totalPruned > 0 {
-		pruneTime := time.Since(t)
-		log.WithFields(log.Fields{
-			"lastPrunedEpoch":   slots.ToEpoch(pruneBefore),
-			"pruneTime":         pruneTime,
-			"numberBlobsPruned": totalPruned,
-		}).Debug("Pruned old blobs")
-	}
-
-	return nil
-}
-
-// processFolder will delete the folder of blobs if the blob slot is outside the
-// retention period. We determine the slot by looking at the first blob in the folder.
-func (bs *BlobStorage) processFolder(folder string, pruneBefore primitives.Slot) (int, error) { - var f afero.File - var err error - var slot primitives.Slot - - for i := 0; i < fieldparams.MaxBlobsPerBlock; i++ { - f, err = bs.fs.Open(filepath.Join(folder, fmt.Sprintf("%d.%s", i, sszExt))) - if err != nil { - log.WithError(err).Debug("Could not open blob file") - continue - } - - slot, err = slotFromBlob(f) - if closeErr := f.Close(); closeErr != nil { - log.WithError(closeErr).Error("Could not close blob file") - } - if err != nil { - log.WithError(err).Error("Could not read from blob file") - continue - } - if slot != 0 { - break - } - } - - if slot == 0 { - log.WithField("folder", folder).Warn("Could not find slot for folder") - return 0, nil - } - - var num int - if slot < pruneBefore { - num, err = bs.countFiles(folder) - if err != nil { - return 0, err - } - if err = bs.fs.RemoveAll(folder); err != nil { - return 0, errors.Wrapf(err, "failed to delete blob %s", f.Name()) - } - } - return num, nil -} - -// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes), -// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields -// preceding the slot information within SignedBeaconBlockHeader. -func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) { - b := make([]byte, 8) - _, err := at.ReadAt(b, 131176) - if err != nil { - return 0, err - } - rawSlot := binary.LittleEndian.Uint64(b) - return primitives.Slot(rawSlot), nil -} - -// Delete removes the directory matching the provided block root and all the blobs it contains. -func (bs *BlobStorage) Delete(root [32]byte) error { - if err := bs.fs.RemoveAll(hexutil.Encode(root[:])); err != nil { - return fmt.Errorf("failed to delete blobs for root %#x: %w", root, err) - } - return nil -} - -// tryPrune checks whether we should prune and then calls prune -func (bs *BlobStorage) tryPrune(latest primitives.Slot) { - pruned := uint64(pruneBefore(latest, bs.retentionSlots)) - if bs.prunedBefore.Swap(pruned) == pruned { - return - } - go func() { - if err := bs.Prune(primitives.Slot(pruned)); err != nil { - log.WithError(err).Errorf("failed to prune blobs from slot %d", latest) - } - }() -} - -func pruneBefore(latest primitives.Slot, offset primitives.Slot) primitives.Slot { - // Safely compute the first slot in the epoch for the latest slot - latest = latest - latest%params.BeaconConfig().SlotsPerEpoch - if latest < offset { - return 0 - } - return latest - offset +func rootString(root [32]byte) string { + return fmt.Sprintf("%#x", root) } diff --git a/beacon-chain/db/filesystem/blob_test.go b/beacon-chain/db/filesystem/blob_test.go index 3639a20b090b..dc7769951297 100644 --- a/beacon-chain/db/filesystem/blob_test.go +++ b/beacon-chain/db/filesystem/blob_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" @@ -74,48 +73,6 @@ func TestBlobStorage_SaveBlobData(t *testing.T) { require.NoError(t, err) require.DeepSSZEqual(t, expected, actual) }) - t.Run("check pruning", func(t *testing.T) { - fs, bs, err := NewEphemeralBlobStorageWithFs(t) - require.NoError(t, err) - - // Slot in first half of epoch therefore should not prune - bs.tryPrune(testSidecars[0].Slot()) - err = bs.Save(testSidecars[0]) - require.NoError(t, err) - actual, err := bs.Get(testSidecars[0].BlockRoot(), 
testSidecars[0].Index) - require.NoError(t, err) - require.DeepSSZEqual(t, testSidecars[0], actual) - err = pollUntil(t, fs, 1) - require.NoError(t, err) - - _, sidecars = util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 33, fieldparams.MaxBlobsPerBlock) - testSidecars1, err := verification.BlobSidecarSliceNoop(sidecars) - require.NoError(t, err) - // Slot in first half of epoch therefore should not prune - bs.tryPrune(testSidecars1[0].Slot()) - err = bs.Save(testSidecars1[0]) - require.NoError(t, err) - // Check previous saved sidecar was not pruned - actual, err = bs.Get(testSidecars[0].BlockRoot(), testSidecars[0].Index) - require.NoError(t, err) - require.DeepSSZEqual(t, testSidecars[0], actual) - // Check latest sidecar exists - actual, err = bs.Get(testSidecars1[0].BlockRoot(), testSidecars1[0].Index) - require.NoError(t, err) - require.DeepSSZEqual(t, testSidecars1[0], actual) - err = pollUntil(t, fs, 2) // Check correct number of files - require.NoError(t, err) - - _, sidecars = util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 131187, fieldparams.MaxBlobsPerBlock) - testSidecars2, err := verification.BlobSidecarSliceNoop(sidecars) - // Slot in second half of epoch therefore should prune - bs.tryPrune(testSidecars2[0].Slot()) - require.NoError(t, err) - err = bs.Save(testSidecars2[0]) - require.NoError(t, err) - err = pollUntil(t, fs, 3) - require.NoError(t, err) - }) } // pollUntil polls a condition function until it returns true or a timeout is reached. @@ -188,7 +145,7 @@ func TestBlobStoragePrune(t *testing.T) { require.NoError(t, bs.Save(sidecar)) } - require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots)) + require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize)) remainingFolders, err := afero.ReadDir(fs, ".") require.NoError(t, err) @@ -203,7 +160,7 @@ func TestBlobStoragePrune(t *testing.T) { require.NoError(t, bs.Save(sidecar)) } - require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots)) + require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize)) remainingFolders, err := afero.ReadDir(fs, ".") require.NoError(t, err) @@ -223,7 +180,7 @@ func TestBlobStoragePrune(t *testing.T) { slot += 10000 } - require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots)) + require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize)) remainingFolders, err := afero.ReadDir(fs, ".") require.NoError(t, err) @@ -252,41 +209,11 @@ func BenchmarkPruning(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - err := bs.Prune(currentSlot) + err := bs.pruner.prune(currentSlot) require.NoError(b, err) } } -func TestBlobStorageDelete(t *testing.T) { - fs, bs, err := NewEphemeralBlobStorageWithFs(t) - require.NoError(t, err) - rawRoot := "0xcf9bb70c98f58092c9d6459227c9765f984d240be9690e85179bc5a6f60366ad" - blockRoot, err := hexutil.Decode(rawRoot) - require.NoError(t, err) - - _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, fieldparams.MaxBlobsPerBlock) - testSidecars, err := verification.BlobSidecarSliceNoop(sidecars) - require.NoError(t, err) - for _, sidecar := range testSidecars { - require.NoError(t, bs.Save(sidecar)) - } - - exists, err := afero.DirExists(fs, hexutil.Encode(blockRoot)) - require.NoError(t, err) - require.Equal(t, true, exists) - - // Delete the directory corresponding to the block root - require.NoError(t, bs.Delete(bytesutil.ToBytes32(blockRoot))) - - // Ensure that the directory no longer exists after deletion - exists, err = afero.DirExists(fs, hexutil.Encode(blockRoot)) - 
require.NoError(t, err) - require.Equal(t, false, exists) - - // Deleting a non-existent root does not return an error. - require.NoError(t, bs.Delete(bytesutil.ToBytes32([]byte{0x1}))) -} - func TestNewBlobStorage(t *testing.T) { _, err := NewBlobStorage(path.Join(t.TempDir(), "good")) require.NoError(t, err) diff --git a/beacon-chain/db/filesystem/ephemeral.go b/beacon-chain/db/filesystem/ephemeral.go index 5518651a8192..98b83c2ac9e1 100644 --- a/beacon-chain/db/filesystem/ephemeral.go +++ b/beacon-chain/db/filesystem/ephemeral.go @@ -4,26 +4,30 @@ import ( "testing" "github.com/prysmaticlabs/prysm/v4/config/params" - "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/spf13/afero" ) // NewEphemeralBlobStorage should only be used for tests. // The instance of BlobStorage returned is backed by an in-memory virtual filesystem, // improving test performance and simplifying cleanup. -func NewEphemeralBlobStorage(_ testing.TB) *BlobStorage { - return &BlobStorage{fs: afero.NewMemMapFs()} +func NewEphemeralBlobStorage(t testing.TB) *BlobStorage { + fs := afero.NewMemMapFs() + pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) + if err != nil { + t.Fatal("test setup issue", err) + } + return &BlobStorage{fs: fs, pruner: pruner} } // NewEphemeralBlobStorageWithFs can be used by tests that want access to the virtual filesystem // in order to interact with it outside the parameters of the BlobStorage api. -func NewEphemeralBlobStorageWithFs(_ testing.TB) (afero.Fs, *BlobStorage, error) { +func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) { fs := afero.NewMemMapFs() - s, err := slots.EpochStart(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) + pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) if err != nil { - return fs, &BlobStorage{}, err + t.Fatal("test setup issue", err) } - return fs, &BlobStorage{fs: fs, retentionSlots: s}, nil + return fs, &BlobStorage{fs: fs, pruner: pruner}, nil } type BlobMocker struct { diff --git a/beacon-chain/db/filesystem/metrics.go b/beacon-chain/db/filesystem/metrics.go index 088be6445f1a..d3be677064a9 100644 --- a/beacon-chain/db/filesystem/metrics.go +++ b/beacon-chain/db/filesystem/metrics.go @@ -1,66 +1,28 @@ package filesystem import ( - "fmt" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/spf13/afero" ) var ( blobBuckets = []float64{0.00003, 0.00005, 0.00007, 0.00009, 0.00011, 0.00013, 0.00015} blobSaveLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "blob_storage_save_latency", - Help: "Latency of blob storage save operations in seconds", + Help: "Latency of BlobSidecar storage save operations in seconds", Buckets: blobBuckets, }) blobFetchLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "blob_storage_get_latency", - Help: "Latency of blob storage get operations in seconds", + Help: "Latency of BlobSidecar storage get operations in seconds", Buckets: blobBuckets, }) blobsPrunedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "blob_pruned_blobs_total", - Help: "Total number of pruned blobs.", + Name: "blob_pruned", + Help: "Number of BlobSidecar files pruned.", }) - blobsTotalGauge = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "blobs_on_disk_total", - Help: "Total number of blobs in filesystem.", + blobsWrittenCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "blobs_written", + Help: "Number of BlobSidecar 
files written.",
 	})
 )
-
-func (bs *BlobStorage) Initialize() error {
-	if err := bs.collectTotalBlobMetric(); err != nil {
-		return fmt.Errorf("failed to initialize blob metrics: %w", err)
-	}
-	return nil
-}
-
-// CollectTotalBlobMetric set the number of blobs currently present in the filesystem
-// to the blobsTotalGauge metric.
-func (bs *BlobStorage) collectTotalBlobMetric() error {
-	totalBlobs := 0
-	folders, err := afero.ReadDir(bs.fs, ".")
-	if err != nil {
-		return err
-	}
-	for _, folder := range folders {
-		num, err := bs.countFiles(folder.Name())
-		if err != nil {
-			return err
-		}
-		totalBlobs = totalBlobs + num
-	}
-	blobsTotalGauge.Set(float64(totalBlobs))
-	return nil
-}
-
-// countFiles returns the length of blob files for a given directory.
-func (bs *BlobStorage) countFiles(folderName string) (int, error) {
-	files, err := afero.ReadDir(bs.fs, folderName)
-	if err != nil {
-		return 0, err
-	}
-	return len(files), nil
-}
diff --git a/beacon-chain/db/filesystem/pruner.go b/beacon-chain/db/filesystem/pruner.go
new file mode 100644
index 000000000000..a4a26742fadb
--- /dev/null
+++ b/beacon-chain/db/filesystem/pruner.go
@@ -0,0 +1,274 @@
+package filesystem
+
+import (
+	"encoding/binary"
+	"io"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/pkg/errors"
+	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
+	"github.com/prysmaticlabs/prysm/v4/config/params"
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
+	"github.com/prysmaticlabs/prysm/v4/time/slots"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/afero"
+)
+
+const retentionBuffer primitives.Epoch = 2
+
+var (
+	errPruningFailures = errors.New("blobs could not be pruned for some roots")
+)
+
+type blobPruner struct {
+	sync.Mutex
+	prunedBefore atomic.Uint64
+	windowSize   primitives.Slot
+	slotMap      *slotForRoot
+	fs           afero.Fs
+}
+
+func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
+	r, err := slots.EpochStart(retain + retentionBuffer)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not set retentionSlots")
+	}
+	return &blobPruner{fs: fs, windowSize: r, slotMap: newSlotForRoot()}, nil
+}
+
+// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
+// of root->slot mappings and decide when to evict old blobs based on the age of present blobs.
+func (p *blobPruner) notify(root [32]byte, latest primitives.Slot) {
+	p.slotMap.ensure(rootString(root), latest)
+	pruned := uint64(windowMin(latest, p.windowSize))
+	if p.prunedBefore.Swap(pruned) == pruned {
+		return
+	}
+	go func() {
+		if err := p.prune(primitives.Slot(pruned)); err != nil {
+			log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
+		}
+	}()
+}
+
+func windowMin(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
+	// Safely compute the first slot in the epoch for the latest slot
+	latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
+	if latest < offset {
+		return 0
+	}
+	return latest - offset
+}
+
+// prune prunes blobs in the base directory based on the retention epoch.
+// It deletes blobs older than currentEpoch - (retentionEpochs+retentionBuffer).
+// This keeps a slight buffer, so that blobs are deleted n+2 epochs after being written, where n is the retention period.
+func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
+	p.Lock()
+	defer p.Unlock()
+	start := time.Now()
+	totalPruned, totalErr := 0, 0
+	// Customize logging/metrics behavior for the initial cache warmup when slot=0.
+ // We'll never see a prune request for slot 0, unless this is the initial call to warm up the cache. + if pruneBefore == 0 { + defer func() { + log.WithField("duration", time.Since(start).String()).Debug("Warmed up pruner cache") + }() + } else { + defer func() { + log.WithFields(log.Fields{ + "upToEpoch": slots.ToEpoch(pruneBefore), + "duration": time.Since(start).String(), + "filesRemoved": totalPruned, + }).Debug("Pruned old blobs") + blobsPrunedCounter.Add(float64(totalPruned)) + }() + } + + entries, err := listDir(p.fs, ".") + if err != nil { + return errors.Wrap(err, "unable to list root blobs directory") + } + dirs := filter(entries, filterRoot) + for _, dir := range dirs { + pruned, err := p.tryPruneDir(dir, pruneBefore) + if err != nil { + totalErr += 1 + log.WithError(err).WithField("directory", dir).Error("Unable to prune directory") + } + totalPruned += pruned + } + + if totalErr > 0 { + return errors.Wrapf(errPruningFailures, "pruning failed for %d root directories", totalErr) + } + return nil +} + +func shouldRetain(slot, pruneBefore primitives.Slot) bool { + return slot >= pruneBefore +} + +func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int, error) { + root := rootFromDir(dir) + slot, slotCached := p.slotMap.slot(root) + // Return early if the slot is cached and doesn't need pruning. + if slotCached && shouldRetain(slot, pruneBefore) { + return 0, nil + } + + // entries will include things that aren't ssz files, like dangling .part files. We need these to + // completely clean up the directory. + entries, err := listDir(p.fs, dir) + if err != nil { + return 0, errors.Wrapf(err, "failed to list blobs in directory %s", dir) + } + // scFiles filters the dir listing down to the ssz encoded BlobSidecar files. This allows us to peek + // at the first one in the list to figure out the slot. + scFiles := filter(entries, filterSsz) + if len(scFiles) == 0 { + log.WithField("dir", dir).Warn("Pruner ignoring directory with no blob files") + return 0, nil + } + if !slotCached { + slot, err = slotFromFile(path.Join(dir, scFiles[0]), p.fs) + if err != nil { + return 0, errors.Wrapf(err, "slot could not be read from blob file %s", scFiles[0]) + } + p.slotMap.ensure(root, slot) + if shouldRetain(slot, pruneBefore) { + return 0, nil + } + } + + removed := 0 + for _, fname := range entries { + fullName := path.Join(dir, fname) + if err := p.fs.Remove(fullName); err != nil { + return removed, errors.Wrapf(err, "unable to remove %s", fullName) + } + // Don't count other files that happen to be in the dir, like dangling .part files. + if filterSsz(fname) { + removed += 1 + } + // Log a warning whenever we clean up a .part file + if filterPart(fullName) { + log.WithField("file", fullName).Warn("Deleting abandoned blob .part file") + } + } + if err := p.fs.Remove(dir); err != nil { + return removed, errors.Wrapf(err, "unable to remove blob directory %s", dir) + } + + p.slotMap.evict(rootFromDir(dir)) + return len(scFiles), nil +} + +func rootFromDir(dir string) string { + return filepath.Base(dir) // end of the path should be the blob directory, named by hex encoding of root +} + +// Read slot from marshaled BlobSidecar data in the given file. See slotFromBlob for details. 
+func slotFromFile(file string, fs afero.Fs) (primitives.Slot, error) {
+	f, err := fs.Open(file)
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		if err := f.Close(); err != nil {
+			log.WithError(err).Errorf("Could not close blob file")
+		}
+	}()
+	return slotFromBlob(f)
+}
+
+// slotFromBlob reads the ssz data of a file at a fixed offset (8 + 131072 + 48 + 48 = 131176 bytes),
+// the sum of the sizes of the fixed-length fields preceding signed_block_header in the BlobSidecar
+// encoding (index, blob, kzg_commitment, kzg_proof). The slot is the first field of the embedded header.
+func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
+	b := make([]byte, 8)
+	_, err := at.ReadAt(b, 131176)
+	if err != nil {
+		return 0, err
+	}
+	rawSlot := binary.LittleEndian.Uint64(b)
+	return primitives.Slot(rawSlot), nil
+}
+
+func listDir(fs afero.Fs, dir string) ([]string, error) {
+	top, err := fs.Open(dir)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to open directory descriptor")
+	}
+	defer func() {
+		if err := top.Close(); err != nil {
+			log.WithError(err).Errorf("Could not close file %s", dir)
+		}
+	}()
+	// re the -1 param: "If n <= 0, Readdirnames returns all the names from the directory in a single slice"
+	dirs, err := top.Readdirnames(-1)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to read directory listing")
+	}
+	return dirs, nil
+}
+
+func filter(entries []string, filt func(string) bool) []string {
+	filtered := make([]string, 0, len(entries))
+	for i := range entries {
+		if filt(entries[i]) {
+			filtered = append(filtered, entries[i])
+		}
+	}
+	return filtered
+}
+
+func filterRoot(s string) bool {
+	return strings.HasPrefix(s, "0x")
+}
+
+var dotSszExt = "." + sszExt
+var dotPartExt = "." + partExt
+
+func filterSsz(s string) bool {
+	return filepath.Ext(s) == dotSszExt
+}
+
+func filterPart(s string) bool {
+	return filepath.Ext(s) == dotPartExt
+}
+
+func newSlotForRoot() *slotForRoot {
+	return &slotForRoot{
+		cache: make(map[string]primitives.Slot, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
+	}
+}
+
+type slotForRoot struct {
+	sync.RWMutex
+	cache map[string]primitives.Slot
+}
+
+func (s *slotForRoot) ensure(key string, slot primitives.Slot) {
+	s.Lock()
+	defer s.Unlock()
+	s.cache[key] = slot
+}
+
+func (s *slotForRoot) slot(key string) (primitives.Slot, bool) {
+	s.RLock()
+	defer s.RUnlock()
+	slot, ok := s.cache[key]
+	return slot, ok
+}
+
+func (s *slotForRoot) evict(key string) {
+	s.Lock()
+	defer s.Unlock()
+	delete(s.cache, key)
+}
diff --git a/beacon-chain/db/filesystem/pruner_test.go b/beacon-chain/db/filesystem/pruner_test.go
new file mode 100644
index 000000000000..e67068414771
--- /dev/null
+++ b/beacon-chain/db/filesystem/pruner_test.go
@@ -0,0 +1,327 @@
+package filesystem
+
+import (
+	"bytes"
+	"fmt"
+	"math"
+	"os"
+	"path"
+	"sort"
+	"testing"
+
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
+	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
+	"github.com/prysmaticlabs/prysm/v4/testing/require"
+	"github.com/prysmaticlabs/prysm/v4/testing/util"
+	"github.com/spf13/afero"
+)
+
+func TestTryPruneDir_CachedNotExpired(t *testing.T) {
+	fs := afero.NewMemMapFs()
+	pr, err := newBlobPruner(fs, 0)
+	require.NoError(t, err)
+	slot := pr.windowSize
+	_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, fieldparams.MaxBlobsPerBlock)
+	sc, err := verification.BlobSidecarNoop(sidecars[0])
+
require.NoError(t, err) + root := fmt.Sprintf("%#x", sc.BlockRoot()) + // This slot is right on the edge of what would need to be pruned, so by adding it to the cache and + // skipping any other test setup, we can be certain the hot cache path never touches the filesystem. + pr.slotMap.ensure(root, sc.Slot()) + pruned, err := pr.tryPruneDir(root, pr.windowSize) + require.NoError(t, err) + require.Equal(t, 0, pruned) +} + +func TestTryPruneDir_CachedExpired(t *testing.T) { + t.Run("empty directory", func(t *testing.T) { + fs := afero.NewMemMapFs() + pr, err := newBlobPruner(fs, 0) + require.NoError(t, err) + var slot primitives.Slot = 0 + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1) + sc, err := verification.BlobSidecarNoop(sidecars[0]) + require.NoError(t, err) + root := fmt.Sprintf("%#x", sc.BlockRoot()) + require.NoError(t, fs.Mkdir(root, directoryPermissions)) // make empty directory + pr.slotMap.ensure(root, sc.Slot()) + pruned, err := pr.tryPruneDir(root, slot+1) + require.NoError(t, err) + require.Equal(t, 0, pruned) + }) + t.Run("blobs to delete", func(t *testing.T) { + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) + var slot primitives.Slot = 0 + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 2) + scs, err := verification.BlobSidecarSliceNoop(sidecars) + require.NoError(t, err) + + require.NoError(t, bs.Save(scs[0])) + require.NoError(t, bs.Save(scs[1])) + + // check that the root->slot is cached + root := fmt.Sprintf("%#x", scs[0].BlockRoot()) + cs, cok := bs.pruner.slotMap.slot(root) + require.Equal(t, true, cok) + require.Equal(t, slot, cs) + + // ensure that we see the saved files in the filesystem + files, err := listDir(fs, root) + require.NoError(t, err) + require.Equal(t, 2, len(files)) + + pruned, err := bs.pruner.tryPruneDir(root, slot+1) + require.NoError(t, err) + require.Equal(t, 2, pruned) + files, err = listDir(fs, root) + require.ErrorIs(t, err, os.ErrNotExist) + require.Equal(t, 0, len(files)) + }) +} + +func TestTryPruneDir_SlotFromFile(t *testing.T) { + t.Run("expired blobs deleted", func(t *testing.T) { + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) + var slot primitives.Slot = 0 + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 2) + scs, err := verification.BlobSidecarSliceNoop(sidecars) + require.NoError(t, err) + + require.NoError(t, bs.Save(scs[0])) + require.NoError(t, bs.Save(scs[1])) + + // check that the root->slot is cached + root := fmt.Sprintf("%#x", scs[0].BlockRoot()) + cs, ok := bs.pruner.slotMap.slot(root) + require.Equal(t, true, ok) + require.Equal(t, slot, cs) + // evict it from the cache so that we trigger the file read path + bs.pruner.slotMap.evict(root) + _, ok = bs.pruner.slotMap.slot(root) + require.Equal(t, false, ok) + + // ensure that we see the saved files in the filesystem + files, err := listDir(fs, root) + require.NoError(t, err) + require.Equal(t, 2, len(files)) + + pruned, err := bs.pruner.tryPruneDir(root, slot+1) + require.NoError(t, err) + require.Equal(t, 2, pruned) + files, err = listDir(fs, root) + require.ErrorIs(t, err, os.ErrNotExist) + require.Equal(t, 0, len(files)) + }) + t.Run("not expired, intact", func(t *testing.T) { + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) + // Set slot equal to the window size, so it should be retained. 
+ var slot primitives.Slot = bs.pruner.windowSize + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 2) + scs, err := verification.BlobSidecarSliceNoop(sidecars) + require.NoError(t, err) + + require.NoError(t, bs.Save(scs[0])) + require.NoError(t, bs.Save(scs[1])) + + // Evict slot mapping from the cache so that we trigger the file read path. + root := fmt.Sprintf("%#x", scs[0].BlockRoot()) + bs.pruner.slotMap.evict(root) + _, ok := bs.pruner.slotMap.slot(root) + require.Equal(t, false, ok) + + // Ensure that we see the saved files in the filesystem. + files, err := listDir(fs, root) + require.NoError(t, err) + require.Equal(t, 2, len(files)) + + // This should use the slotFromFile code (simulating restart). + // Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary). + pruned, err := bs.pruner.tryPruneDir(root, slot) + require.NoError(t, err) + require.Equal(t, 0, pruned) + + // Ensure files are still present. + files, err = listDir(fs, root) + require.NoError(t, err) + require.Equal(t, 2, len(files)) + }) +} + +func TestSlotFromBlob(t *testing.T) { + cases := []struct { + slot primitives.Slot + }{ + {slot: 0}, + {slot: 2}, + {slot: 1123581321}, + {slot: math.MaxUint64}, + } + for _, c := range cases { + t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) { + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1) + sc := sidecars[0] + enc, err := sc.MarshalSSZ() + require.NoError(t, err) + slot, err := slotFromBlob(bytes.NewReader(enc)) + require.NoError(t, err) + require.Equal(t, c.slot, slot) + }) + } +} + +func TestSlotFromFile(t *testing.T) { + cases := []struct { + slot primitives.Slot + }{ + {slot: 0}, + {slot: 2}, + {slot: 1123581321}, + {slot: math.MaxUint64}, + } + for _, c := range cases { + t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) { + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1) + sc, err := verification.BlobSidecarNoop(sidecars[0]) + require.NoError(t, err) + require.NoError(t, bs.Save(sc)) + fname := namerForSidecar(sc) + sszPath := fname.path() + slot, err := slotFromFile(sszPath, fs) + require.NoError(t, err) + require.Equal(t, c.slot, slot) + }) + } +} + +type dirFiles struct { + name string + isDir bool + children []dirFiles +} + +func (df dirFiles) reify(t *testing.T, fs afero.Fs, base string) { + fullPath := path.Join(base, df.name) + if df.isDir { + if df.name != "" { + require.NoError(t, fs.Mkdir(fullPath, directoryPermissions)) + } + for _, c := range df.children { + c.reify(t, fs, fullPath) + } + } else { + fp, err := fs.Create(fullPath) + require.NoError(t, err) + _, err = fp.WriteString("derp") + require.NoError(t, err) + } +} + +func (df dirFiles) childNames() []string { + cn := make([]string, len(df.children)) + for i := range df.children { + cn[i] = df.children[i].name + } + return cn +} + +func TestListDir(t *testing.T) { + fs := afero.NewMemMapFs() + + // parent directory + fsLayout := dirFiles{isDir: true} + // break out each subdir for easier assertions + notABlob := dirFiles{name: "notABlob", isDir: true} + childlessBlob := dirFiles{name: "0x0987654321", isDir: true} + blobWithSsz := dirFiles{name: "0x1123581321", isDir: true, + children: []dirFiles{{name: "1.ssz"}, {name: "2.ssz"}}, + } + blobWithSszAndTmp := dirFiles{name: "0x1234567890", isDir: true, + children: []dirFiles{{name: "5.ssz"}, {name: "0.part"}}} + fsLayout.children = 
append(fsLayout.children, notABlob) + fsLayout.children = append(fsLayout.children, childlessBlob) + fsLayout.children = append(fsLayout.children, blobWithSsz) + fsLayout.children = append(fsLayout.children, blobWithSszAndTmp) + + topChildren := make([]string, len(fsLayout.children)) + for i := range fsLayout.children { + topChildren[i] = fsLayout.children[i].name + } + + fsLayout.reify(t, fs, "") + cases := []struct { + name string + dirPath string + expected []string + filter func(string) bool + err error + }{ + { + name: "non-existent", + dirPath: "derp", + expected: []string{}, + err: os.ErrNotExist, + }, + { + name: "empty", + dirPath: childlessBlob.name, + expected: []string{}, + }, + { + name: "top", + dirPath: ".", + expected: topChildren, + }, + { + name: "custom filter: only notABlob", + dirPath: ".", + expected: []string{notABlob.name}, + filter: func(s string) bool { + if s == notABlob.name { + return true + } + return false + }, + }, + { + name: "root filter", + dirPath: ".", + expected: []string{childlessBlob.name, blobWithSsz.name, blobWithSszAndTmp.name}, + filter: filterRoot, + }, + { + name: "ssz filter", + dirPath: blobWithSsz.name, + expected: blobWithSsz.childNames(), + filter: filterSsz, + }, + { + name: "ssz mixed filter", + dirPath: blobWithSszAndTmp.name, + expected: []string{"5.ssz"}, + filter: filterSsz, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result, err := listDir(fs, c.dirPath) + if c.filter != nil { + result = filter(result, c.filter) + } + if c.err != nil { + require.ErrorIs(t, err, c.err) + require.Equal(t, 0, len(result)) + } else { + require.NoError(t, err) + sort.Strings(c.expected) + sort.Strings(result) + require.DeepEqual(t, c.expected, result) + } + }) + } +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 8d24039cd8e2..c148734ad69b 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -208,6 +208,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco if err := beacon.startDB(cliCtx, depositAddress); err != nil { return nil, err } + beacon.BlobStorage.WarmCache() log.Debugln("Starting Slashing DB") if err := beacon.startSlasherDB(cliCtx); err != nil { @@ -231,9 +232,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco beacon.verifyInitWaiter = verification.NewInitializerWaiter( beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen) - if err := beacon.BlobStorage.Initialize(); err != nil { - return nil, fmt.Errorf("failed to initialize blob storage: %w", err) - } log.Debugln("Registering P2P Service") if err := beacon.registerP2P(cliCtx); err != nil {
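
How the pieces above fit together: `WithBlobRetentionEpochs` constructs the `blobPruner`, `Save` feeds it root->slot observations through `notify` (which kicks off an async `prune` only when the epoch-aligned cutoff advances), and `WarmCache` primes the root->slot cache at startup so the first real prune avoids the cold directory walk. A minimal caller-side sketch of the new surface follows; the base directory and retention value are illustrative, not part of this diff:

```go
package main

import (
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
	"github.com/prysmaticlabs/prysm/v4/config/params"
	log "github.com/sirupsen/logrus"
)

func main() {
	// Enable pruning by supplying a retention window. Without this option,
	// NewBlobStorage logs a warning and nothing is ever pruned.
	bs, err := filesystem.NewBlobStorage(
		"/var/lib/prysm/blobs", // hypothetical base directory
		filesystem.WithBlobRetentionEpochs(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest),
	)
	if err != nil {
		log.WithError(err).Fatal("could not initialize blob storage")
	}
	// Populate the pruner's root->slot cache in the background; prune(0)
	// deletes nothing. Subsequent Save calls trigger real pruning via notify.
	bs.WarmCache()
}
```

This mirrors the wiring in beacon-chain/node/node.go above, where `WarmCache` is called immediately after the databases start.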