fix(state-sync): Make partial chunks message deterministic (#9427)
The issue only happens if a node tracks a subset of shards.
The order of shards is arbitrary because:

* Shard ids are kept in a HashSet, whose iteration order is unspecified
* In one case, the node first adds the shards that are cached, and only later the shards that are available only on disk (see the sketch below)
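For illustration, a minimal self-contained Rust sketch of the first point — hypothetical shard ids, not nearcore code — showing why anything built by iterating a `HashSet` must be sorted before it can be treated as deterministic:

```rust
use std::collections::HashSet;

fn main() {
    // Shard ids tracked by a node. HashSet iteration order is unspecified and,
    // because the default hasher is randomly seeded, can differ between two
    // processes holding the same set.
    let tracked: HashSet<u64> = [0, 2, 3].into_iter().collect();

    // Collecting straight from the set yields an arbitrary order, so two nodes
    // answering the same request could serialize different byte sequences.
    let mut shard_ids: Vec<u64> = tracked.iter().copied().collect();

    // Sorting produces one canonical order regardless of how the set iterated.
    shard_ids.sort();
    assert_eq!(shard_ids, vec![0, 2, 3]);
}
```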
nikurt committed Aug 30, 2023
1 parent 63f0b64 commit ec3035b
Showing 3 changed files with 12 additions and 34 deletions.
26 changes: 4 additions & 22 deletions chain/chunks/src/lib.rs
@@ -798,22 +798,9 @@ impl ShardsManager {
 request: PartialEncodedChunkRequestMsg,
 ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
 let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request);
-let before = response_msg
-.receipts
-.clone()
-.into_iter()
-.map(|ReceiptProof(_, shard_proof)| shard_proof);
-response_msg.receipts.sort_by_key(
-|ReceiptProof(_receipt, ShardProof { from_shard_id, to_shard_id, proof: _proof })| {
-(*from_shard_id, *to_shard_id)
-},
-);
-let after = response_msg
-.receipts
-.clone()
-.into_iter()
-.map(|ReceiptProof(_, shard_proof)| shard_proof);
-tracing::info!(target: "debug-me", ?before, ?after);
+// Note that the PartialChunks column is a write-once column, and needs
+// the values to be deterministic.
+response_msg.receipts.sort();
 (src, response_msg)
 }

@@ -1502,7 +1489,6 @@ impl ShardsManager {
 &mut self,
 response: PartialEncodedChunkResponseMsg,
 ) -> Result<(), Error> {
-tracing::info!(target: "debug-me", receipts = ?response.receipts, "process_partial_encoded_chunk_response");
 let header = self.get_partial_encoded_chunk_header(&response.chunk_hash)?;
 let partial_chunk = PartialEncodedChunk::new(header, response.parts, response.receipts);
 // We already know the header signature is valid because we read it from the
@@ -1595,7 +1581,6 @@ impl ShardsManager {
 }
 // we can safely unwrap here because we already checked that chunk_hash exist in encoded_chunks
 let entry = self.encoded_chunks.get(&chunk_hash).unwrap();
-tracing::debug!(target: "debug-me", entry_receipts = ?entry.receipts, ?can_reconstruct, ?have_all_parts, "try_process_chunk_parts_and_receipts");

 let cares_about_shard = cares_about_shard_this_or_next_epoch(
 self.me.as_ref(),
@@ -1617,7 +1602,6 @@ impl ShardsManager {
 &self.shard_tracker,
 );

-tracing::info!(target: "debug-me", receipts=?partial_chunk.receipts(), "!cares_about_shard -- complete_chunk0");
 self.complete_chunk(partial_chunk, None);
 return Ok(ProcessPartialEncodedChunkResult::HaveAllPartsAndReceipts);
 }
@@ -1645,10 +1629,8 @@ impl ShardsManager {
 // Don't persist if we don't care about the shard, even if we accidentally got enough
 // parts to reconstruct the full shard.
 if cares_about_shard {
-tracing::info!(target: "debug-me", receipts=?partial_chunk.receipts(), "can_reconstruct complete_chunk1");
 self.complete_chunk(partial_chunk, Some(shard_chunk));
 } else {
-tracing::info!(target: "debug-me", receipts=?partial_chunk.receipts(), "can_reconstruct complete_chunk2");
 self.complete_chunk(partial_chunk, None);
 }
 return Ok(ProcessPartialEncodedChunkResult::HaveAllPartsAndReceipts);
@@ -1666,7 +1648,7 @@ impl ShardsManager {
 self.encoded_chunks.mark_entry_complete(&chunk_hash);
 self.encoded_chunks.remove_from_cache_if_outside_horizon(&chunk_hash);
 self.requested_partial_encoded_chunks.remove(&chunk_hash);
-tracing::debug!(target: "chunks", partial_chunk_receipts = ?partial_chunk.receipts(), backtrace = ?std::backtrace::Backtrace::force_capture(), "Completed chunk {:?}", chunk_hash);
+debug!(target: "chunks", "Completed chunk {:?}", chunk_hash);
 self.client_adapter
 .send(ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk });
 }
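The lib.rs change above replaces the ad-hoc `sort_by_key` on `(from_shard_id, to_shard_id)` with a plain `sort()`, leaning on the receipt-proof type's total order so the response stored in the write-once PartialChunks column is byte-for-byte deterministic. A rough sketch of the idea with illustrative types (not the actual nearcore definitions):

```rust
// Illustrative stand-ins for the real types; the real ReceiptProof carries
// receipts and a merkle proof as well.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ShardProof {
    from_shard_id: u64,
    to_shard_id: u64,
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ReceiptProof(Vec<u8>, ShardProof);

/// Sorting by the type's total order gives every node the same vector,
/// no matter in which order the proofs were collected.
fn canonicalize(mut receipts: Vec<ReceiptProof>) -> Vec<ReceiptProof> {
    receipts.sort();
    receipts
}

fn main() {
    let a = ReceiptProof(vec![1], ShardProof { from_shard_id: 0, to_shard_id: 1 });
    let b = ReceiptProof(vec![2], ShardProof { from_shard_id: 0, to_shard_id: 2 });
    assert_eq!(canonicalize(vec![a.clone(), b.clone()]), canonicalize(vec![b, a]));
}
```

Whether the real `ReceiptProof` derives `Ord` or implements it by hand is not visible in this diff; the only requirement is that the order is total and identical on every node.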
10 changes: 2 additions & 8 deletions chain/chunks/src/logic.rs
@@ -90,7 +90,6 @@ pub fn make_outgoing_receipts_proofs(
 let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
 let shard_proof =
 ShardProof { from_shard_id: shard_id, to_shard_id: proof_shard_id, proof };
-tracing::info!(target: "debug-me", ?shard_proof, ?receipts, "make_outgoing_receipts_proofs");
 ReceiptProof(receipts, shard_proof)
 });
 Ok(it)
@@ -119,19 +118,15 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>(
 })
 .cloned()
 .collect();
-let mut receipts: Vec<ReceiptProof> = receipts
+let mut receipts: Vec<_> = receipts
 .filter(|receipt| {
 cares_about_shard
 || need_receipt(prev_block_hash, receipt.1.to_shard_id, me, shard_tracker)
 })
 .cloned()
 .collect();
 // Make sure the receipts are in a deterministic order.
-receipts.sort_by_key(
-|ReceiptProof(_, ShardProof { from_shard_id, to_shard_id, proof: _proof })| {
-(*from_shard_id, *to_shard_id)
-},
-);
+receipts.sort();
 match header.clone() {
 ShardChunkHeader::V1(header) => {
 PartialEncodedChunk::V1(PartialEncodedChunkV1 { header, parts, receipts })
@@ -190,7 +185,6 @@ fn create_partial_chunk(
 let header = encoded_chunk.cloned_header();
 let receipts =
 make_outgoing_receipts_proofs(&header, &outgoing_receipts, epoch_manager)?.collect();
-tracing::info!(target: "debug-me", ?outgoing_receipts, ?receipts, "create_partial_chunk");
 let partial_chunk = PartialEncodedChunkV2 {
 header,
 parts: encoded_chunk
10 changes: 6 additions & 4 deletions docs/misc/state_sync_from_external_storage.md
@@ -49,9 +49,11 @@ your `config.json` file:
 }
 }
 },
-"state_sync_timeout": {
-"secs": 30,
-"nanos": 0
+"consensus": {
+"state_sync_timeout": {
+"secs": 30,
+"nanos": 0
+}
 }
 ```

@@ -70,7 +72,7 @@ shards that can be downloaded in parallel during state sync.
 across all shards that can be downloaded in parallel during catchup. Generally,
 this number should not be higher than `num_concurrent_requests`. Keep it
 reasonably low to allow the node to process chunks of other shards.
-* `state_sync_timeout` determines the max duration of an attempt to download a
+* `consensus.state_sync_timeout` determines the max duration of an attempt to download a
 state part. Setting it too low may cause too many unsuccessful attempts.

 ### Amazon S3
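A side note on the documentation change above: the `{"secs": 30, "nanos": 0}` shape matches how serde (de)serializes a Rust `std::time::Duration`, so a `consensus.state_sync_timeout` entry like this maps directly onto a `Duration`. A minimal sketch with an illustrative config struct — not nearcore's actual config types — assuming the `serde` (with the `derive` feature) and `serde_json` crates:

```rust
use serde::Deserialize;
use std::time::Duration;

// Illustrative only; the real consensus config has many more fields.
#[derive(Debug, Deserialize)]
struct ConsensusSection {
    state_sync_timeout: Duration,
}

fn main() {
    let json = r#"{ "state_sync_timeout": { "secs": 30, "nanos": 0 } }"#;
    // serde's built-in Duration impl accepts exactly the { "secs", "nanos" } layout.
    let consensus: ConsensusSection = serde_json::from_str(json).unwrap();
    assert_eq!(consensus.state_sync_timeout, Duration::from_secs(30));
}
```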
