Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

JournalDB inject #1806

Merged
merged 4 commits into from
Aug 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions util/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ use hash::H256;
pub enum BaseDataError {
/// An entry was removed more times than inserted.
NegativelyReferencedHash(H256),
/// A committed value was inserted more than once.
AlreadyExists(H256),
}

impl fmt::Display for BaseDataError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BaseDataError::NegativelyReferencedHash(hash) =>
f.write_fmt(format_args!("Entry {} removed from database more times \
than it was added.", hash)),
write!(f, "Entry {} removed from database more times than it was added.", hash),
BaseDataError::AlreadyExists(hash) =>
write!(f, "Committed key already exists in database: {}", hash),
}
}
}
Expand Down
47 changes: 47 additions & 0 deletions util/src/journaldb/archivedb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,38 @@ impl JournalDB for ArchiveDB {
Ok((inserts + deletes) as u32)
}

fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
let mut inserts = 0usize;
let mut deletes = 0usize;

for i in self.overlay.drain().into_iter() {
let (key, (value, rc)) = i;
if rc > 0 {
assert!(rc == 1);
if try!(self.backing.get(self.column, &key)).is_some() {
return Err(BaseDataError::AlreadyExists(key).into());
}
try!(batch.put(self.column, &key, &value));
inserts += 1;
}
if rc < 0 {
assert!(rc == -1);
if try!(self.backing.get(self.column, &key)).is_none() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these checks probably will slow down inject quite a bit and it's not as though these are going to cause rocksdb errors. are they even worthwhile?

seems like we are using the DB to point out logic errors in the rest of our code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup - this way it fails fast rather than allows an inconsistent situation to persevere.

however, we might consider a build flag (perhaps called "final"?) which allows these checks to be omitted.

return Err(BaseDataError::NegativelyReferencedHash(key).into());
}
try!(batch.delete(self.column, &key));
deletes += 1;
}
}

for (mut key, value) in self.overlay.drain_aux().into_iter() {
key.push(AUX_FLAG);
try!(batch.put(self.column, &key, &value));
}

Ok((inserts + deletes) as u32)
}

fn latest_era(&self) -> Option<u64> { self.latest_era }

fn state(&self, id: &H256) -> Option<Bytes> {
Expand Down Expand Up @@ -449,4 +481,19 @@ mod tests {
assert!(state.is_some());
}
}

#[test]
fn inject() {
let temp = ::devtools::RandomTempPath::new();

let mut jdb = new_db(temp.as_path().as_path());
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();

assert_eq!(jdb.get(&key).unwrap(), b"dog");
jdb.remove(&key);
jdb.inject_batch().unwrap();

assert!(jdb.get(&key).is_none());
}
}
41 changes: 41 additions & 0 deletions util/src/journaldb/earlymergedb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,32 @@ impl JournalDB for EarlyMergeDB {

Ok(0)
}

fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
let mut ops = 0;
for (key, (value, rc)) in self.overlay.drain() {
if rc != 0 { ops += 1 }

match rc {
0 => {}
1 => {
if try!(self.backing.get(self.column, &key)).is_some() {
return Err(BaseDataError::AlreadyExists(key).into());
}
try!(batch.put(self.column, &key, &value))
}
-1 => {
if try!(self.backing.get(self.column, &key)).is_none() {
return Err(BaseDataError::NegativelyReferencedHash(key).into());
}
try!(batch.delete(self.column, &key))
}
_ => panic!("Attempted to inject invalid state."),
}
}

Ok(ops)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1045,4 +1071,19 @@ mod tests {
assert!(!jdb.contains(&bar));
}
}

#[test]
fn inject() {
let temp = ::devtools::RandomTempPath::new();

let mut jdb = new_db(temp.as_path().as_path());
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();

assert_eq!(jdb.get(&key).unwrap(), b"dog");
jdb.remove(&key);
jdb.inject_batch().unwrap();

assert!(jdb.get(&key).is_none());
}
}
71 changes: 56 additions & 15 deletions util/src/journaldb/overlayrecentdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::JournalDB;
///
/// Commit workflow:
/// 1. Create a new journal record from the transaction overlay.
/// 2. Inseart each node from the transaction overlay into the History overlay increasing reference
/// 2. Insert each node from the transaction overlay into the History overlay increasing reference
/// count if it is already there. Note that the reference counting is managed by `MemoryDB`
/// 3. Clear the transaction overlay.
/// 4. For a canonical journal record that becomes ancient inserts its insertions into the disk DB
Expand Down Expand Up @@ -155,7 +155,7 @@ impl OverlayRecentDB {
for r in insertions.iter() {
let k: H256 = r.val_at(0);
let v: Bytes = r.val_at(1);
overlay.emplace(OverlayRecentDB::to_short_key(&k), v);
overlay.emplace(to_short_key(&k), v);
inserted_keys.push(k);
count += 1;
}
Expand All @@ -176,12 +176,13 @@ impl OverlayRecentDB {
JournalOverlay { backing_overlay: overlay, journal: journal, latest_era: latest_era }
}

#[inline]
fn to_short_key(key: &H256) -> H256 {
let mut k = H256::new();
k[0..DB_PREFIX_LEN].copy_from_slice(&key[0..DB_PREFIX_LEN]);
k
}
}

