-
Notifications
You must be signed in to change notification settings - Fork 11.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fastx db] Update an owner-object index #149
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ use move_core_types::{ | |
}; | ||
use move_vm_runtime::native_functions::NativeFunctionTable; | ||
use std::{ | ||
collections::{BTreeMap, HashSet}, | ||
collections::{BTreeMap, HashMap, HashSet}, | ||
sync::Arc, | ||
}; | ||
|
||
|
@@ -80,21 +80,26 @@ impl AuthorityState { | |
None | ||
}; | ||
|
||
for object_ref in input_objects { | ||
let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect(); | ||
// Get a copy of the object. | ||
// TODO: We only need to read the read_only and owner field of the object, | ||
// it's a bit wasteful to copy the entire object. | ||
let _objects = self.get_objects(&ids[..]).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now the object is used so removed the |
||
for (object_ref, object) in input_objects.into_iter().zip(_objects) { | ||
//for object_ref in input_objects.into_iter() { | ||
let (object_id, sequence_number, _object_digest) = object_ref; | ||
|
||
fp_ensure!( | ||
sequence_number <= SequenceNumber::max(), | ||
FastPayError::InvalidSequenceNumber | ||
); | ||
|
||
// Get a copy of the object. | ||
// TODO: We only need to read the read_only and owner field of the object, | ||
// it's a bit wasteful to copy the entire object. | ||
let object = self | ||
.object_state(&object_id) | ||
.await | ||
.map_err(|_| FastPayError::ObjectNotFound)?; | ||
// let object = self | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we remove comment out code? |
||
// .object_state(&object_id) | ||
// .await | ||
// .map_err(|_| FastPayError::ObjectNotFound)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the commented code still provide value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch, that was just forgotten. |
||
|
||
let object = object.ok_or(FastPayError::ObjectNotFound)?; | ||
|
||
// TODO(https://github.com/MystenLabs/fastnft/issues/123): This hack substitutes the real | ||
// object digest instead of using the one passed in by the client. We need to fix clients and | ||
|
@@ -171,14 +176,21 @@ impl AuthorityState { | |
// Check the certificate and retrieve the transfer data. | ||
certificate.check(&self.committee)?; | ||
|
||
let input_objects = order.input_objects(); | ||
let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect(); | ||
// Get a copy of the object. | ||
// TODO: We only need to read the read_only and owner field of the object, | ||
// it's a bit wasteful to copy the entire object. | ||
let _objects = self.get_objects(&ids[..]).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
|
||
let mut inputs = vec![]; | ||
for (input_object_id, input_seq, _input_digest) in order.input_objects() { | ||
let mut owner_index = HashMap::new(); | ||
for (object_ref, object) in input_objects.into_iter().zip(_objects) { | ||
let (input_object_id, input_seq, _input_digest) = object_ref; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: you may want to extract a private function to cover this and the above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My iterator-foo is not up to defining a function with a generic return signature of an iterator :) |
||
|
||
// If we have a certificate on the confirmation order it means that the input | ||
// object exists on other honest authorities, but we do not have it. | ||
let input_object = self | ||
.object_state(&input_object_id) | ||
.await | ||
.map_err(|_| FastPayError::ObjectNotFound)?; | ||
let input_object = object.ok_or(FastPayError::ObjectNotFound)?; | ||
|
||
let input_sequence_number = input_object.version(); | ||
|
||
|
@@ -193,6 +205,10 @@ impl AuthorityState { | |
return self.make_order_info(&transaction_digest).await; | ||
} | ||
|
||
if !input_object.is_read_only() { | ||
owner_index.insert(input_object_id, input_object.owner); | ||
} | ||
|
||
inputs.push(input_object.clone()); | ||
} | ||
|
||
|
@@ -202,7 +218,7 @@ impl AuthorityState { | |
|
||
// Order-specific logic | ||
let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs); | ||
let _status = match order.kind { | ||
let status = match order.kind { | ||
OrderKind::Transfer(t) => { | ||
debug_assert!( | ||
inputs.len() == 2, | ||
|
@@ -254,17 +270,40 @@ impl AuthorityState { | |
} | ||
}; | ||
|
||
// Make a list of all object that are either deleted or have changed owner, along with their old owner. | ||
// This is used to update the owner index. | ||
let drop_index_entries = temporary_store | ||
.deleted() | ||
.iter() | ||
.map(|(id, _, _)| (owner_index[id], *id)) | ||
.chain( | ||
temporary_store | ||
.written() | ||
.iter() | ||
.filter(|(id, _, _)| { | ||
let owner = owner_index.get(id); | ||
owner.is_some() && *owner.unwrap() != temporary_store.objects()[id].owner | ||
}) | ||
.map(|(id, _, _)| (owner_index[id], *id)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: consider There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice to know this exists. |
||
) | ||
.collect(); | ||
|
||
// Update the database in an atomic manner | ||
let to_signed_effects = temporary_store.to_signed_effects( | ||
&self.name, | ||
&self.secret, | ||
&transaction_digest, | ||
_status, | ||
status, | ||
); | ||
self.update_state(temporary_store, certificate, to_signed_effects) | ||
.await?; | ||
|
||
self.make_order_info(&transaction_digest).await | ||
self.update_state( | ||
temporary_store, | ||
drop_index_entries, | ||
certificate, | ||
to_signed_effects, | ||
) | ||
.await | ||
|
||
// self.make_order_info(&transaction_digest).await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this intended? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, removed and commented instead. |
||
} | ||
|
||
pub async fn handle_account_info_request( | ||
|
@@ -396,11 +435,16 @@ impl AuthorityState { | |
async fn update_state( | ||
&self, | ||
temporary_store: AuthorityTemporaryStore, | ||
expired_object_owners: Vec<(FastPayAddress, ObjectID)>, | ||
certificate: CertifiedOrder, | ||
signed_effects: SignedOrderEffects, | ||
) -> Result<(), FastPayError> { | ||
self._database | ||
.update_state(temporary_store, certificate, signed_effects) | ||
) -> Result<OrderInfoResponse, FastPayError> { | ||
self._database.update_state( | ||
temporary_store, | ||
expired_object_owners, | ||
certificate, | ||
signed_effects, | ||
) | ||
} | ||
|
||
/// Get a read reference to an object/seq lock | ||
|
@@ -426,4 +470,11 @@ impl AuthorityState { | |
.parent(object_ref) | ||
.expect("TODO: propagate the error") | ||
} | ||
|
||
pub async fn get_objects( | ||
&self, | ||
_objects: &[ObjectID], | ||
) -> Result<Vec<Option<Object>>, FastPayError> { | ||
self._database.get_objects(_objects) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ use typed_store::traits::Map; | |
pub struct AuthorityStore { | ||
objects: DBMap<ObjectID, Object>, | ||
order_lock: DBMap<ObjectRef, Option<TransactionDigest>>, | ||
owner_index: DBMap<(FastPayAddress, ObjectID), ObjectRef>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice way to get out of the fact that it's unwieldy to update collections inside the DB ... but this is something that may require comment, either here or around There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the standard way of using a persistent BTree to make a secondary index, I did not invent it :). Just to check -- I think our choice of serialization (bincode with fixint, bigendian, etc) was specifically selected to allow for these "composite" indexes. Am I correct in thinking this is true? It is not the case that this will break in a different architecture, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, this is correct. |
||
signed_orders: DBMap<TransactionDigest, SignedOrder>, | ||
certificates: DBMap<TransactionDigest, CertifiedOrder>, | ||
parent_sync: DBMap<ObjectRef, TransactionDigest>, | ||
|
@@ -24,6 +25,7 @@ impl AuthorityStore { | |
db_options, | ||
&[ | ||
"objects", | ||
"owner_index", | ||
"order_lock", | ||
"signed_orders", | ||
"certificates", | ||
|
@@ -34,6 +36,7 @@ impl AuthorityStore { | |
.expect("Cannot open DB."); | ||
AuthorityStore { | ||
objects: DBMap::reopen(&db, Some("objects")).expect("Cannot open CF."), | ||
owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."), | ||
order_lock: DBMap::reopen(&db, Some("order_lock")).expect("Cannot open CF."), | ||
signed_orders: DBMap::reopen(&db, Some("signed_orders")).expect("Cannot open CF."), | ||
certificates: DBMap::reopen(&db, Some("certificates")).expect("Cannot open CF."), | ||
|
@@ -51,10 +54,13 @@ impl AuthorityStore { | |
account: FastPayAddress, | ||
) -> Result<Vec<ObjectRef>, FastPayError> { | ||
Ok(self | ||
.objects | ||
.owner_index | ||
.iter() | ||
.filter(|(_, object)| object.owner == account) | ||
.map(|(id, object)| (id, object.version(), object.digest())) | ||
// The object id [0; 16] is the smallest possible | ||
.skip_to(&(account, AccountAddress::from([0; 16]))) | ||
.map_err(|_| FastPayError::StorageError)? | ||
.take_while(|((owner, _id), _object_ref)| (owner == &account)) | ||
.map(|((_owner, _id), object_ref)| object_ref) | ||
.collect()) | ||
} | ||
|
||
|
@@ -86,6 +92,13 @@ impl AuthorityStore { | |
.ok_or(FastPayError::ObjectNotFound) | ||
} | ||
|
||
/// Get many objects | ||
pub fn get_objects(&self, _objects: &[ObjectID]) -> Result<Vec<Option<Object>>, FastPayError> { | ||
self.objects | ||
.multi_get(_objects) | ||
.map_err(|_| FastPayError::StorageError) | ||
} | ||
|
||
/// Read a lock or returns Err(OrderLockDoesNotExist) if the lock does not exist. | ||
pub fn get_order_lock( | ||
&self, | ||
|
@@ -135,7 +148,14 @@ impl AuthorityStore { | |
pub fn insert_object(&self, object: Object) -> Result<(), FastPayError> { | ||
self.objects | ||
.insert(&object.id(), &object) | ||
.map_err(|_| FastPayError::StorageError) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Update the index | ||
self.owner_index | ||
.insert(&(object.owner, object.id()), &object.to_object_reference()) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Initialize a lock to an object reference to None, but keep it | ||
|
@@ -184,13 +204,14 @@ impl AuthorityStore { | |
.lock() | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
for obj_ref in mutable_input_objects { | ||
let locks = self | ||
.order_lock | ||
.multi_get(mutable_input_objects) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
for (obj_ref, lock) in mutable_input_objects.iter().zip(locks) { | ||
// The object / version must exist, and therefore lock initialized. | ||
let lock = self | ||
.order_lock | ||
.get(obj_ref) | ||
.map_err(|_| FastPayError::StorageError)? | ||
.ok_or(FastPayError::OrderLockDoesNotExist)?; | ||
let lock = lock.ok_or(FastPayError::OrderLockDoesNotExist)?; | ||
|
||
if let Some(previous_tx_digest) = lock { | ||
if previous_tx_digest != tx_digest { | ||
|
@@ -221,9 +242,10 @@ impl AuthorityStore { | |
pub fn update_state( | ||
&self, | ||
temporary_store: AuthorityTemporaryStore, | ||
_expired_object_owners: Vec<(FastPayAddress, ObjectID)>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed! |
||
certificate: CertifiedOrder, | ||
signed_effects: SignedOrderEffects, | ||
) -> Result<(), FastPayError> { | ||
) -> Result<OrderInfoResponse, FastPayError> { | ||
// TODO: There is a lot of cloning used -- eliminate it. | ||
|
||
// Extract the new state from the execution | ||
|
@@ -240,15 +262,15 @@ impl AuthorityStore { | |
write_batch = write_batch | ||
.insert_batch( | ||
&self.certificates, | ||
std::iter::once((transaction_digest, certificate)), | ||
std::iter::once((transaction_digest, certificate.clone())), | ||
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Store the signed effects of the order | ||
write_batch = write_batch | ||
.insert_batch( | ||
&self.signed_effects, | ||
std::iter::once((transaction_digest, signed_effects)), | ||
std::iter::once((transaction_digest, signed_effects.clone())), | ||
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
|
@@ -260,6 +282,16 @@ impl AuthorityStore { | |
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Delete the old owner index entries | ||
write_batch = write_batch | ||
.delete_batch( | ||
&self.owner_index, | ||
_expired_object_owners | ||
.into_iter() | ||
.map(|(owner, id)| (owner, id)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the map? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No reason any more, removed! |
||
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Index the certificate by the objects created | ||
write_batch = write_batch | ||
.insert_batch( | ||
|
@@ -281,6 +313,16 @@ impl AuthorityStore { | |
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Update the indexes of the objects written | ||
write_batch = write_batch | ||
.insert_batch( | ||
&self.owner_index, | ||
written | ||
.iter() | ||
.map(|output_ref| ((objects[&output_ref.0].owner, output_ref.0), *output_ref)), | ||
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Insert each output object into the stores | ||
write_batch = write_batch | ||
.insert_batch( | ||
|
@@ -296,6 +338,8 @@ impl AuthorityStore { | |
) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Update the indexes of the objects written | ||
|
||
// This is the critical region: testing the locks and writing the | ||
// new locks must be atomic, and no writes should happen in between. | ||
{ | ||
|
@@ -307,19 +351,29 @@ impl AuthorityStore { | |
|
||
// Check the locks are still active | ||
// TODO: maybe we could just check if the certificate is there instead? | ||
for input_ref in active_inputs { | ||
fp_ensure!( | ||
self.order_lock | ||
.contains_key(&input_ref) | ||
.map_err(|_| FastPayError::StorageError)?, | ||
FastPayError::OrderLockDoesNotExist | ||
); | ||
let locks = self | ||
.order_lock | ||
.multi_get(&active_inputs[..]) | ||
.map_err(|_| FastPayError::StorageError)?; | ||
for object_lock in locks { | ||
object_lock.ok_or(FastPayError::OrderLockDoesNotExist)?; | ||
} | ||
|
||
// Atomic write of all locks & other data | ||
write_batch.write().map_err(|_| FastPayError::StorageError) | ||
write_batch | ||
.write() | ||
.map_err(|_| FastPayError::StorageError)?; | ||
|
||
// Implicit: drop(_lock); | ||
// implict: drop(_lock); | ||
} // End of critical region | ||
|
||
Ok(OrderInfoResponse { | ||
signed_order: self | ||
.signed_orders | ||
.get(&transaction_digest) | ||
.map_err(|_| FastPayError::StorageError)?, | ||
certified_order: Some(certificate), | ||
signed_effects: Some(signed_effects), | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it faster to
.get_objects
or could youget_object
by reference and only copy what you need?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think there is an option of getting an object by reference from the DB right now. As soon as we do get, the object is read from the db value (bytes are pinned) and then deserialized and allocated as a whole. I went down the rabbit hole of looking at serialization formats that allow cheap in place reads (zero copy), like capnpn, flatbuf etc -- but changing the DB to use those is a bigger job.