Skip to content

Commit

Permalink
storage/mkvs: Only nil value should mean deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Mar 23, 2020
1 parent b32d41f commit b0e6532
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go/common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
// the runtime.
//
// NOTE: This version must be synced with runtime/src/common/version.rs.
RuntimeProtocol = Version{Major: 0, Minor: 11, Patch: 0}
RuntimeProtocol = Version{Major: 0, Minor: 12, Patch: 0}

// CommitteeProtocol versions the P2P protocol used by the
// committee members.
Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/urkel.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *tree) ApplyWriteLog(ctx context.Context, wl writelog.Iterator) error {
}

// Apply operation.
if len(entry.Value) == 0 {
if entry.Value == nil {
err = t.Remove(ctx, entry.Key)
} else {
err = t.Insert(ctx, entry.Key, entry.Value)
Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/writelog/writelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (

// Type returns the type of the write log entry.
func (k *LogEntry) Type() LogEntryType {
if len(k.Value) == 0 {
if k.Value == nil {
return LogDelete
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/src/common/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ impl From<u64> for Version {
// the worker host.
pub const PROTOCOL_VERSION: Version = Version {
major: 0,
minor: 11,
minor: 12,
patch: 0,
};
17 changes: 8 additions & 9 deletions runtime/src/storage/mkvs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,21 @@ pub struct LogEntry {
pub key: Vec<u8>,
/// The inserted value (empty if the key was deleted).
#[serde(with = "serde_bytes")]
pub value: Vec<u8>,
pub value: Option<Vec<u8>>,
}

impl LogEntry {
pub fn new(key: &[u8], value: &[u8]) -> Self {
Self {
key: key.to_owned(),
value: value.to_owned(),
value: Some(value.to_owned()),
}
}

pub fn kind(&self) -> LogEntryKind {
if self.value.is_empty() {
LogEntryKind::Delete
} else {
LogEntryKind::Insert
match self.value {
Some(_) => LogEntryKind::Insert,
None => LogEntryKind::Delete,
}
}
}
Expand All @@ -56,10 +55,10 @@ impl serde::Serialize for LogEntry {
let mut seq = serializer.serialize_seq(Some(2))?;
if is_human_readable {
seq.serialize_element(&base64::encode(&self.key))?;
seq.serialize_element(&base64::encode(&self.value))?;
seq.serialize_element(&self.value.as_ref().map(|v| base64::encode(v)))?;
} else {
seq.serialize_element(&Bytes::new(&self.key))?;
seq.serialize_element(&Bytes::new(&self.value))?;
seq.serialize_element(&self.value.as_ref().map(|v| Bytes::new(v)))?;
}
seq.end()
}
Expand Down Expand Up @@ -153,7 +152,7 @@ mod tests {
fn test_write_log_serialization() {
let write_log = vec![LogEntry {
key: b"foo".to_vec(),
value: b"bar".to_vec(),
value: Some(b"bar".to_vec()),
}];

let raw = cbor::to_vec(&write_log);
Expand Down
10 changes: 10 additions & 0 deletions runtime/src/storage/mkvs/urkel/interop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ mod rpc;
pub trait Driver {
/// Apply the given write log to the protocol server.
fn apply(&self, write_log: &WriteLog, hash: Hash, namespace: Namespace, round: u64);

/// Apply the given write log against an existing root on the protocol server.
fn apply_existing(
&self,
write_log: &WriteLog,
existing_root: Hash,
root_hash: Hash,
namespace: Namespace,
round: u64,
);
}

pub use self::protocol_server::ProtocolServer;
13 changes: 12 additions & 1 deletion runtime/src/storage/mkvs/urkel/interop/protocol_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,22 @@ impl Drop for ProtocolServer {

impl Driver for ProtocolServer {
fn apply(&self, write_log: &WriteLog, root_hash: Hash, namespace: Namespace, round: u64) {
self.apply_existing(write_log, Hash::empty_hash(), root_hash, namespace, round)
}

fn apply_existing(
&self,
write_log: &WriteLog,
existing_root: Hash,
root_hash: Hash,
namespace: Namespace,
round: u64,
) {
self.client
.apply(&rpc::ApplyRequest {
namespace,
src_round: round,
src_root: Hash::empty_hash(),
src_root: existing_root,
dst_round: round,
dst_root: root_hash,
writelog: write_log.clone(),
Expand Down
8 changes: 6 additions & 2 deletions runtime/src/storage/mkvs/urkel/sync/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ fn test_nil_pointers() {
];

for entry in write_log.iter() {
tree.insert(Context::background(), &entry.key, &entry.value)
.expect("insert");
tree.insert(
Context::background(),
&entry.key,
&entry.value.as_ref().unwrap(),
)
.expect("insert");
}

// Verify at least one null pointer somewhere.
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/storage/mkvs/urkel/tree/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl UrkelTree {
}
log.push(LogEntry {
key: entry.key.clone(),
value: entry.value.clone().unwrap_or_default(),
value: entry.value.clone(),
});
}
self.pending_write_log.clear();
Expand Down
39 changes: 35 additions & 4 deletions runtime/src/storage/mkvs/urkel/tree/tree_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn test_basic() {
log,
[LogEntry {
key: key_zero.to_vec(),
value: value_zero.to_vec(),
value: Some(value_zero.to_vec()),
}]
.to_vec()
);
Expand Down Expand Up @@ -194,11 +194,11 @@ fn test_basic() {
[
LogEntry {
key: key_one.to_vec(),
value: value_one.to_vec(),
value: Some(value_one.to_vec()),
},
LogEntry {
key: key_zero.to_vec(),
value: value_zero.to_vec(),
value: Some(value_zero.to_vec()),
}
]
.to_vec()
Expand Down Expand Up @@ -226,7 +226,7 @@ fn test_basic() {
log,
[LogEntry {
key: key_one.to_vec(),
value: Vec::new(),
value: None,
}]
.to_vec()
);
Expand Down Expand Up @@ -716,6 +716,37 @@ fn test_syncer_insert() {
assert_eq!(0, stats.sync_iterate_count, "sync_iterate count");
}

#[test]
fn test_syncer_writelog_remove() {
let server = ProtocolServer::new();

let mut tree = UrkelTree::make()
.with_capacity(0, 0)
.new(Box::new(NoopReadSyncer {}));

let (keys, values) = generate_key_value_pairs();
for i in 0..keys.len() {
tree.insert(
Context::background(),
keys[i].as_slice(),
values[i].as_slice(),
)
.expect("insert");
}

let (write_log, hash) =
UrkelTree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit");
server.apply(&write_log, hash, Default::default(), 0);

tree.remove(Context::background(), keys[0].as_slice())
.expect("remove");

let previous_hash = hash;
let (write_log, hash) =
UrkelTree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit");
server.apply_existing(&write_log, previous_hash, hash, Default::default(), 0);
}

#[test]
fn test_syncer_prefetch_prefixes() {
let server = ProtocolServer::new();
Expand Down

0 comments on commit b0e6532

Please sign in to comment.