#[inline]
fn to_short_key(key: &H256) -> H256 {
let mut k = H256::new();
k[0..DB_PREFIX_LEN].copy_from_slice(&key[0..DB_PREFIX_LEN]);
k
}

impl JournalDB for OverlayRecentDB {
Expand All @@ -208,7 +209,7 @@ impl JournalDB for OverlayRecentDB {
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }

fn state(&self, key: &H256) -> Option<Bytes> {
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec());
v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
}

Expand All @@ -229,7 +230,7 @@ impl JournalDB for OverlayRecentDB {
r.begin_list(2);
r.append(&k);
r.append(&v);
journal_overlay.backing_overlay.emplace(OverlayRecentDB::to_short_key(&k), v);
journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
}
r.append(&removed_keys);

Expand All @@ -246,7 +247,7 @@ impl JournalDB for OverlayRecentDB {
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
}

let journal_overlay = journal_overlay.deref_mut();
let journal_overlay = &mut *journal_overlay;
// apply old commits' details
if let Some((end_era, canon_id)) = end {
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
Expand All @@ -265,7 +266,7 @@ impl JournalDB for OverlayRecentDB {
{
if canon_id == journal.id {
for h in &journal.insertions {
if let Some(&(ref d, rc)) = journal_overlay.backing_overlay.raw(&OverlayRecentDB::to_short_key(h)) {
if let Some(&(ref d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if rc > 0 {
canon_insertions.push((h.clone(), d.clone())); //TODO: optimize this to avoid data copy
}
Expand All @@ -283,11 +284,11 @@ impl JournalDB for OverlayRecentDB {
}
// update the overlay
for k in overlay_deletions {
journal_overlay.backing_overlay.remove_and_purge(&OverlayRecentDB::to_short_key(&k));
journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
}
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&OverlayRecentDB::to_short_key(&k)) {
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
try!(batch.delete(self.column, &k));
}
}
Expand All @@ -297,6 +298,31 @@ impl JournalDB for OverlayRecentDB {
Ok(0)
}

fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
let mut ops = 0;
for (key, (value, rc)) in self.transaction_overlay.drain() {
if rc != 0 { ops += 1 }

match rc {
0 => {}
1 => {
if try!(self.backing.get(self.column, &key)).is_some() {
return Err(BaseDataError::AlreadyExists(key).into());
}
try!(batch.put(self.column, &key, &value))
}
-1 => {
if try!(self.backing.get(self.column, &key)).is_none() {
return Err(BaseDataError::NegativelyReferencedHash(key).into());
}
try!(batch.delete(self.column, &key))
}
_ => panic!("Attempted to inject invalid state."),
}
}

Ok(ops)
}
}

impl HashDB for OverlayRecentDB {
Expand All @@ -319,7 +345,7 @@ impl HashDB for OverlayRecentDB {
match k {
Some(&(ref d, rc)) if rc > 0 => Some(d),
_ => {
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec());
match v {
Some(x) => {
Some(&self.transaction_overlay.denote(key, x).0)
Expand Down Expand Up @@ -879,4 +905,19 @@ mod tests {
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
}

#[test]
fn inject() {
let temp = ::devtools::RandomTempPath::new();

let mut jdb = new_db(temp.as_path().as_path());
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();

assert_eq!(jdb.get(&key).unwrap(), b"dog");
jdb.remove(&key);
jdb.inject_batch().unwrap();

assert!(jdb.get(&key).is_none());
}
}
21 changes: 21 additions & 0 deletions util/src/journaldb/refcounteddb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ impl JournalDB for RefCountedDB {
let r = try!(self.forward.commit_to_batch(&batch));
Ok(r)
}

fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
self.inserts.clear();
for remove in self.removes.drain(..) {
self.forward.remove(&remove);
}
self.forward.commit_to_batch(&batch)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -298,4 +306,17 @@ mod tests {
assert!(!jdb.contains(&baz));
assert!(!jdb.contains(&bar));
}

#[test]
fn inject() {
let mut jdb = RefCountedDB::new_temp();
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();

assert_eq!(jdb.get(&key).unwrap(), b"dog");
jdb.remove(&key);
jdb.inject_batch().unwrap();

assert!(jdb.get(&key).is_none());
}
}
19 changes: 18 additions & 1 deletion util/src/journaldb/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ pub trait JournalDB : HashDB + Send + Sync {
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;

/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
///
/// Any keys or values inserted or deleted must be completely independent of those affected
/// by any previous `commit` operations. Essentially, this means that `inject` can be used
/// either to restore a state to a fresh database, or to insert data which may only be journalled
/// from this point onwards.
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError>;

/// State data query
fn state(&self, _id: &H256) -> Option<Bytes>;

Expand All @@ -48,11 +57,19 @@ pub trait JournalDB : HashDB + Send + Sync {
/// Get backing database.
fn backing(&self) -> &Arc<Database>;

#[cfg(test)]
/// Commit all changes in a single batch
#[cfg(test)]
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let batch = self.backing().transaction();
let res = try!(self.commit(&batch, now, id, end));
self.backing().write(batch).map(|_| res).map_err(Into::into)
}

/// Inject all changes in a single batch.
#[cfg(test)]
fn inject_batch(&mut self) -> Result<u32, UtilError> {
let batch = self.backing().transaction();
let res = try!(self.inject(&batch));
self.backing().write(batch).map(|_| res).map_err(Into::into)
}
}