diff --git a/fastpay_core/Cargo.toml b/fastpay_core/Cargo.toml index c9d69fb5b57ed..ef08e0c874f24 100644 --- a/fastpay_core/Cargo.toml +++ b/fastpay_core/Cargo.toml @@ -24,7 +24,7 @@ move-core-types = { git = "https://github.com/diem/diem", rev="346301f33b3489bb4 move-vm-runtime = { git = "https://github.com/diem/diem", rev="346301f33b3489bb4e486ae6c0aa5e030223b492" } -typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "c1ab69f1a4004414bf0dee412f2e5839b71bc8f6" } +typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "0ef3f1fedcfbf3dfe0eeea65e05de073b7c25733" } [dev-dependencies] fdlimit = "0.2.1" diff --git a/fastpay_core/src/authority.rs b/fastpay_core/src/authority.rs index a71bc9501bb9e..1d63dd5ca47c7 100644 --- a/fastpay_core/src/authority.rs +++ b/fastpay_core/src/authority.rs @@ -19,7 +19,7 @@ use move_core_types::{ }; use move_vm_runtime::native_functions::NativeFunctionTable; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; @@ -80,7 +80,12 @@ impl AuthorityState { None }; - for object_ref in input_objects { + let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect(); + // Get a copy of the object. + // TODO: We only need to read the read_only and owner field of the object, + // it's a bit wasteful to copy the entire object. + let objects = self.get_objects(&ids[..]).await?; + for (object_ref, object) in input_objects.into_iter().zip(objects) { let (object_id, sequence_number, _object_digest) = object_ref; fp_ensure!( @@ -88,13 +93,7 @@ impl AuthorityState { FastPayError::InvalidSequenceNumber ); - // Get a copy of the object. - // TODO: We only need to read the read_only and owner field of the object, - // it's a bit wasteful to copy the entire object. - let object = self - .object_state(&object_id) - .await - .map_err(|_| FastPayError::ObjectNotFound)?; + let object = object.ok_or(FastPayError::ObjectNotFound)?; // TODO(https://github.com/MystenLabs/fastnft/issues/123): This hack substitutes the real // object digest instead of using the one passed in by the client. We need to fix clients and @@ -171,14 +170,21 @@ impl AuthorityState { // Check the certificate and retrieve the transfer data. certificate.check(&self.committee)?; + let input_objects = order.input_objects(); + let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect(); + // Get a copy of the object. + // TODO: We only need to read the read_only and owner field of the object, + // it's a bit wasteful to copy the entire object. + let objects = self.get_objects(&ids[..]).await?; + let mut inputs = vec![]; - for (input_object_id, input_seq, _input_digest) in order.input_objects() { + let mut owner_index = HashMap::new(); + for (object_ref, object) in input_objects.into_iter().zip(objects) { + let (input_object_id, input_seq, _input_digest) = object_ref; + // If we have a certificate on the confirmation order it means that the input // object exists on other honest authorities, but we do not have it. - let input_object = self - .object_state(&input_object_id) - .await - .map_err(|_| FastPayError::ObjectNotFound)?; + let input_object = object.ok_or(FastPayError::ObjectNotFound)?; let input_sequence_number = input_object.version(); @@ -193,6 +199,10 @@ impl AuthorityState { return self.make_order_info(&transaction_digest).await; } + if !input_object.is_read_only() { + owner_index.insert(input_object_id, input_object.owner); + } + inputs.push(input_object.clone()); } @@ -202,7 +212,7 @@ impl AuthorityState { // Order-specific logic let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs); - let _status = match order.kind { + let status = match order.kind { OrderKind::Transfer(t) => { debug_assert!( inputs.len() == 2, @@ -254,17 +264,36 @@ impl AuthorityState { } }; + // Make a list of all object that are either deleted or have changed owner, along with their old owner. + // This is used to update the owner index. + let drop_index_entries = temporary_store + .deleted() + .iter() + .map(|(id, _, _)| (owner_index[id], *id)) + .chain(temporary_store.written().iter().filter_map(|(id, _, _)| { + let owner = owner_index.get(id); + if owner.is_some() && *owner.unwrap() != temporary_store.objects()[id].owner { + Some((owner_index[id], *id)) + } else { + None + } + })) + .collect(); + // Update the database in an atomic manner let to_signed_effects = temporary_store.to_signed_effects( &self.name, &self.secret, &transaction_digest, - _status, + status, ); - self.update_state(temporary_store, certificate, to_signed_effects) - .await?; - - self.make_order_info(&transaction_digest).await + self.update_state( + temporary_store, + drop_index_entries, + certificate, + to_signed_effects, + ) + .await // Returns the OrderInfoResponse } pub async fn handle_account_info_request( @@ -396,11 +425,16 @@ impl AuthorityState { async fn update_state( &self, temporary_store: AuthorityTemporaryStore, + expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, - ) -> Result<(), FastPayError> { - self._database - .update_state(temporary_store, certificate, signed_effects) + ) -> Result { + self._database.update_state( + temporary_store, + expired_object_owners, + certificate, + signed_effects, + ) } /// Get a read reference to an object/seq lock @@ -426,4 +460,11 @@ impl AuthorityState { .parent(object_ref) .expect("TODO: propagate the error") } + + pub async fn get_objects( + &self, + _objects: &[ObjectID], + ) -> Result>, FastPayError> { + self._database.get_objects(_objects) + } } diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index c2133d5dd2573..b3fee4e847792 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -9,6 +9,7 @@ use typed_store::traits::Map; pub struct AuthorityStore { objects: DBMap, order_lock: DBMap>, + owner_index: DBMap<(FastPayAddress, ObjectID), ObjectRef>, signed_orders: DBMap, certificates: DBMap, parent_sync: DBMap, @@ -24,6 +25,7 @@ impl AuthorityStore { db_options, &[ "objects", + "owner_index", "order_lock", "signed_orders", "certificates", @@ -34,6 +36,7 @@ impl AuthorityStore { .expect("Cannot open DB."); AuthorityStore { objects: DBMap::reopen(&db, Some("objects")).expect("Cannot open CF."), + owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."), order_lock: DBMap::reopen(&db, Some("order_lock")).expect("Cannot open CF."), signed_orders: DBMap::reopen(&db, Some("signed_orders")).expect("Cannot open CF."), certificates: DBMap::reopen(&db, Some("certificates")).expect("Cannot open CF."), @@ -51,10 +54,13 @@ impl AuthorityStore { account: FastPayAddress, ) -> Result, FastPayError> { Ok(self - .objects + .owner_index .iter() - .filter(|(_, object)| object.owner == account) - .map(|(id, object)| (id, object.version(), object.digest())) + // The object id [0; 16] is the smallest possible + .skip_to(&(account, AccountAddress::from([0; 16]))) + .map_err(|_| FastPayError::StorageError)? + .take_while(|((owner, _id), _object_ref)| (owner == &account)) + .map(|((_owner, _id), object_ref)| object_ref) .collect()) } @@ -86,6 +92,13 @@ impl AuthorityStore { .ok_or(FastPayError::ObjectNotFound) } + /// Get many objects + pub fn get_objects(&self, _objects: &[ObjectID]) -> Result>, FastPayError> { + self.objects + .multi_get(_objects) + .map_err(|_| FastPayError::StorageError) + } + /// Read a lock or returns Err(OrderLockDoesNotExist) if the lock does not exist. pub fn get_order_lock( &self, @@ -135,7 +148,14 @@ impl AuthorityStore { pub fn insert_object(&self, object: Object) -> Result<(), FastPayError> { self.objects .insert(&object.id(), &object) - .map_err(|_| FastPayError::StorageError) + .map_err(|_| FastPayError::StorageError)?; + + // Update the index + self.owner_index + .insert(&(object.owner, object.id()), &object.to_object_reference()) + .map_err(|_| FastPayError::StorageError)?; + + Ok(()) } /// Initialize a lock to an object reference to None, but keep it @@ -184,13 +204,14 @@ impl AuthorityStore { .lock() .map_err(|_| FastPayError::StorageError)?; - for obj_ref in mutable_input_objects { + let locks = self + .order_lock + .multi_get(mutable_input_objects) + .map_err(|_| FastPayError::StorageError)?; + + for (obj_ref, lock) in mutable_input_objects.iter().zip(locks) { // The object / version must exist, and therefore lock initialized. - let lock = self - .order_lock - .get(obj_ref) - .map_err(|_| FastPayError::StorageError)? - .ok_or(FastPayError::OrderLockDoesNotExist)?; + let lock = lock.ok_or(FastPayError::OrderLockDoesNotExist)?; if let Some(previous_tx_digest) = lock { if previous_tx_digest != tx_digest { @@ -221,9 +242,10 @@ impl AuthorityStore { pub fn update_state( &self, temporary_store: AuthorityTemporaryStore, + expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, - ) -> Result<(), FastPayError> { + ) -> Result { // TODO: There is a lot of cloning used -- eliminate it. // Extract the new state from the execution @@ -240,7 +262,7 @@ impl AuthorityStore { write_batch = write_batch .insert_batch( &self.certificates, - std::iter::once((transaction_digest, certificate)), + std::iter::once((transaction_digest, certificate.clone())), ) .map_err(|_| FastPayError::StorageError)?; @@ -248,7 +270,7 @@ impl AuthorityStore { write_batch = write_batch .insert_batch( &self.signed_effects, - std::iter::once((transaction_digest, signed_effects)), + std::iter::once((transaction_digest, signed_effects.clone())), ) .map_err(|_| FastPayError::StorageError)?; @@ -260,6 +282,11 @@ impl AuthorityStore { ) .map_err(|_| FastPayError::StorageError)?; + // Delete the old owner index entries + write_batch = write_batch + .delete_batch(&self.owner_index, expired_object_owners.into_iter()) + .map_err(|_| FastPayError::StorageError)?; + // Index the certificate by the objects created write_batch = write_batch .insert_batch( @@ -281,6 +308,16 @@ impl AuthorityStore { ) .map_err(|_| FastPayError::StorageError)?; + // Update the indexes of the objects written + write_batch = write_batch + .insert_batch( + &self.owner_index, + written + .iter() + .map(|output_ref| ((objects[&output_ref.0].owner, output_ref.0), *output_ref)), + ) + .map_err(|_| FastPayError::StorageError)?; + // Insert each output object into the stores write_batch = write_batch .insert_batch( @@ -296,6 +333,8 @@ impl AuthorityStore { ) .map_err(|_| FastPayError::StorageError)?; + // Update the indexes of the objects written + // This is the critical region: testing the locks and writing the // new locks must be atomic, and no writes should happen in between. { @@ -307,19 +346,29 @@ impl AuthorityStore { // Check the locks are still active // TODO: maybe we could just check if the certificate is there instead? - for input_ref in active_inputs { - fp_ensure!( - self.order_lock - .contains_key(&input_ref) - .map_err(|_| FastPayError::StorageError)?, - FastPayError::OrderLockDoesNotExist - ); + let locks = self + .order_lock + .multi_get(&active_inputs[..]) + .map_err(|_| FastPayError::StorageError)?; + for object_lock in locks { + object_lock.ok_or(FastPayError::OrderLockDoesNotExist)?; } // Atomic write of all locks & other data - write_batch.write().map_err(|_| FastPayError::StorageError) + write_batch + .write() + .map_err(|_| FastPayError::StorageError)?; - // Implicit: drop(_lock); + // implict: drop(_lock); } // End of critical region + + Ok(OrderInfoResponse { + signed_order: self + .signed_orders + .get(&transaction_digest) + .map_err(|_| FastPayError::StorageError)?, + certified_order: Some(certificate), + signed_effects: Some(signed_effects), + }) } } diff --git a/fastpay_core/src/authority/temporary_store.rs b/fastpay_core/src/authority/temporary_store.rs index a59258069796c..82f4cb36bee45 100644 --- a/fastpay_core/src/authority/temporary_store.rs +++ b/fastpay_core/src/authority/temporary_store.rs @@ -4,7 +4,7 @@ pub struct AuthorityTemporaryStore { object_store: Arc, objects: BTreeMap, active_inputs: Vec, // Inputs that are not read only - pub written: Vec, // Objects written + written: Vec, // Objects written deleted: Vec, // Objects actively deleted } @@ -26,6 +26,20 @@ impl AuthorityTemporaryStore { } } + // Helpers to access private fields + + pub fn objects(&self) -> &BTreeMap { + &self.objects + } + + pub fn written(&self) -> &Vec { + &self.written + } + + pub fn deleted(&self) -> &Vec { + &self.deleted + } + /// Break up the structure and return its internal stores (objects, active_inputs, written, deleted) pub fn into_inner( self,