refactor: refactoring code around incoming receipts handling (#9452)
Refactoring some of the methods that handle incoming receipts. It's a warm-up before handling them during resharding :).
wacban authored Aug 18, 2023
1 parent 916a174 commit abc0104
Showing 3 changed files with 134 additions and 123 deletions.
36 changes: 18 additions & 18 deletions chain/chain/src/chain.rs
@@ -1711,17 +1711,18 @@ impl Chain {
let mut receipt_proofs_by_shard_id = HashMap::new();

for chunk_header in block.chunks().iter() {
if chunk_header.height_included() == height {
let partial_encoded_chunk =
self.store.get_partial_chunk(&chunk_header.chunk_hash()).unwrap();
for receipt in partial_encoded_chunk.receipts().iter() {
let ReceiptProof(_, shard_proof) = receipt;
let ShardProof { from_shard_id: _, to_shard_id, proof: _ } = shard_proof;
receipt_proofs_by_shard_id
.entry(*to_shard_id)
.or_insert_with(Vec::new)
.push(receipt.clone());
}
if chunk_header.height_included() != height {
continue;
}
let partial_encoded_chunk =
self.store.get_partial_chunk(&chunk_header.chunk_hash()).unwrap();
for receipt in partial_encoded_chunk.receipts().iter() {
let ReceiptProof(_, shard_proof) = receipt;
let ShardProof { to_shard_id, .. } = shard_proof;
receipt_proofs_by_shard_id
.entry(*to_shard_id)
.or_insert_with(Vec::new)
.push(receipt.clone());
}
}
// sort the receipts deterministically so the order that they will be processed is deterministic
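
For illustration only, not part of this commit: a minimal standalone sketch of the grouping pattern above, using simplified stand-in types rather than the real nearcore definitions. It keeps the early continue guard from the refactored loop and groups each receipt proof under its destination shard; a BTreeMap is used here purely so that iteration order is deterministic, in the spirit of the sorting comment above.

use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins for the real nearcore types.
type ShardId = u64;

#[derive(Clone)]
struct ShardProof {
    from_shard_id: ShardId,
    to_shard_id: ShardId,
}

#[derive(Clone)]
struct ReceiptProof(Vec<u8>, ShardProof);

struct ChunkStub {
    height_included: u64,
    receipts: Vec<ReceiptProof>,
}

// Group receipt proofs by the shard they are destined for, considering only
// chunks included at `height`. BTreeMap iterates its keys in sorted order,
// which keeps downstream processing deterministic.
fn group_receipts_by_target_shard(
    chunks: &[ChunkStub],
    height: u64,
) -> BTreeMap<ShardId, Vec<ReceiptProof>> {
    let mut receipt_proofs_by_shard_id: BTreeMap<ShardId, Vec<ReceiptProof>> = BTreeMap::new();
    for chunk in chunks {
        // Early continue instead of nesting the whole body in an if block.
        if chunk.height_included != height {
            continue;
        }
        for receipt in &chunk.receipts {
            let ReceiptProof(_, shard_proof) = receipt;
            let ShardProof { to_shard_id, .. } = shard_proof;
            receipt_proofs_by_shard_id
                .entry(*to_shard_id)
                .or_insert_with(Vec::new)
                .push(receipt.clone());
        }
    }
    receipt_proofs_by_shard_id
}

The entry(...).or_insert_with(Vec::new) idiom avoids a separate contains_key check, and the early continue keeps the loop body flat, which is what the refactor above is after.
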
@@ -3981,13 +3982,12 @@ impl Chain {
// we can't use the hash from the current block here yet because the incoming receipts
// for this block are not stored yet
let mut receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
receipts.extend(collect_receipts_from_response(
&self.store().get_incoming_receipts_for_shard(
shard_id,
*prev_hash,
prev_chunk_height_included,
)?,
));
let receipt_proof_response = &self.store().get_incoming_receipts_for_shard(
shard_id,
*prev_hash,
prev_chunk_height_included,
)?;
receipts.extend(collect_receipts_from_response(receipt_proof_response));
let chunk = self.get_chunk_clone_from_header(&chunk_header.clone())?;

let transactions = chunk.transactions();
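
For illustration only, not part of this commit: a rough sketch of what the call site above amounts to. Receipts already known for the current block are concatenated with the receipt proof responses the store returns for the range ending at the previous block (the comment above explains why the current block's own hash cannot be used yet). The helper names and types below are hypothetical simplifications; the real collect_receipts and collect_receipts_from_response in nearcore operate on richer types and may differ in detail.

// Hypothetical, simplified stand-ins for the nearcore types.
#[derive(Clone)]
struct Receipt {
    id: u64,
}

struct ReceiptProofStub {
    receipts: Vec<Receipt>,
}

// One entry per block in the walked-back range.
struct ReceiptProofResponseStub(u64, Vec<ReceiptProofStub>);

// Flatten a list of receipt proofs into plain receipts.
fn collect_receipts(proofs: &[ReceiptProofStub]) -> Vec<Receipt> {
    proofs.iter().flat_map(|proof| proof.receipts.iter().cloned()).collect()
}

// Flatten the per-block responses returned by the store walk-back.
fn collect_receipts_from_response(responses: &[ReceiptProofResponseStub]) -> Vec<Receipt> {
    responses.iter().flat_map(|response| collect_receipts(&response.1)).collect()
}

// The shape of the call site: start from the receipts tracked for this block,
// then extend with whatever the store reports for the earlier range.
fn gather_incoming_receipts(
    current_block_proofs: &[ReceiptProofStub],
    store_response: &[ReceiptProofResponseStub],
) -> Vec<Receipt> {
    let mut receipts = collect_receipts(current_block_proofs);
    receipts.extend(collect_receipts_from_response(store_response));
    receipts
}

Naming the store query result (receipt_proof_response) before flattening it, as the diff does, keeps the two receipt sources visually separate at the call site.
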
7 changes: 7 additions & 0 deletions chain/chain/src/store.rs
@@ -203,11 +203,13 @@ pub trait ChainStoreAccess {
hash: &CryptoHash,
shard_id: ShardId,
) -> Result<Arc<Vec<Receipt>>, Error>;

fn get_incoming_receipts(
&self,
hash: &CryptoHash,
shard_id: ShardId,
) -> Result<Arc<Vec<ReceiptProof>>, Error>;

/// Collect incoming receipts for shard `shard_id` from
/// the block at height `last_chunk_height_included` (non-inclusive) to the block `block_hash` (inclusive)
/// This is because the chunks for the shard are empty for the blocks in between,
@@ -239,6 +241,11 @@ pub trait ChainStoreAccess {
ret.push(ReceiptProofResponse(block_hash, Arc::new(vec![])));
}

// TODO(resharding)
// when crossing the epoch boundary we should check if the shard
// layout is different and handle that
// one idea would be to do shard_id := parent(shard_id) but remember to
// deduplicate the receipts as well
block_hash = prev_hash;
}

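
For illustration only, not part of this commit: a sketch of the walk-back described by the doc comment above. Starting from block_hash and following prev hashes, every block down to (but not including) the one at last_chunk_height_included contributes a response, and blocks with no new chunk for the shard contribute an empty one. The store type and lookups here are hypothetical simplifications, not the real ChainStoreAccess API, and the TODO(resharding) comment only marks where the parent-shard mapping and receipt deduplication mentioned above would have to slot in.

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real chain types.
type CryptoHash = u64;
type ShardId = u64;

#[derive(Clone)]
struct ReceiptProof;

struct HeaderStub {
    height: u64,
    prev_hash: CryptoHash,
}

struct StoreStub {
    headers: HashMap<CryptoHash, HeaderStub>,
    // Incoming receipt proofs saved per (block, shard); present only when the
    // block has a new chunk producing receipts for that shard.
    incoming_receipts: HashMap<(CryptoHash, ShardId), Vec<ReceiptProof>>,
}

struct ReceiptProofResponse(CryptoHash, Vec<ReceiptProof>);

// Walk back from `block_hash` (inclusive) to the block at height
// `last_chunk_height_included` (exclusive), one response per visited block.
// Assumes every visited hash is present in the store.
fn get_incoming_receipts_for_shard(
    store: &StoreStub,
    shard_id: ShardId,
    mut block_hash: CryptoHash,
    last_chunk_height_included: u64,
) -> Vec<ReceiptProofResponse> {
    let mut ret = vec![];
    loop {
        let header = &store.headers[&block_hash];
        // The block at last_chunk_height_included itself is not included.
        if header.height <= last_chunk_height_included {
            break;
        }
        let proofs = store
            .incoming_receipts
            .get(&(block_hash, shard_id))
            .cloned()
            // Blocks without a new chunk for this shard contribute an empty
            // response, as in the snippet above.
            .unwrap_or_default();
        ret.push(ReceiptProofResponse(block_hash, proofs));
        // TODO(resharding): when crossing an epoch boundary into a different
        // shard layout, this is roughly where shard_id would be remapped to
        // its parent shard and the collected receipts deduplicated.
        block_hash = header.prev_hash;
    }
    ret
}
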
214 changes: 109 additions & 105 deletions chain/client/src/client.rs
@@ -1506,124 +1506,128 @@ impl Client {
}

if let Some(validator_signer) = self.validator_signer.clone() {
// Reconcile the txpool against the new block *after* we have broadcast it to our peers.
// This may be slow and we do not want to delay block propagation.
let validator_id = validator_signer.validator_id().clone();
match status {
BlockStatus::Next => {
// If this block immediately follows the current tip, remove transactions
// from the txpool
self.remove_transactions_for_block(validator_id.clone(), &block);
}
BlockStatus::Fork => {
// If it's a fork, no need to reconcile transactions or produce chunks
return;
}
BlockStatus::Reorg(prev_head) => {
// If a reorg happened, reintroduce transactions from the previous chain and
// remove transactions from the new chain
let mut reintroduce_head = self.chain.get_block_header(&prev_head).unwrap();
let mut remove_head = block.header().clone();
assert_ne!(remove_head.hash(), reintroduce_head.hash());

let mut to_remove = vec![];
let mut to_reintroduce = vec![];

while remove_head.hash() != reintroduce_head.hash() {
while remove_head.height() > reintroduce_head.height() {
to_remove.push(*remove_head.hash());
remove_head = self
.chain
.get_block_header(remove_head.prev_hash())
.unwrap()
.clone();
}
while reintroduce_head.height() > remove_head.height()
|| reintroduce_head.height() == remove_head.height()
&& reintroduce_head.hash() != remove_head.hash()
{
to_reintroduce.push(*reintroduce_head.hash());
reintroduce_head = self
.chain
.get_block_header(reintroduce_head.prev_hash())
.unwrap()
.clone();
}
}

for to_reintroduce_hash in to_reintroduce {
if let Ok(block) = self.chain.get_block(&to_reintroduce_hash) {
let block = block.clone();
self.reintroduce_transactions_for_block(validator_id.clone(), &block);
}
}

for to_remove_hash in to_remove {
if let Ok(block) = self.chain.get_block(&to_remove_hash) {
let block = block.clone();
self.remove_transactions_for_block(validator_id.clone(), &block);
}
}
}
};
if !self.reconcile_transaction_pool(validator_id.clone(), status, &block) {
return;
}

if provenance != Provenance::SYNC
&& !self.sync_status.is_syncing()
&& !skip_produce_chunk
{
// Produce new chunks
let epoch_id =
self.epoch_manager.get_epoch_id_from_prev_block(block.header().hash()).unwrap();
for shard_id in 0..self.epoch_manager.num_shards(&epoch_id).unwrap() {
let chunk_proposer = self
.epoch_manager
.get_chunk_producer(&epoch_id, block.header().height() + 1, shard_id)
.unwrap();

if &chunk_proposer == &validator_id {
let _span = tracing::debug_span!(
target: "client",
"on_block_accepted",
prev_block_hash = ?*block.hash(),
?shard_id)
.entered();
let _timer = metrics::PRODUCE_AND_DISTRIBUTE_CHUNK_TIME
.with_label_values(&[&shard_id.to_string()])
.start_timer();
match self.produce_chunk(
*block.hash(),
&epoch_id,
Chain::get_prev_chunk_header(
self.epoch_manager.as_ref(),
&block,
shard_id,
)
.unwrap(),
block.header().height() + 1,
shard_id,
) {
Ok(Some((encoded_chunk, merkle_paths, receipts))) => {
self.persist_and_distribute_encoded_chunk(
encoded_chunk,
merkle_paths,
receipts,
validator_id.clone(),
)
.expect("Failed to process produced chunk");
}
Ok(None) => {}
Err(err) => {
error!(target: "client", "Error producing chunk {:?}", err);
}
}
}
}
self.produce_chunks(&block, validator_id);
}
}

self.shards_manager_adapter
.send(ShardsManagerRequestFromClient::CheckIncompleteChunks(*block.hash()));
}

/// Reconcile the transaction pool after processing a block.
/// returns true if it's ok to proceed to produce chunks
/// returns false when handling a fork and there is no need to produce chunks
fn reconcile_transaction_pool(
&mut self,
validator_id: AccountId,
status: BlockStatus,
block: &Block,
) -> bool {
match status {
BlockStatus::Next => {
// If this block immediately follows the current tip, remove
// transactions from the txpool.
self.remove_transactions_for_block(validator_id, block);
}
BlockStatus::Fork => {
// If it's a fork, no need to reconcile transactions or produce chunks.
return false;
}
BlockStatus::Reorg(prev_head) => {
// If a reorg happened, reintroduce transactions from the
// previous chain and remove transactions from the new chain.
let mut reintroduce_head = self.chain.get_block_header(&prev_head).unwrap();
let mut remove_head = block.header().clone();
assert_ne!(remove_head.hash(), reintroduce_head.hash());

let mut to_remove = vec![];
let mut to_reintroduce = vec![];

while remove_head.hash() != reintroduce_head.hash() {
while remove_head.height() > reintroduce_head.height() {
to_remove.push(*remove_head.hash());
remove_head =
self.chain.get_block_header(remove_head.prev_hash()).unwrap().clone();
}
while reintroduce_head.height() > remove_head.height()
|| reintroduce_head.height() == remove_head.height()
&& reintroduce_head.hash() != remove_head.hash()
{
to_reintroduce.push(*reintroduce_head.hash());
reintroduce_head = self
.chain
.get_block_header(reintroduce_head.prev_hash())
.unwrap()
.clone();
}
}

for to_reintroduce_hash in to_reintroduce {
if let Ok(block) = self.chain.get_block(&to_reintroduce_hash) {
let block = block.clone();
self.reintroduce_transactions_for_block(validator_id.clone(), &block);
}
}

for to_remove_hash in to_remove {
if let Ok(block) = self.chain.get_block(&to_remove_hash) {
let block = block.clone();
self.remove_transactions_for_block(validator_id.clone(), &block);
}
}
}
};
true
}

// Produce new chunks
fn produce_chunks(&mut self, block: &Block, validator_id: AccountId) {
let epoch_id =
self.epoch_manager.get_epoch_id_from_prev_block(block.header().hash()).unwrap();
for shard_id in 0..self.epoch_manager.num_shards(&epoch_id).unwrap() {
let next_height = block.header().height() + 1;
let epoch_manager = self.epoch_manager.as_ref();
let chunk_proposer =
epoch_manager.get_chunk_producer(&epoch_id, next_height, shard_id).unwrap();
if &chunk_proposer != &validator_id {
continue;
}

let _span = tracing::debug_span!(
target: "client", "on_block_accepted", prev_block_hash = ?*block.hash(), ?shard_id)
.entered();
let _timer = metrics::PRODUCE_AND_DISTRIBUTE_CHUNK_TIME
.with_label_values(&[&shard_id.to_string()])
.start_timer();
let last_header = Chain::get_prev_chunk_header(epoch_manager, block, shard_id).unwrap();
match self.produce_chunk(*block.hash(), &epoch_id, last_header, next_height, shard_id) {
Ok(Some((encoded_chunk, merkle_paths, receipts))) => {
self.persist_and_distribute_encoded_chunk(
encoded_chunk,
merkle_paths,
receipts,
validator_id.clone(),
)
.expect("Failed to process produced chunk");
}
Ok(None) => {}
Err(err) => {
error!(target: "client", "Error producing chunk {:?}", err);
}
}
}
}

pub fn persist_and_distribute_encoded_chunk(
&mut self,
encoded_chunk: EncodedShardChunk,
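
For illustration only, not part of this commit: the reorg branch of reconcile_transaction_pool walks two cursors back from the new head and the old head until they meet at the fork point, recording which blocks' transactions to remove from the pool and which to reintroduce. A minimal standalone sketch of that walk over a hypothetical header type (hashes and heights only) follows; the real code operates on BlockHeader and the chain store.

use std::collections::HashMap;

// Hypothetical, minimal header: just enough to walk the two chains back.
#[derive(Clone)]
struct HeaderStub {
    hash: u64,
    height: u64,
    prev_hash: u64,
}

// Given the new head and the old head, return (to_remove, to_reintroduce):
// block hashes on the new chain whose transactions should be dropped from the
// pool, and block hashes on the old chain whose transactions should be put
// back. Assumes both heads descend from a common ancestor present in `headers`.
fn split_at_fork_point(
    headers: &HashMap<u64, HeaderStub>,
    new_head: u64,
    old_head: u64,
) -> (Vec<u64>, Vec<u64>) {
    let mut remove_head = headers[&new_head].clone();
    let mut reintroduce_head = headers[&old_head].clone();
    assert_ne!(remove_head.hash, reintroduce_head.hash);

    let mut to_remove = vec![];
    let mut to_reintroduce = vec![];

    while remove_head.hash != reintroduce_head.hash {
        // Whichever cursor is higher (or at equal height but on a different
        // block) steps back one block and records the hash it leaves behind.
        while remove_head.height > reintroduce_head.height {
            to_remove.push(remove_head.hash);
            remove_head = headers[&remove_head.prev_hash].clone();
        }
        while reintroduce_head.height > remove_head.height
            || (reintroduce_head.height == remove_head.height
                && reintroduce_head.hash != remove_head.hash)
        {
            to_reintroduce.push(reintroduce_head.hash);
            reintroduce_head = headers[&reintroduce_head.prev_hash].clone();
        }
    }
    (to_remove, to_reintroduce)
}

In the extracted reconcile_transaction_pool above, the two resulting lists are then fed into remove_transactions_for_block and reintroduce_transactions_for_block respectively; the sketch only captures the fork-point search.
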
