diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs
index 7c0c15e27fd..9e968f3c605 100644
--- a/chain/chain/src/flat_storage_creator.rs
+++ b/chain/chain/src/flat_storage_creator.rs
@@ -446,7 +446,15 @@ impl FlatStorageCreator {
             return Ok(None);
         };
         for shard_id in 0..num_shards {
-            if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true) {
+            // The node applies transactions from the shards it cares about in this and the next epoch.
+            if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true)
+                || shard_tracker.will_care_about_shard(
+                    me,
+                    &chain_head.prev_block_hash,
+                    shard_id,
+                    true,
+                )
+            {
                 let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &chain_head.epoch_id)?;
                 let status = flat_storage_manager.get_flat_storage_status(shard_uid);
diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs
index 5b9f3b6b4f2..948dc35c50c 100644
--- a/core/chain-configs/src/client_config.rs
+++ b/core/chain-configs/src/client_config.rs
@@ -224,12 +224,13 @@ pub struct ClientConfig {
     pub block_header_fetch_horizon: BlockHeightDelta,
     /// Garbage collection configuration.
     pub gc: GCConfig,
-    /// Accounts that this client tracks
+    /// Accounts that this client tracks.
     pub tracked_accounts: Vec<AccountId>,
-    /// Shards that this client tracks
+    /// Shards that this client tracks.
     pub tracked_shards: Vec<ShardId>,
     /// Rotate between these sets of tracked shards.
     /// Used to simulate the behavior of chunk only producers without staking tokens.
+    /// This field is only used if `tracked_shards` is empty.
     pub tracked_shard_schedule: Vec<Vec<ShardId>>,
     /// Not clear old data, set `true` for archive nodes.
     pub archive: bool,
diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt
index c8f521b6fb0..3f7de18d15a 100644
--- a/nightly/pytest-sanity.txt
+++ b/nightly/pytest-sanity.txt
@@ -61,6 +61,8 @@ pytest --timeout=240 sanity/block_sync.py
 pytest --timeout=240 sanity/block_sync.py --features nightly
 pytest --timeout=10m sanity/block_sync_archival.py
 pytest --timeout=10m sanity/block_sync_archival.py --features nightly
+pytest --timeout=120 sanity/block_sync_flat_storage.py
+pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly
 pytest --timeout=240 sanity/validator_switch.py
 pytest --timeout=240 sanity/validator_switch.py --features nightly
 pytest --timeout=240 sanity/rpc_state_changes.py
@@ -144,4 +146,4 @@ pytest sanity/meta_tx.py --features nightly
 
 # Tests for split storage and split storage migration
 pytest --timeout=600 sanity/split_storage.py
-pytest --timeout=600 sanity/split_storage.py --features nightly
\ No newline at end of file
+pytest --timeout=600 sanity/split_storage.py --features nightly
diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py
index 05c30ead51e..d517bbf38ae 100644
--- a/pytest/lib/cluster.py
+++ b/pytest/lib/cluster.py
@@ -818,6 +818,7 @@ def apply_config_changes(node_dir, client_config_change):
         'split_storage',
         'state_sync_enabled',
         'store.state_snapshot_enabled',
+        'tracked_shard_schedule',
     )
 
     for k, v in client_config_change.items():
@@ -965,4 +966,4 @@ def get_binary_protocol_version(config) -> typing.Optional[int]:
     for i in range(n):
         if tokens[i] == "protocol" and i + 1 < n:
             return int(tokens[i + 1])
-    return None
\ No newline at end of file
+    return None
diff --git a/pytest/tests/sanity/block_sync_flat_storage.py b/pytest/tests/sanity/block_sync_flat_storage.py
new file mode 100755
index 00000000000..9c23cd86f7c
--- /dev/null
+++ b/pytest/tests/sanity/block_sync_flat_storage.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+# Spins up one validating node.
+# Spins up a non-validating node that tracks all shards.
+# In the middle of an epoch, the node gets stopped, and the set of tracked shards gets reduced.
+# Test that the node correctly handles chunks for the shards that it will care about in the next epoch.
+# Spam transactions that require the node to use flat storage to process them correctly.
+
+import pathlib
+import random
+import sys
+
+sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))
+
+from cluster import init_cluster, spin_up_node, load_config, apply_config_changes
+import account
+import transaction
+import utils
+
+EPOCH_LENGTH = 30
+
+config0 = {
+    'tracked_shards': [0],
+}
+config1 = {
+    'tracked_shards': [0],
+}
+
+config = load_config()
+near_root, node_dirs = init_cluster(1, 1, 4, config,
+                                    [["epoch_length", EPOCH_LENGTH]], {
+                                        0: config0,
+                                        1: config1
+                                    })
+
+boot_node = spin_up_node(config, near_root, node_dirs[0], 0)
+node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node)
+
+contract_key = boot_node.signer_key
+contract = utils.load_test_contract()
+latest_block_hash = boot_node.get_latest_block().hash_bytes
+deploy_contract_tx = transaction.sign_deploy_contract_tx(
+    contract_key, contract, 10, latest_block_hash)
+result = boot_node.send_tx_and_wait(deploy_contract_tx, 10)
+assert 'result' in result and 'error' not in result, (
+    'Expected "result" and no "error" in response, got: {}'.format(result))
+
+
+def random_workload_until(target, nonce, keys):
+    while True:
+        nonce += 1
+        height = boot_node.get_latest_block().height
+        if height > target:
+            break
+        if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000:
+            key = keys[random.randint(0, len(keys) - 1)]
+            call_function(boot_node, 'read', key, nonce)
+        else:
+            key = random_u64()
+            keys.append(key)
+            call_function(boot_node, 'write', key, nonce)
+    return (nonce, keys)
+
+
+def random_u64():
+    return bytes(random.randint(0, 255) for _ in range(8))
+
+
+def call_function(node, op, key, nonce):
+    last_block_hash = node.get_latest_block().hash_bytes
+    if op == 'read':
+        args = key
+        fn = 'read_value'
+    else:
+        args = key + random_u64()
+        fn = 'write_key_value'
+
+    tx = transaction.sign_function_call_tx(node.signer_key,
+                                           node.signer_key.account_id, fn, args,
+                                           300 * account.TGAS, 0, nonce,
+                                           last_block_hash)
+    return node.send_tx(tx).get('result')
+
+
+nonce, keys = random_workload_until(EPOCH_LENGTH + 5, 1, [])
+
+node1.kill()
+# Reduce the set of tracked shards and make it variable in time.
+# The node is stopped in epoch_height = 1.
+# Change the config of tracked shards such that after restart the node cares
+# only about shard 0, and in the next epoch it will care about shards [1, 2, 3].
+apply_config_changes(node_dirs[1], {
+    "tracked_shards": [],
+    "tracked_shard_schedule": [[0], [0], [1, 2, 3]]
+})
+
+# Run node0 more to trigger block sync in node1.
+nonce, keys = random_workload_until(EPOCH_LENGTH * 2 + 1, nonce, keys)
+
+# Node1 is now behind and needs to do header sync and block sync.
+node1.start(boot_node=boot_node)
+utils.wait_for_blocks(node1, target=EPOCH_LENGTH * 2 + 10)
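
Note (not part of the patch): the sketch below illustrates how a `tracked_shard_schedule` like the one used in the test could map epoch heights to tracked shard sets. It assumes the node rotates through the schedule by picking the entry at `epoch_height % len(schedule)`; that indexing rule is an assumption for illustration and is not shown in this diff, so verify it against the shard tracker implementation.

# Illustrative sketch only; assumes rotation by epoch_height % len(schedule).
TRACKED_SHARD_SCHEDULE = [[0], [0], [1, 2, 3]]  # same schedule as in the test


def tracked_shards_for_epoch(epoch_height):
    """Return the shard ids assumed to be tracked at this epoch height."""
    return TRACKED_SHARD_SCHEDULE[epoch_height % len(TRACKED_SHARD_SCHEDULE)]


if __name__ == '__main__':
    # Under this assumption: epoch heights 0 and 1 track only shard 0,
    # epoch height 2 tracks shards 1, 2, 3, then the schedule wraps around.
    for epoch_height in range(6):
        print(epoch_height, tracked_shards_for_epoch(epoch_height))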