Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Snapshot creation and restoration (#1679)
Browse files Browse the repository at this point in the history
* to_rlp takes self by-reference

* clean up some derefs

* out-of-order insertion for blockchain

* implement block rebuilder without verification

* group block chunk header into struct

* block rebuilder does verification

* integrate snapshot service with client service; flesh out implementation more

* initial implementation of snapshot service

* remove snapshottaker trait

* snapshot writer trait with packed and loose implementations

* write chunks using "snapshotwriter" in service

* have snapshot taking use snapshotwriter

* implement snapshot readers

* back up client dbs when replacing

* use snapshot reader in snapshot service

* describe offset format

* use new get_db_path in parity, allow some errors in service

* blockchain formatting

* implement parity snapshot

* implement snapshot restore

* force blocks to be submitted in order

* fix bug loading block hashes in packed reader

* fix seal field loading

* fix uncle hash computation

* fix a few bugs

* store genesis state in db. reverse block chunk order in packed writer

* allow out-of-order import for blocks

* bring restoration types together

* only snapshot the last 30000 blocks

* restore into overlaydb instead of journaldb

* commit version to database

* use memorydbs and commit directly

* fix trie test compilation

* fix failing tests

* sha3_null_rlp, not H256::zero

* move overlaydb to ref_overlaydb, add new overlaydb without on-disk rc

* port archivedb to new overlaydb

* add deletion mode tests for overlaydb

* use new overlaydb, check state root at end

* share chain info between state and block snapshotting

* create blocks snapshot using blockchain directly

* allow snapshot from arbitrary block, remove panickers from snapshot creation

* begin test framework

* blockchain chunking test

* implement stateproducer::tick

* state snapshot test

* create block and state chunks concurrently, better restoration informant

* fix tests

* add deletion mode tests for overlaydb

* address comments

* more tests

* Fix up tests.

* remove a few printlns

* add a little more documentation to `commit`

* fix tests

* fix ref_overlaydb test names

* snapshot command skeleton

* revert ref_overlaydb renaming

* reimplement snapshot commands

* fix many errors

* everything but inject

* get ethcore compiling

* get snapshot tests passing again

* instrument snapshot commands again

* fix fallout from other changes, mark snapshots as experimental

* optimize injection patterns

* do two injections

* fix up tests

* take snapshots from 1000 blocks efore

* address minor comments

* fix a few io crate related errors

* clarify names about total difficulty

[ci skip]
  • Loading branch information
rphmeier authored and gavofyork committed Aug 5, 2016
1 parent 725d320 commit 76a7246
Show file tree
Hide file tree
Showing 42 changed files with 1,909 additions and 240 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ ethjson = { path = "../json" }
ethcore-ipc = { path = "../ipc/rpc" }
ethstore = { path = "../ethstore" }
ethcore-ipc-nano = { path = "../ipc/nano" }
rand = "0.3"

[dependencies.hyper]
git = "https://github.com/ethcore/hyper"
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Block {
UntrustedRlp::new(b).as_val::<Block>().is_ok()
}

/// Get the RLP-encoding of the block without the seal.
/// Get the RLP-encoding of the block with or without the seal.
pub fn rlp_bytes(&self, seal: Seal) -> Bytes {
let mut block_rlp = RlpStream::new_list(3);
self.header.stream_rlp(&mut block_rlp, seal);
Expand Down
18 changes: 9 additions & 9 deletions ethcore/src/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl BlockQueueInfo {
/// Sorts them ready for blockchain insertion.
pub struct BlockQueue {
panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>,
engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>,
verification: Arc<Verification>,
verifiers: Vec<JoinHandle<()>>,
Expand Down Expand Up @@ -140,7 +140,7 @@ struct Verification {

impl BlockQueue {
/// Creates a new queue instance.
pub fn new(config: BlockQueueConfig, engine: Arc<Box<Engine>>, message_channel: IoChannel<ClientIoMessage>) -> BlockQueue {
pub fn new(config: BlockQueueConfig, engine: Arc<Engine>, message_channel: IoChannel<ClientIoMessage>) -> BlockQueue {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
Expand Down Expand Up @@ -196,7 +196,7 @@ impl BlockQueue {
}
}

fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
fn verify(verification: Arc<Verification>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
while !deleting.load(AtomicOrdering::Acquire) {
{
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
Expand Down Expand Up @@ -226,7 +226,7 @@ impl BlockQueue {
};

let block_hash = block.header.hash();
match verify_block_unordered(block.header, block.bytes, &**engine) {
match verify_block_unordered(block.header, block.bytes, &*engine) {
Ok(verified) => {
let mut verifying = verification.verifying.lock();
for e in verifying.iter_mut() {
Expand Down Expand Up @@ -319,7 +319,7 @@ impl BlockQueue {
}
}

match verify_block_basic(&header, &bytes, &**self.engine) {
match verify_block_basic(&header, &bytes, &*self.engine) {
Ok(()) => {
self.processing.write().insert(h.clone());
self.verification.unverified.lock().push_back(UnverifiedBlock { header: header, bytes: bytes });
Expand All @@ -340,7 +340,7 @@ impl BlockQueue {
return;
}
let mut verified_lock = self.verification.verified.lock();
let mut verified = verified_lock.deref_mut();
let mut verified = &mut *verified_lock;
let mut bad = self.verification.bad.lock();
let mut processing = self.processing.write();
bad.reserve(block_hashes.len());
Expand Down Expand Up @@ -460,15 +460,15 @@ mod tests {
fn get_test_queue() -> BlockQueue {
let spec = get_test_spec();
let engine = spec.engine;
BlockQueue::new(BlockQueueConfig::default(), Arc::new(engine), IoChannel::disconnected())
BlockQueue::new(BlockQueueConfig::default(), engine, IoChannel::disconnected())
}

#[test]
fn can_be_created() {
// TODO better test
let spec = Spec::new_test();
let engine = spec.engine;
let _ = BlockQueue::new(BlockQueueConfig::default(), Arc::new(engine), IoChannel::disconnected());
let _ = BlockQueue::new(BlockQueueConfig::default(), engine, IoChannel::disconnected());
}

#[test]
Expand Down Expand Up @@ -531,7 +531,7 @@ mod tests {
let engine = spec.engine;
let mut config = BlockQueueConfig::default();
config.max_mem_use = super::MIN_MEM_LIMIT; // empty queue uses about 15000
let queue = BlockQueue::new(config, Arc::new(engine), IoChannel::disconnected());
let queue = BlockQueue::new(config, engine, IoChannel::disconnected());
assert!(!queue.queue_info().is_full());
let mut blocks = get_good_dummy_block_seq(50);
for b in blocks.drain(..) {
Expand Down
120 changes: 115 additions & 5 deletions ethcore/src/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,116 @@ impl BlockChain {
}
}

/// Inserts a verified, known block from the canonical chain.
///
/// Can be performed out-of-order, but care must be taken that the final chain is in a correct state.
/// This is used by snapshot restoration.
///
/// Supply a dummy parent total difficulty when the parent block may not be in the chain.
/// Returns true if the block is disconnected.
pub fn insert_snapshot_block(&self, bytes: &[u8], receipts: Vec<Receipt>, parent_td: Option<U256>, is_best: bool) -> bool {
let block = BlockView::new(bytes);
let header = block.header_view();
let hash = header.sha3();

if self.is_known(&hash) {
return false;
}

assert!(self.pending_best_block.read().is_none());

let batch = self.db.transaction();

let block_rlp = UntrustedRlp::new(bytes);
let compressed_header = block_rlp.at(0).unwrap().compress(RlpType::Blocks);
let compressed_body = UntrustedRlp::new(&Self::block_to_body(bytes)).compress(RlpType::Blocks);

// store block in db
batch.put(DB_COL_HEADERS, &hash, &compressed_header).unwrap();
batch.put(DB_COL_BODIES, &hash, &compressed_body).unwrap();

let maybe_parent = self.block_details(&header.parent_hash());

if let Some(parent_details) = maybe_parent {
// parent known to be in chain.
let info = BlockInfo {
hash: hash,
number: header.number(),
total_difficulty: parent_details.total_difficulty + header.difficulty(),
location: BlockLocation::CanonChain,
};

self.prepare_update(&batch, ExtrasUpdate {
block_hashes: self.prepare_block_hashes_update(bytes, &info),
block_details: self.prepare_block_details_update(bytes, &info),
block_receipts: self.prepare_block_receipts_update(receipts, &info),
transactions_addresses: self.prepare_transaction_addresses_update(bytes, &info),
blocks_blooms: self.prepare_block_blooms_update(bytes, &info),
info: info,
block: bytes
}, is_best);
self.db.write(batch).unwrap();

false
} else {
// parent not in the chain yet. we need the parent difficulty to proceed.
let d = parent_td
.expect("parent total difficulty always supplied for first block in chunk. only first block can have missing parent; qed");

let info = BlockInfo {
hash: hash,
number: header.number(),
total_difficulty: d + header.difficulty(),
location: BlockLocation::CanonChain,
};

let block_details = BlockDetails {
number: header.number(),
total_difficulty: info.total_difficulty,
parent: header.parent_hash(),
children: Vec::new(),
};

let mut update = HashMap::new();
update.insert(hash, block_details);

self.prepare_update(&batch, ExtrasUpdate {
block_hashes: self.prepare_block_hashes_update(bytes, &info),
block_details: update,
block_receipts: self.prepare_block_receipts_update(receipts, &info),
transactions_addresses: self.prepare_transaction_addresses_update(bytes, &info),
blocks_blooms: self.prepare_block_blooms_update(bytes, &info),
info: info,
block: bytes,
}, is_best);
self.db.write(batch).unwrap();

true
}
}

/// Add a child to a given block. Assumes that the block hash is in
/// the chain and the child's parent is this block.
///
/// Used in snapshots to glue the chunks together at the end.
pub fn add_child(&self, block_hash: H256, child_hash: H256) {
let mut parent_details = self.block_details(&block_hash)
.unwrap_or_else(|| panic!("Invalid block hash: {:?}", block_hash));

let batch = self.db.transaction();
parent_details.children.push(child_hash);

let mut update = HashMap::new();
update.insert(block_hash, parent_details);

self.note_used(CacheID::BlockDetails(block_hash));

let mut write_details = self.block_details.write();
batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite);

self.db.write(batch).unwrap();
}

#[cfg_attr(feature="dev", allow(similar_names))]
/// Inserts the block into backing cache database.
/// Expects the block to be valid and already verified.
Expand Down Expand Up @@ -572,7 +682,7 @@ impl BlockChain {
blocks_blooms: self.prepare_block_blooms_update(bytes, &info),
info: info.clone(),
block: bytes,
});
}, true);

ImportRoute::from(info)
}
Expand Down Expand Up @@ -618,7 +728,7 @@ impl BlockChain {
}

/// Prepares extras update.
fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate) {
fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate, is_best: bool) {
{
for hash in update.block_details.keys().cloned() {
self.note_used(CacheID::BlockDetails(hash));
Expand All @@ -645,17 +755,16 @@ impl BlockChain {
// update best block
match update.info.location {
BlockLocation::Branch => (),
_ => {
_ => if is_best {
batch.put(DB_COL_EXTRA, b"best", &update.info.hash).unwrap();
*best_block = Some(BestBlock {
hash: update.info.hash,
number: update.info.number,
total_difficulty: update.info.total_difficulty,
block: update.block.to_vec(),
});
}
},
}

let mut write_hashes = self.pending_block_hashes.write();
let mut write_txs = self.pending_transaction_addresses.write();

Expand Down Expand Up @@ -745,6 +854,7 @@ impl BlockChain {
}

/// This function returns modified block details.
/// Uses the given parent details or attempts to load them from the database.
fn prepare_block_details_update(&self, block_bytes: &[u8], info: &BlockInfo) -> HashMap<H256, BlockDetails> {
let block = BlockView::new(block_bytes);
let header = block.header_view();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/blockchain/extras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum ExtrasIndex {
fn with_index(hash: &H256, i: ExtrasIndex) -> H264 {
let mut result = H264::default();
result[0] = i as u8;
result.deref_mut()[1..].clone_from_slice(hash);
(*result)[1..].clone_from_slice(hash);
result
}

Expand Down
2 changes: 2 additions & 0 deletions ethcore/src/blockchain/generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Blockchain generator for tests.

mod bloom;
mod block;
mod complete;
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod import_route;
mod update;

#[cfg(test)]
mod generator;
pub mod generator;

pub use self::blockchain::{BlockProvider, BlockChain};
pub use self::cache::CacheSize;
Expand Down
Loading

0 comments on commit 76a7246

Please sign in to comment.