From abc01043df22566dd8d8919cdc0d7ee7bdeb6316 Mon Sep 17 00:00:00 2001 From: wacban Date: Fri, 18 Aug 2023 15:26:50 +0200 Subject: [PATCH] refactor: refactoring code around incoming receipts handling (#9452) Refactoring some methods handling the incoming receipts. It's a warm up before handling that during resharding :). --- chain/chain/src/chain.rs | 36 +++---- chain/chain/src/store.rs | 7 ++ chain/client/src/client.rs | 214 +++++++++++++++++++------------------ 3 files changed, 134 insertions(+), 123 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index f25d5e9a170..45a1385099a 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -1711,17 +1711,18 @@ impl Chain { let mut receipt_proofs_by_shard_id = HashMap::new(); for chunk_header in block.chunks().iter() { - if chunk_header.height_included() == height { - let partial_encoded_chunk = - self.store.get_partial_chunk(&chunk_header.chunk_hash()).unwrap(); - for receipt in partial_encoded_chunk.receipts().iter() { - let ReceiptProof(_, shard_proof) = receipt; - let ShardProof { from_shard_id: _, to_shard_id, proof: _ } = shard_proof; - receipt_proofs_by_shard_id - .entry(*to_shard_id) - .or_insert_with(Vec::new) - .push(receipt.clone()); - } + if chunk_header.height_included() != height { + continue; + } + let partial_encoded_chunk = + self.store.get_partial_chunk(&chunk_header.chunk_hash()).unwrap(); + for receipt in partial_encoded_chunk.receipts().iter() { + let ReceiptProof(_, shard_proof) = receipt; + let ShardProof { to_shard_id, .. } = shard_proof; + receipt_proofs_by_shard_id + .entry(*to_shard_id) + .or_insert_with(Vec::new) + .push(receipt.clone()); } } // sort the receipts deterministically so the order that they will be processed is deterministic @@ -3981,13 +3982,12 @@ impl Chain { // we can't use hash from the current block here yet because the incoming receipts // for this block is not stored yet let mut receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap()); - receipts.extend(collect_receipts_from_response( - &self.store().get_incoming_receipts_for_shard( - shard_id, - *prev_hash, - prev_chunk_height_included, - )?, - )); + let receipt_proof_response = &self.store().get_incoming_receipts_for_shard( + shard_id, + *prev_hash, + prev_chunk_height_included, + )?; + receipts.extend(collect_receipts_from_response(receipt_proof_response)); let chunk = self.get_chunk_clone_from_header(&chunk_header.clone())?; let transactions = chunk.transactions(); diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 129a6d73e86..123e4a4a78a 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -203,11 +203,13 @@ pub trait ChainStoreAccess { hash: &CryptoHash, shard_id: ShardId, ) -> Result>, Error>; + fn get_incoming_receipts( &self, hash: &CryptoHash, shard_id: ShardId, ) -> Result>, Error>; + /// Collect incoming receipts for shard `shard_id` from /// the block at height `last_chunk_height_included` (non-inclusive) to the block `block_hash` (inclusive) /// This is because the chunks for the shard are empty for the blocks in between, @@ -239,6 +241,11 @@ pub trait ChainStoreAccess { ret.push(ReceiptProofResponse(block_hash, Arc::new(vec![]))); } + // TODO(resharding) + // when crossing the epoch boundary we should check if the shard + // layout is different and handle that + // one idea would be to do shard_id := parent(shard_id) but remember to + // deduplicate the receipts as well block_hash = prev_hash; } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 5545ca92543..c8a3dd6f5c1 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1506,124 +1506,128 @@ impl Client { } if let Some(validator_signer) = self.validator_signer.clone() { - // Reconcile the txpool against the new block *after* we have broadcast it too our peers. - // This may be slow and we do not want to delay block propagation. let validator_id = validator_signer.validator_id().clone(); - match status { - BlockStatus::Next => { - // If this block immediately follows the current tip, remove transactions - // from the txpool - self.remove_transactions_for_block(validator_id.clone(), &block); - } - BlockStatus::Fork => { - // If it's a fork, no need to reconcile transactions or produce chunks - return; - } - BlockStatus::Reorg(prev_head) => { - // If a reorg happened, reintroduce transactions from the previous chain and - // remove transactions from the new chain - let mut reintroduce_head = self.chain.get_block_header(&prev_head).unwrap(); - let mut remove_head = block.header().clone(); - assert_ne!(remove_head.hash(), reintroduce_head.hash()); - - let mut to_remove = vec![]; - let mut to_reintroduce = vec![]; - - while remove_head.hash() != reintroduce_head.hash() { - while remove_head.height() > reintroduce_head.height() { - to_remove.push(*remove_head.hash()); - remove_head = self - .chain - .get_block_header(remove_head.prev_hash()) - .unwrap() - .clone(); - } - while reintroduce_head.height() > remove_head.height() - || reintroduce_head.height() == remove_head.height() - && reintroduce_head.hash() != remove_head.hash() - { - to_reintroduce.push(*reintroduce_head.hash()); - reintroduce_head = self - .chain - .get_block_header(reintroduce_head.prev_hash()) - .unwrap() - .clone(); - } - } - - for to_reintroduce_hash in to_reintroduce { - if let Ok(block) = self.chain.get_block(&to_reintroduce_hash) { - let block = block.clone(); - self.reintroduce_transactions_for_block(validator_id.clone(), &block); - } - } - for to_remove_hash in to_remove { - if let Ok(block) = self.chain.get_block(&to_remove_hash) { - let block = block.clone(); - self.remove_transactions_for_block(validator_id.clone(), &block); - } - } - } - }; + if !self.reconcile_transaction_pool(validator_id.clone(), status, &block) { + return; + } if provenance != Provenance::SYNC && !self.sync_status.is_syncing() && !skip_produce_chunk { - // Produce new chunks - let epoch_id = - self.epoch_manager.get_epoch_id_from_prev_block(block.header().hash()).unwrap(); - for shard_id in 0..self.epoch_manager.num_shards(&epoch_id).unwrap() { - let chunk_proposer = self - .epoch_manager - .get_chunk_producer(&epoch_id, block.header().height() + 1, shard_id) - .unwrap(); - - if &chunk_proposer == &validator_id { - let _span = tracing::debug_span!( - target: "client", - "on_block_accepted", - prev_block_hash = ?*block.hash(), - ?shard_id) - .entered(); - let _timer = metrics::PRODUCE_AND_DISTRIBUTE_CHUNK_TIME - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - match self.produce_chunk( - *block.hash(), - &epoch_id, - Chain::get_prev_chunk_header( - self.epoch_manager.as_ref(), - &block, - shard_id, - ) - .unwrap(), - block.header().height() + 1, - shard_id, - ) { - Ok(Some((encoded_chunk, merkle_paths, receipts))) => { - self.persist_and_distribute_encoded_chunk( - encoded_chunk, - merkle_paths, - receipts, - validator_id.clone(), - ) - .expect("Failed to process produced chunk"); - } - Ok(None) => {} - Err(err) => { - error!(target: "client", "Error producing chunk {:?}", err); - } - } - } - } + self.produce_chunks(&block, validator_id); } } + self.shards_manager_adapter .send(ShardsManagerRequestFromClient::CheckIncompleteChunks(*block.hash())); } + /// Reconcile the transaction pool after processing a block. + /// returns true if it's ok to proceed to produce chunks + /// returns false when handling a fork and there is no need to produce chunks + fn reconcile_transaction_pool( + &mut self, + validator_id: AccountId, + status: BlockStatus, + block: &Block, + ) -> bool { + match status { + BlockStatus::Next => { + // If this block immediately follows the current tip, remove + // transactions from the txpool. + self.remove_transactions_for_block(validator_id, block); + } + BlockStatus::Fork => { + // If it's a fork, no need to reconcile transactions or produce chunks. + return false; + } + BlockStatus::Reorg(prev_head) => { + // If a reorg happened, reintroduce transactions from the + // previous chain and remove transactions from the new chain. + let mut reintroduce_head = self.chain.get_block_header(&prev_head).unwrap(); + let mut remove_head = block.header().clone(); + assert_ne!(remove_head.hash(), reintroduce_head.hash()); + + let mut to_remove = vec![]; + let mut to_reintroduce = vec![]; + + while remove_head.hash() != reintroduce_head.hash() { + while remove_head.height() > reintroduce_head.height() { + to_remove.push(*remove_head.hash()); + remove_head = + self.chain.get_block_header(remove_head.prev_hash()).unwrap().clone(); + } + while reintroduce_head.height() > remove_head.height() + || reintroduce_head.height() == remove_head.height() + && reintroduce_head.hash() != remove_head.hash() + { + to_reintroduce.push(*reintroduce_head.hash()); + reintroduce_head = self + .chain + .get_block_header(reintroduce_head.prev_hash()) + .unwrap() + .clone(); + } + } + + for to_reintroduce_hash in to_reintroduce { + if let Ok(block) = self.chain.get_block(&to_reintroduce_hash) { + let block = block.clone(); + self.reintroduce_transactions_for_block(validator_id.clone(), &block); + } + } + + for to_remove_hash in to_remove { + if let Ok(block) = self.chain.get_block(&to_remove_hash) { + let block = block.clone(); + self.remove_transactions_for_block(validator_id.clone(), &block); + } + } + } + }; + true + } + + // Produce new chunks + fn produce_chunks(&mut self, block: &Block, validator_id: AccountId) { + let epoch_id = + self.epoch_manager.get_epoch_id_from_prev_block(block.header().hash()).unwrap(); + for shard_id in 0..self.epoch_manager.num_shards(&epoch_id).unwrap() { + let next_height = block.header().height() + 1; + let epoch_manager = self.epoch_manager.as_ref(); + let chunk_proposer = + epoch_manager.get_chunk_producer(&epoch_id, next_height, shard_id).unwrap(); + if &chunk_proposer != &validator_id { + continue; + } + + let _span = tracing::debug_span!( + target: "client", "on_block_accepted", prev_block_hash = ?*block.hash(), ?shard_id) + .entered(); + let _timer = metrics::PRODUCE_AND_DISTRIBUTE_CHUNK_TIME + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + let last_header = Chain::get_prev_chunk_header(epoch_manager, block, shard_id).unwrap(); + match self.produce_chunk(*block.hash(), &epoch_id, last_header, next_height, shard_id) { + Ok(Some((encoded_chunk, merkle_paths, receipts))) => { + self.persist_and_distribute_encoded_chunk( + encoded_chunk, + merkle_paths, + receipts, + validator_id.clone(), + ) + .expect("Failed to process produced chunk"); + } + Ok(None) => {} + Err(err) => { + error!(target: "client", "Error producing chunk {:?}", err); + } + } + } + } + pub fn persist_and_distribute_encoded_chunk( &mut self, encoded_chunk: EncodedShardChunk,