fix(flat-storage): Initialize Flat Storage respecting the shards that the node will care about (#9368)

Added a test reproducing this issue.

Steps to reproduce:
* Download an S3 snapshot.
* Change config.json to track some shard in the next epoch but not in the current epoch.
  * Use `tracked_shard_schedule` or be a chunk-only producer.

Observe that the node tries to use flat storage to process chunks from the shards it will care about, prints "FlatStorage not ready" because flat storage was never initialized for those shards, falls back to the usual Trie storage, and fails with `IncorrectStateRoot`.
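For reference, the config.json change from the second step can be made with the repo's own pytest helper. A minimal sketch — the `node_dir` variable is assumed to point at the node's home directory; the schedule values mirror the test added in this commit:

```python
from cluster import apply_config_changes

# Track no shards via the static list, and rotate the tracked set per epoch:
# shard 0 now, shards 1-3 two epochs later. Flat storage must therefore be
# initialized for shards the node only cares about in a future epoch.
apply_config_changes(node_dir, {
    "tracked_shards": [],
    "tracked_shard_schedule": [[0], [0], [1, 2, 3]],
})
```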
nikurt authored Jul 31, 2023
1 parent 60ca954 commit 8f0f75e
Showing 5 changed files with 118 additions and 5 deletions.
chain/chain/src/flat_storage_creator.rs (9 additions, 1 deletion)

@@ -446,7 +446,15 @@ impl FlatStorageCreator {
             return Ok(None);
         };
         for shard_id in 0..num_shards {
-            if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true) {
+            // The node applies transactions from the shards it cares about this and the next epoch.
+            if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true)
+                || shard_tracker.will_care_about_shard(
+                    me,
+                    &chain_head.prev_block_hash,
+                    shard_id,
+                    true,
+                )
+            {
                 let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &chain_head.epoch_id)?;
                 let status = flat_storage_manager.get_flat_storage_status(shard_uid);
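The fix makes flat storage initialization consider the union of shards tracked in the current epoch and in the next one. A Python sketch of the new predicate, purely illustrative (names mirror the Rust above; this is not nearcore code):

```python
def shards_needing_flat_storage(shard_tracker, me, prev_hash, num_shards):
    # Before the fix, only care_about_shard() was consulted, so a shard that
    # becomes relevant in the next epoch had no flat storage at startup and
    # chunk application fell back to Trie storage, causing IncorrectStateRoot.
    return [
        shard_id for shard_id in range(num_shards)
        if shard_tracker.care_about_shard(me, prev_hash, shard_id, True) or
        shard_tracker.will_care_about_shard(me, prev_hash, shard_id, True)
    ]
```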
core/chain-configs/src/client_config.rs (3 additions, 2 deletions)

@@ -224,12 +224,13 @@ pub struct ClientConfig {
     pub block_header_fetch_horizon: BlockHeightDelta,
     /// Garbage collection configuration.
     pub gc: GCConfig,
-    /// Accounts that this client tracks
+    /// Accounts that this client tracks.
     pub tracked_accounts: Vec<AccountId>,
-    /// Shards that this client tracks
+    /// Shards that this client tracks.
     pub tracked_shards: Vec<ShardId>,
     /// Rotate between these sets of tracked shards.
     /// Used to simulate the behavior of chunk only producers without staking tokens.
+    /// This field is only used if `tracked_shards` is empty.
     pub tracked_shard_schedule: Vec<Vec<ShardId>>,
     /// Not clear old data, set `true` for archive nodes.
     pub archive: bool,
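Read together, the doc comments imply the following resolution order for the two fields. The per-epoch rotation in this Python sketch is an assumption about how the schedule is consumed, not code from this commit:

```python
def effective_tracked_shards(tracked_shards, tracked_shard_schedule, epoch_height):
    # A non-empty static list wins; the schedule is ignored entirely.
    if tracked_shards:
        return tracked_shards
    # Otherwise rotate through the schedule, one entry per epoch (assumed).
    if tracked_shard_schedule:
        return tracked_shard_schedule[epoch_height % len(tracked_shard_schedule)]
    return []
```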
nightly/pytest-sanity.txt (3 additions, 1 deletion)

@@ -61,6 +61,8 @@ pytest --timeout=240 sanity/block_sync.py
 pytest --timeout=240 sanity/block_sync.py --features nightly
 pytest --timeout=10m sanity/block_sync_archival.py
 pytest --timeout=10m sanity/block_sync_archival.py --features nightly
+pytest --timeout=120 sanity/block_sync_flat_storage.py
+pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly
 pytest --timeout=240 sanity/validator_switch.py
 pytest --timeout=240 sanity/validator_switch.py --features nightly
 pytest --timeout=240 sanity/rpc_state_changes.py

@@ -144,4 +146,4 @@ pytest sanity/meta_tx.py --features nightly

 # Tests for split storage and split storage migration
 pytest --timeout=600 sanity/split_storage.py
-pytest --timeout=600 sanity/split_storage.py --features nightly
\ No newline at end of file
+pytest --timeout=600 sanity/split_storage.py --features nightly
pytest/lib/cluster.py (2 additions, 1 deletion)

@@ -818,6 +818,7 @@ def apply_config_changes(node_dir, client_config_change):
         'split_storage',
         'state_sync_enabled',
         'store.state_snapshot_enabled',
+        'tracked_shard_schedule',
     )

     for k, v in client_config_change.items():

@@ -965,4 +966,4 @@ def get_binary_protocol_version(config) -> typing.Optional[int]:
     for i in range(n):
         if tokens[i] == "protocol" and i + 1 < n:
             return int(tokens[i + 1])
-    return None
\ No newline at end of file
+    return None
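The tuple extended above acts as an allow-list: `apply_config_changes` may set these keys even when a node's config.json does not contain them yet, which is what lets the new test inject `tracked_shard_schedule`. A simplified sketch of that check (the real helper in pytest/lib/cluster.py also handles dotted keys such as `store.state_snapshot_enabled`):

```python
def apply_config_change_sketch(config_json, changes, allowed_missing_configs):
    # Simplified: a key must already exist in config.json or be allow-listed.
    for key, value in changes.items():
        assert key in config_json or key in allowed_missing_configs, (
            f"{key} is not in config.json and is not allow-listed")
        config_json[key] = value
    return config_json
```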
pytest/tests/sanity/block_sync_flat_storage.py (new file, 101 additions)

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
# Spins up one validating node.
# Spins a non-validating node that tracks all shards.
# In the middle of an epoch, the node gets stopped, and the set of tracked shards gets reduced.
# Test that the node correctly handles chunks for the shards that it will care about in the next epoch.
# Spam transactions that require the node to use flat storage to process them correctly.

import pathlib
import random
import sys

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from cluster import init_cluster, spin_up_node, load_config, apply_config_changes
import account
import transaction
import utils

EPOCH_LENGTH = 30

config0 = {
    'tracked_shards': [0],
}
config1 = {
    'tracked_shards': [0],
}

config = load_config()
near_root, node_dirs = init_cluster(1, 1, 4, config,
                                    [["epoch_length", EPOCH_LENGTH]], {
                                        0: config0,
                                        1: config1
                                    })

boot_node = spin_up_node(config, near_root, node_dirs[0], 0)
node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node)

contract_key = boot_node.signer_key
contract = utils.load_test_contract()
latest_block_hash = boot_node.get_latest_block().hash_bytes
deploy_contract_tx = transaction.sign_deploy_contract_tx(
    contract_key, contract, 10, latest_block_hash)
result = boot_node.send_tx_and_wait(deploy_contract_tx, 10)
assert 'result' in result and 'error' not in result, (
    'Expected "result" and no "error" in response, got: {}'.format(result))


def random_workload_until(target, nonce, keys):
    while True:
        nonce += 1
        height = boot_node.get_latest_block().height
        if height > target:
            break
        if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000:
            key = keys[random.randint(0, len(keys) - 1)]
            call_function(boot_node, 'read', key, nonce)
        else:
            key = random_u64()
            keys.append(key)
            call_function(boot_node, 'write', key, nonce)
    return (nonce, keys)


def random_u64():
    return bytes(random.randint(0, 255) for _ in range(8))


def call_function(node, op, key, nonce):
    last_block_hash = node.get_latest_block().hash_bytes
    if op == 'read':
        args = key
        fn = 'read_value'
    else:
        args = key + random_u64()
        fn = 'write_key_value'

    tx = transaction.sign_function_call_tx(node.signer_key,
                                           node.signer_key.account_id, fn, args,
                                           300 * account.TGAS, 0, nonce,
                                           last_block_hash)
    return node.send_tx(tx).get('result')


nonce, keys = random_workload_until(EPOCH_LENGTH + 5, 1, [])

node1.kill()
# Reduce the set of tracked shards and make it variable in time.
# The node is stopped in epoch_height = 1.
# Change the config of tracked shards such that after restart the node cares
# only about shard 0, and in the next epoch it will care about shards [1, 2, 3].
apply_config_changes(node_dirs[1], {
    "tracked_shards": [],
    "tracked_shard_schedule": [[0], [0], [1, 2, 3]]
})

# Run node0 more to trigger block sync in node1.
nonce, keys = random_workload_until(EPOCH_LENGTH * 2 + 1, nonce, keys)

# Node1 is now behind and needs to do header sync and block sync.
node1.start(boot_node=boot_node)
utils.wait_for_blocks(node1, target=EPOCH_LENGTH * 2 + 10)
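One possible extra check after the wait — not part of the committed test — to make the success condition explicit:

```python
# Hypothetical assertion: node1 caught up well past the epoch boundary where
# its tracked-shard set changes, which only succeeds if flat storage was
# initialized for the shards it will care about.
assert node1.get_latest_block().height >= EPOCH_LENGTH * 2 + 10
```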
