Skip to content

Commit

Permalink
fix(flat-storage and state-sync): Delay updating Flat Head to let us …
Browse files Browse the repository at this point in the history
…obtain state parts from a state snapshot (#9338)

When requested to move flat head to block X, set flat head to the second-to-last predecessor block with flat changes. This way blocks without flat state changes are effectively ignored.

This is done by keeping a bit of metadata in `FlatStateDeltaMetadata`, it keeps a pointer to a predecessor block with flat state change.
The metadata bit is trivially constructed from the current and the prev block.

Getting flat values remains to be O(1). Block head and flat head can have arbitrarily large height differences, but the number of blocks with flat state changes is limited to 2 (explained above) + 2 (gap between the final block and the block head). 

The PR lets us obtain state parts when the last chunk of an epoch is not in the last block.
  • Loading branch information
nikurt authored Jul 28, 2023
1 parent ab7e238 commit 60ca954
Show file tree
Hide file tree
Showing 20 changed files with 938 additions and 94 deletions.
39 changes: 34 additions & 5 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2274,7 +2274,7 @@ impl Chain {
new_flat_head = *self.genesis.hash();
}
// Try to update flat head.
flat_storage.update_flat_head(&new_flat_head).unwrap_or_else(|err| {
flat_storage.update_flat_head(&new_flat_head, false).unwrap_or_else(|err| {
match &err {
FlatStorageError::BlockNotSupported(_) => {
// It's possible that new head is not a child of current flat head, e.g. when we have a
Expand Down Expand Up @@ -3385,15 +3385,22 @@ impl Chain {
// Flat storage must not exist at this point because leftover keys corrupt its state.
assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_none());

let flat_head_hash = *chunk.prev_block();
let flat_head_header = self.get_block_header(&flat_head_hash)?;
let flat_head_prev_hash = *flat_head_header.prev_hash();
let flat_head_height = flat_head_header.height();

tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage");

let mut store_update = self.runtime_adapter.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: near_store::flat::BlockInfo {
hash: *block_hash,
prev_hash: *block_header.prev_hash(),
height: block_header.height(),
hash: flat_head_hash,
prev_hash: flat_head_prev_hash,
height: flat_head_height,
},
}),
);
Expand Down Expand Up @@ -4188,7 +4195,12 @@ impl Chain {
let head = self.head()?;
let epoch_id = self.epoch_manager.get_epoch_id(&head.prev_block_hash)?;
let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
(helper.make_snapshot_callback)(head.prev_block_hash, shard_layout.get_shard_uids())
let last_block = self.get_block(&head.last_block_hash)?;
(helper.make_snapshot_callback)(
head.prev_block_hash,
shard_layout.get_shard_uids(),
last_block,
)
}
}
Ok(())
Expand Down Expand Up @@ -5077,10 +5089,27 @@ impl<'a> ChainUpdate<'a> {
shard_uid: ShardUId,
trie_changes: &WrappedTrieChanges,
) -> Result<(), Error> {
let prev_block_with_changes = if trie_changes.state_changes().is_empty() {
// The current block has no flat state changes.
// Find the last block with flat state changes by looking it up in
// the prev block.
store_helper::get_prev_block_with_changes(
self.chain_store_update.store(),
shard_uid,
block_hash,
prev_hash,
)
.map_err(|e| StorageError::from(e))?
} else {
// The current block has flat state changes.
None
};

let delta = FlatStateDelta {
changes: FlatStateChanges::from_state_changes(&trie_changes.state_changes()),
metadata: FlatStateDeltaMetadata {
block: near_store::flat::BlockInfo { hash: block_hash, height, prev_hash },
prev_block_with_changes,
},
};

Expand Down
19 changes: 13 additions & 6 deletions chain/chain/src/state_snapshot_actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix::AsyncContext;
use near_o11y::{handler_debug_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt};
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_store::flat::FlatStorageManager;
Expand Down Expand Up @@ -29,6 +30,8 @@ struct MakeSnapshotRequest {
prev_block_hash: CryptoHash,
/// Shards that need to be present in the snapshot.
shard_uids: Vec<ShardUId>,
/// Last block of the prev epoch.
block: Block,
/// Whether to perform compaction.
compaction_enabled: bool,
}
Expand All @@ -47,9 +50,9 @@ impl actix::Handler<WithSpanContext<MakeSnapshotRequest>> for StateSnapshotActor
_ctx: &mut actix::Context<Self>,
) -> Self::Result {
let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg);
let MakeSnapshotRequest { prev_block_hash, shard_uids, compaction_enabled } = msg;
let MakeSnapshotRequest { prev_block_hash, shard_uids, block, compaction_enabled } = msg;

let res = self.tries.make_state_snapshot(&prev_block_hash, &shard_uids);
let res = self.tries.make_state_snapshot(&prev_block_hash, &shard_uids, &block);
if !self.flat_storage_manager.set_flat_state_updates_mode(true) {
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to unlock flat state updates");
}
Expand Down Expand Up @@ -88,19 +91,23 @@ impl actix::Handler<WithSpanContext<CompactSnapshotRequest>> for StateSnapshotAc
}

pub type MakeSnapshotCallback =
Arc<dyn Fn(CryptoHash, Vec<ShardUId>) -> () + Send + Sync + 'static>;
Arc<dyn Fn(CryptoHash, Vec<ShardUId>, Block) -> () + Send + Sync + 'static>;

/// Sends a request to make a state snapshot.
pub fn get_make_snapshot_callback(
state_snapshot_addr: Arc<actix::Addr<StateSnapshotActor>>,
flat_storage_manager: FlatStorageManager,
compaction_enabled: bool,
) -> MakeSnapshotCallback {
Arc::new(move |prev_block_hash, shard_uids| {
tracing::info!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "start_snapshot_callback sends `MakeSnapshotCallback` to state_snapshot_addr");
Arc::new(move |prev_block_hash, shard_uids, block| {
tracing::info!(
target: "state_snapshot",
?prev_block_hash,
?shard_uids,
"start_snapshot_callback sends `MakeSnapshotCallback` to state_snapshot_addr");
if flat_storage_manager.set_flat_state_updates_mode(false) {
state_snapshot_addr.do_send(
MakeSnapshotRequest { prev_block_hash, shard_uids, compaction_enabled }
MakeSnapshotRequest { prev_block_hash, shard_uids, block, compaction_enabled }
.with_span_context(),
);
}
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1765,9 +1765,9 @@ impl TestEnvBuilder {
};
let make_state_snapshot_callback : Option<MakeSnapshotCallback> = if self.add_state_snapshots {
let runtime = runtime.clone();
let snapshot : MakeSnapshotCallback = Arc::new(move |prev_block_hash, shard_uids| {
let snapshot : MakeSnapshotCallback = Arc::new(move |prev_block_hash, shard_uids, block| {
tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback");
runtime.get_tries().make_state_snapshot(&prev_block_hash, &shard_uids).unwrap();
runtime.get_tries().make_state_snapshot(&prev_block_hash, &shard_uids, &block).unwrap();
});
Some(snapshot)
} else {
Expand Down
1 change: 1 addition & 0 deletions core/o11y/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ static EXCEPTIONS: Lazy<HashSet<&str>> = Lazy::new(|| {
"flat_storage_creation_threads_used",
"flat_storage_distance_to_head",
"flat_storage_head_height",
"flat_storage_hops_to_head",
])
});

Expand Down
12 changes: 11 additions & 1 deletion core/store/src/flat/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::{FlatStateValue, ValueRef};
use near_primitives::types::RawStateChangesWithTrieKey;
use near_primitives::types::{BlockHeight, RawStateChangesWithTrieKey};
use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -16,9 +16,19 @@ pub struct FlatStateDelta {
pub changes: FlatStateChanges,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Copy, serde::Serialize)]
pub struct BlockWithChangesInfo {
pub(crate) hash: CryptoHash,
pub(crate) height: BlockHeight,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Copy, serde::Serialize)]
pub struct FlatStateDeltaMetadata {
pub block: BlockInfo,
/// `None` if the block itself has flat state changes.
/// `Some` if the block has no flat state changes, and contains
/// info of the last block with some flat state changes.
pub prev_block_with_changes: Option<BlockWithChangesInfo>,
}

#[derive(BorshSerialize, BorshDeserialize, Debug)]
Expand Down
10 changes: 7 additions & 3 deletions core/store/src/flat/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::metrics::flat_state_metrics;
use near_o11y::metrics::{IntCounter, IntGauge};
use near_primitives::types::ShardId;
use near_primitives::types::{BlockHeight, ShardId};

use super::FlatStorageStatus;

pub(crate) struct FlatStorageMetrics {
flat_head_height: IntGauge,
distance_to_head: IntGauge,
hops_to_head: IntGauge,
cached_deltas: IntGauge,
cached_changes_num_items: IntGauge,
cached_changes_size: IntGauge,
Expand All @@ -20,6 +21,8 @@ impl FlatStorageMetrics {
.with_label_values(&[&shard_id_label]),
distance_to_head: flat_state_metrics::FLAT_STORAGE_DISTANCE_TO_HEAD
.with_label_values(&[&shard_id_label]),
hops_to_head: flat_state_metrics::FLAT_STORAGE_HOPS_TO_HEAD
.with_label_values(&[&shard_id_label]),
cached_deltas: flat_state_metrics::FLAT_STORAGE_CACHED_DELTAS
.with_label_values(&[&shard_id_label]),
cached_changes_num_items: flat_state_metrics::FLAT_STORAGE_CACHED_CHANGES_NUM_ITEMS
Expand All @@ -29,8 +32,9 @@ impl FlatStorageMetrics {
}
}

pub(crate) fn set_distance_to_head(&self, distance: usize) {
self.distance_to_head.set(distance as i64);
pub(crate) fn set_distance_to_head(&self, distance: usize, height: Option<BlockHeight>) {
self.distance_to_head.set(height.unwrap_or(0) as i64);
self.hops_to_head.set(distance as i64);
}

pub(crate) fn set_flat_head_height(&self, height: u64) {
Expand Down
Loading

0 comments on commit 60ca954

Please sign in to comment.