From 3a70b9f28bdb5578dba0ce5b475a74156fc8988d Mon Sep 17 00:00:00 2001 From: George Danezis Date: Mon, 10 Jan 2022 18:48:11 +0000 Subject: [PATCH 1/5] Upate an owener-object index --- fastpay_core/Cargo.toml | 2 +- fastpay_core/src/authority.rs | 47 ++++++++++++++++--- fastpay_core/src/authority/authority_store.rs | 43 +++++++++++++++-- fastpay_core/src/authority/temporary_store.rs | 18 ++++++- 4 files changed, 96 insertions(+), 14 deletions(-) diff --git a/fastpay_core/Cargo.toml b/fastpay_core/Cargo.toml index c9d69fb5b57ed..ef08e0c874f24 100644 --- a/fastpay_core/Cargo.toml +++ b/fastpay_core/Cargo.toml @@ -24,7 +24,7 @@ move-core-types = { git = "https://github.com/diem/diem", rev="346301f33b3489bb4 move-vm-runtime = { git = "https://github.com/diem/diem", rev="346301f33b3489bb4e486ae6c0aa5e030223b492" } -typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "c1ab69f1a4004414bf0dee412f2e5839b71bc8f6" } +typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "0ef3f1fedcfbf3dfe0eeea65e05de073b7c25733" } [dev-dependencies] fdlimit = "0.2.1" diff --git a/fastpay_core/src/authority.rs b/fastpay_core/src/authority.rs index a71bc9501bb9e..e29f171845dab 100644 --- a/fastpay_core/src/authority.rs +++ b/fastpay_core/src/authority.rs @@ -19,7 +19,7 @@ use move_core_types::{ }; use move_vm_runtime::native_functions::NativeFunctionTable; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; @@ -172,6 +172,7 @@ impl AuthorityState { certificate.check(&self.committee)?; let mut inputs = vec![]; + let mut owner_index = HashMap::new(); for (input_object_id, input_seq, _input_digest) in order.input_objects() { // If we have a certificate on the confirmation order it means that the input // object exists on other honest authorities, but we do not have it. @@ -193,6 +194,10 @@ impl AuthorityState { return self.make_order_info(&transaction_digest).await; } + if !input_object.is_read_only() { + owner_index.insert(input_object_id, input_object.owner); + } + inputs.push(input_object.clone()); } @@ -202,7 +207,7 @@ impl AuthorityState { // Order-specific logic let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs); - let _status = match order.kind { + let status = match order.kind { OrderKind::Transfer(t) => { debug_assert!( inputs.len() == 2, @@ -254,15 +259,38 @@ impl AuthorityState { } }; + // Make a list of all object that are either deleted or have changed owner, along with their old owner. + // This is used to update the owner index. + let drop_index_entries = temporary_store + .deleted() + .iter() + .map(|(id, _, _)| (owner_index[id], *id)) + .chain( + temporary_store + .written() + .iter() + .filter(|(id, _, _)| { + let owner = owner_index.get(id); + owner.is_some() && *owner.unwrap() != temporary_store.objects()[id].owner + }) + .map(|(id, _, _)| (owner_index[id], *id)), + ) + .collect(); + // Update the database in an atomic manner let to_signed_effects = temporary_store.to_signed_effects( &self.name, &self.secret, &transaction_digest, - _status, + status, ); - self.update_state(temporary_store, certificate, to_signed_effects) - .await?; + self.update_state( + temporary_store, + drop_index_entries, + certificate, + to_signed_effects, + ) + .await?; self.make_order_info(&transaction_digest).await } @@ -396,11 +424,16 @@ impl AuthorityState { async fn update_state( &self, temporary_store: AuthorityTemporaryStore, + expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, ) -> Result<(), FastPayError> { - self._database - .update_state(temporary_store, certificate, signed_effects) + self._database.update_state( + temporary_store, + expired_object_owners, + certificate, + signed_effects, + ) } /// Get a read reference to an object/seq lock diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index c2133d5dd2573..ed2fd764523cc 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -9,6 +9,7 @@ use typed_store::traits::Map; pub struct AuthorityStore { objects: DBMap, order_lock: DBMap>, + _owner_index: DBMap<(FastPayAddress, ObjectID), ObjectRef>, signed_orders: DBMap, certificates: DBMap, parent_sync: DBMap, @@ -24,6 +25,7 @@ impl AuthorityStore { db_options, &[ "objects", + "owner_index", "order_lock", "signed_orders", "certificates", @@ -34,6 +36,7 @@ impl AuthorityStore { .expect("Cannot open DB."); AuthorityStore { objects: DBMap::reopen(&db, Some("objects")).expect("Cannot open CF."), + _owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."), order_lock: DBMap::reopen(&db, Some("order_lock")).expect("Cannot open CF."), signed_orders: DBMap::reopen(&db, Some("signed_orders")).expect("Cannot open CF."), certificates: DBMap::reopen(&db, Some("certificates")).expect("Cannot open CF."), @@ -51,10 +54,12 @@ impl AuthorityStore { account: FastPayAddress, ) -> Result, FastPayError> { Ok(self - .objects + ._owner_index .iter() - .filter(|(_, object)| object.owner == account) - .map(|(id, object)| (id, object.version(), object.digest())) + .skip_to(&(account, AccountAddress::from([0; 16]))) + .map_err(|_| FastPayError::StorageError)? + .take_while(|((owner, _id), _object_ref)| (owner == &account)) + .map(|((_owner, _id), object_ref)| object_ref) .collect()) } @@ -135,7 +140,14 @@ impl AuthorityStore { pub fn insert_object(&self, object: Object) -> Result<(), FastPayError> { self.objects .insert(&object.id(), &object) - .map_err(|_| FastPayError::StorageError) + .map_err(|_| FastPayError::StorageError)?; + + // Update the index + self._owner_index + .insert(&(object.owner, object.id()), &object.to_object_reference()) + .map_err(|_| FastPayError::StorageError)?; + + Ok(()) } /// Initialize a lock to an object reference to None, but keep it @@ -221,6 +233,7 @@ impl AuthorityStore { pub fn update_state( &self, temporary_store: AuthorityTemporaryStore, + _expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, ) -> Result<(), FastPayError> { @@ -260,6 +273,16 @@ impl AuthorityStore { ) .map_err(|_| FastPayError::StorageError)?; + // Delete the old owner index entries + write_batch = write_batch + .delete_batch( + &self._owner_index, + _expired_object_owners + .into_iter() + .map(|(owner, id)| (owner, id)), + ) + .map_err(|_| FastPayError::StorageError)?; + // Index the certificate by the objects created write_batch = write_batch .insert_batch( @@ -281,6 +304,16 @@ impl AuthorityStore { ) .map_err(|_| FastPayError::StorageError)?; + // Update the indexes of the objects written + write_batch = write_batch + .insert_batch( + &self._owner_index, + written + .iter() + .map(|output_ref| ((objects[&output_ref.0].owner, output_ref.0), *output_ref)), + ) + .map_err(|_| FastPayError::StorageError)?; + // Insert each output object into the stores write_batch = write_batch .insert_batch( @@ -296,6 +329,8 @@ impl AuthorityStore { ) .map_err(|_| FastPayError::StorageError)?; + // Update the indexes of the objects written + // This is the critical region: testing the locks and writing the // new locks must be atomic, and no writes should happen in between. { diff --git a/fastpay_core/src/authority/temporary_store.rs b/fastpay_core/src/authority/temporary_store.rs index a59258069796c..1d7243c2ef8bd 100644 --- a/fastpay_core/src/authority/temporary_store.rs +++ b/fastpay_core/src/authority/temporary_store.rs @@ -4,8 +4,8 @@ pub struct AuthorityTemporaryStore { object_store: Arc, objects: BTreeMap, active_inputs: Vec, // Inputs that are not read only - pub written: Vec, // Objects written - deleted: Vec, // Objects actively deleted + written: Vec, // Objects written + deleted: Vec, // Objects actively deleted } impl AuthorityTemporaryStore { @@ -26,6 +26,20 @@ impl AuthorityTemporaryStore { } } + // Helpers to access private fields + + pub fn objects(&self) -> &BTreeMap { + &self.objects + } + + pub fn written(&self) -> &Vec { + &self.written + } + + pub fn deleted(&self) -> &Vec { + &self.deleted + } + /// Break up the structure and return its internal stores (objects, active_inputs, written, deleted) pub fn into_inner( self, From d5e3df75f77e6d33881036697368b7bd47b581e4 Mon Sep 17 00:00:00 2001 From: George Danezis Date: Mon, 10 Jan 2022 19:43:49 +0000 Subject: [PATCH 2/5] Remove _ --- fastpay_core/src/authority/authority_store.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index ed2fd764523cc..e2a2b799fc455 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -9,7 +9,7 @@ use typed_store::traits::Map; pub struct AuthorityStore { objects: DBMap, order_lock: DBMap>, - _owner_index: DBMap<(FastPayAddress, ObjectID), ObjectRef>, + owner_index: DBMap<(FastPayAddress, ObjectID), ObjectRef>, signed_orders: DBMap, certificates: DBMap, parent_sync: DBMap, @@ -36,7 +36,7 @@ impl AuthorityStore { .expect("Cannot open DB."); AuthorityStore { objects: DBMap::reopen(&db, Some("objects")).expect("Cannot open CF."), - _owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."), + owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."), order_lock: DBMap::reopen(&db, Some("order_lock")).expect("Cannot open CF."), signed_orders: DBMap::reopen(&db, Some("signed_orders")).expect("Cannot open CF."), certificates: DBMap::reopen(&db, Some("certificates")).expect("Cannot open CF."), @@ -54,7 +54,7 @@ impl AuthorityStore { account: FastPayAddress, ) -> Result, FastPayError> { Ok(self - ._owner_index + .owner_index .iter() .skip_to(&(account, AccountAddress::from([0; 16]))) .map_err(|_| FastPayError::StorageError)? @@ -143,7 +143,7 @@ impl AuthorityStore { .map_err(|_| FastPayError::StorageError)?; // Update the index - self._owner_index + self.owner_index .insert(&(object.owner, object.id()), &object.to_object_reference()) .map_err(|_| FastPayError::StorageError)?; @@ -276,7 +276,7 @@ impl AuthorityStore { // Delete the old owner index entries write_batch = write_batch .delete_batch( - &self._owner_index, + &self.owner_index, _expired_object_owners .into_iter() .map(|(owner, id)| (owner, id)), @@ -307,7 +307,7 @@ impl AuthorityStore { // Update the indexes of the objects written write_batch = write_batch .insert_batch( - &self._owner_index, + &self.owner_index, written .iter() .map(|output_ref| ((objects[&output_ref.0].owner, output_ref.0), *output_ref)), From 962810c347ab6de83f78dff8fdcb902eab948a3c Mon Sep 17 00:00:00 2001 From: George Danezis Date: Mon, 10 Jan 2022 19:47:35 +0000 Subject: [PATCH 3/5] Fmt & comments --- fastpay_core/src/authority/authority_store.rs | 1 + fastpay_core/src/authority/temporary_store.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index e2a2b799fc455..50fcdfe964d03 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -56,6 +56,7 @@ impl AuthorityStore { Ok(self .owner_index .iter() + // The object id [0; 16] is the smallest possible .skip_to(&(account, AccountAddress::from([0; 16]))) .map_err(|_| FastPayError::StorageError)? .take_while(|((owner, _id), _object_ref)| (owner == &account)) diff --git a/fastpay_core/src/authority/temporary_store.rs b/fastpay_core/src/authority/temporary_store.rs index 1d7243c2ef8bd..82f4cb36bee45 100644 --- a/fastpay_core/src/authority/temporary_store.rs +++ b/fastpay_core/src/authority/temporary_store.rs @@ -4,8 +4,8 @@ pub struct AuthorityTemporaryStore { object_store: Arc, objects: BTreeMap, active_inputs: Vec, // Inputs that are not read only - written: Vec, // Objects written - deleted: Vec, // Objects actively deleted + written: Vec, // Objects written + deleted: Vec, // Objects actively deleted } impl AuthorityTemporaryStore { From 80b5c6ef77822b664a5a1b26fa1fb5105b67e337 Mon Sep 17 00:00:00 2001 From: George Danezis Date: Mon, 10 Jan 2022 23:36:59 +0000 Subject: [PATCH 4/5] Use multi-get from db --- fastpay_core/src/authority.rs | 50 +++++++++++------ fastpay_core/src/authority/authority_store.rs | 54 ++++++++++++------- 2 files changed, 70 insertions(+), 34 deletions(-) diff --git a/fastpay_core/src/authority.rs b/fastpay_core/src/authority.rs index e29f171845dab..2d6ca0a7ab2c8 100644 --- a/fastpay_core/src/authority.rs +++ b/fastpay_core/src/authority.rs @@ -80,7 +80,13 @@ impl AuthorityState { None }; - for object_ref in input_objects { + let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect(); + // Get a copy of the object. + // TODO: We only need to read the read_only and owner field of the object, + // it's a bit wasteful to copy the entire object. + let _objects = self.get_objects(&ids[..]).await?; + for (object_ref, object) in input_objects.into_iter().zip(_objects) { + //for object_ref in input_objects.into_iter() { let (object_id, sequence_number, _object_digest) = object_ref; fp_ensure!( @@ -88,13 +94,12 @@ impl AuthorityState { FastPayError::InvalidSequenceNumber ); - // Get a copy of the object. - // TODO: We only need to read the read_only and owner field of the object, - // it's a bit wasteful to copy the entire object. - let object = self - .object_state(&object_id) - .await - .map_err(|_| FastPayError::ObjectNotFound)?; + // let object = self + // .object_state(&object_id) + // .await + // .map_err(|_| FastPayError::ObjectNotFound)?; + + let object = object.ok_or(FastPayError::ObjectNotFound)?; // TODO(https://github.com/MystenLabs/fastnft/issues/123): This hack substitutes the real // object digest instead of using the one passed in by the client. We need to fix clients and @@ -171,15 +176,21 @@ impl AuthorityState { // Check the certificate and retrieve the transfer data. certificate.check(&self.committee)?; + let input_objects = order.input_objects(); + let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect(); + // Get a copy of the object. + // TODO: We only need to read the read_only and owner field of the object, + // it's a bit wasteful to copy the entire object. + let _objects = self.get_objects(&ids[..]).await?; + let mut inputs = vec![]; let mut owner_index = HashMap::new(); - for (input_object_id, input_seq, _input_digest) in order.input_objects() { + for (object_ref, object) in input_objects.into_iter().zip(_objects) { + let (input_object_id, input_seq, _input_digest) = object_ref; + // If we have a certificate on the confirmation order it means that the input // object exists on other honest authorities, but we do not have it. - let input_object = self - .object_state(&input_object_id) - .await - .map_err(|_| FastPayError::ObjectNotFound)?; + let input_object = object.ok_or(FastPayError::ObjectNotFound)?; let input_sequence_number = input_object.version(); @@ -290,9 +301,9 @@ impl AuthorityState { certificate, to_signed_effects, ) - .await?; + .await - self.make_order_info(&transaction_digest).await + // self.make_order_info(&transaction_digest).await } pub async fn handle_account_info_request( @@ -427,7 +438,7 @@ impl AuthorityState { expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, - ) -> Result<(), FastPayError> { + ) -> Result { self._database.update_state( temporary_store, expired_object_owners, @@ -459,4 +470,11 @@ impl AuthorityState { .parent(object_ref) .expect("TODO: propagate the error") } + + pub async fn get_objects( + &self, + _objects: &[ObjectID], + ) -> Result>, FastPayError> { + self._database.get_objects(_objects) + } } diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index 50fcdfe964d03..418b79774ef49 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -92,6 +92,13 @@ impl AuthorityStore { .ok_or(FastPayError::ObjectNotFound) } + /// Get many objects + pub fn get_objects(&self, _objects: &[ObjectID]) -> Result>, FastPayError> { + self.objects + .multi_get(_objects) + .map_err(|_| FastPayError::StorageError) + } + /// Read a lock or returns Err(OrderLockDoesNotExist) if the lock does not exist. pub fn get_order_lock( &self, @@ -197,13 +204,14 @@ impl AuthorityStore { .lock() .map_err(|_| FastPayError::StorageError)?; - for obj_ref in mutable_input_objects { + let locks = self + .order_lock + .multi_get(mutable_input_objects) + .map_err(|_| FastPayError::StorageError)?; + + for (obj_ref, lock) in mutable_input_objects.iter().zip(locks) { // The object / version must exist, and therefore lock initialized. - let lock = self - .order_lock - .get(obj_ref) - .map_err(|_| FastPayError::StorageError)? - .ok_or(FastPayError::OrderLockDoesNotExist)?; + let lock = lock.ok_or(FastPayError::OrderLockDoesNotExist)?; if let Some(previous_tx_digest) = lock { if previous_tx_digest != tx_digest { @@ -237,7 +245,7 @@ impl AuthorityStore { _expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, - ) -> Result<(), FastPayError> { + ) -> Result { // TODO: There is a lot of cloning used -- eliminate it. // Extract the new state from the execution @@ -254,7 +262,7 @@ impl AuthorityStore { write_batch = write_batch .insert_batch( &self.certificates, - std::iter::once((transaction_digest, certificate)), + std::iter::once((transaction_digest, certificate.clone())), ) .map_err(|_| FastPayError::StorageError)?; @@ -262,7 +270,7 @@ impl AuthorityStore { write_batch = write_batch .insert_batch( &self.signed_effects, - std::iter::once((transaction_digest, signed_effects)), + std::iter::once((transaction_digest, signed_effects.clone())), ) .map_err(|_| FastPayError::StorageError)?; @@ -343,19 +351,29 @@ impl AuthorityStore { // Check the locks are still active // TODO: maybe we could just check if the certificate is there instead? - for input_ref in active_inputs { - fp_ensure!( - self.order_lock - .contains_key(&input_ref) - .map_err(|_| FastPayError::StorageError)?, - FastPayError::OrderLockDoesNotExist - ); + let locks = self + .order_lock + .multi_get(&active_inputs[..]) + .map_err(|_| FastPayError::StorageError)?; + for object_lock in locks { + object_lock.ok_or(FastPayError::OrderLockDoesNotExist)?; } // Atomic write of all locks & other data - write_batch.write().map_err(|_| FastPayError::StorageError) + write_batch + .write() + .map_err(|_| FastPayError::StorageError)?; - // Implicit: drop(_lock); + // implict: drop(_lock); } // End of critical region + + Ok(OrderInfoResponse { + signed_order: self + .signed_orders + .get(&transaction_digest) + .map_err(|_| FastPayError::StorageError)?, + certified_order: Some(certificate), + signed_effects: Some(signed_effects), + }) } } From 3dc355d5bf8fe090458d06e622e3bcea28f8bcbc Mon Sep 17 00:00:00 2001 From: George Danezis Date: Tue, 11 Jan 2022 16:00:32 +0000 Subject: [PATCH 5/5] Clean up after reviews --- fastpay_core/src/authority.rs | 36 +++++++------------ fastpay_core/src/authority/authority_store.rs | 9 ++--- 2 files changed, 15 insertions(+), 30 deletions(-) diff --git a/fastpay_core/src/authority.rs b/fastpay_core/src/authority.rs index 2d6ca0a7ab2c8..1d63dd5ca47c7 100644 --- a/fastpay_core/src/authority.rs +++ b/fastpay_core/src/authority.rs @@ -84,9 +84,8 @@ impl AuthorityState { // Get a copy of the object. // TODO: We only need to read the read_only and owner field of the object, // it's a bit wasteful to copy the entire object. - let _objects = self.get_objects(&ids[..]).await?; - for (object_ref, object) in input_objects.into_iter().zip(_objects) { - //for object_ref in input_objects.into_iter() { + let objects = self.get_objects(&ids[..]).await?; + for (object_ref, object) in input_objects.into_iter().zip(objects) { let (object_id, sequence_number, _object_digest) = object_ref; fp_ensure!( @@ -94,11 +93,6 @@ impl AuthorityState { FastPayError::InvalidSequenceNumber ); - // let object = self - // .object_state(&object_id) - // .await - // .map_err(|_| FastPayError::ObjectNotFound)?; - let object = object.ok_or(FastPayError::ObjectNotFound)?; // TODO(https://github.com/MystenLabs/fastnft/issues/123): This hack substitutes the real @@ -181,11 +175,11 @@ impl AuthorityState { // Get a copy of the object. // TODO: We only need to read the read_only and owner field of the object, // it's a bit wasteful to copy the entire object. - let _objects = self.get_objects(&ids[..]).await?; + let objects = self.get_objects(&ids[..]).await?; let mut inputs = vec![]; let mut owner_index = HashMap::new(); - for (object_ref, object) in input_objects.into_iter().zip(_objects) { + for (object_ref, object) in input_objects.into_iter().zip(objects) { let (input_object_id, input_seq, _input_digest) = object_ref; // If we have a certificate on the confirmation order it means that the input @@ -276,16 +270,14 @@ impl AuthorityState { .deleted() .iter() .map(|(id, _, _)| (owner_index[id], *id)) - .chain( - temporary_store - .written() - .iter() - .filter(|(id, _, _)| { - let owner = owner_index.get(id); - owner.is_some() && *owner.unwrap() != temporary_store.objects()[id].owner - }) - .map(|(id, _, _)| (owner_index[id], *id)), - ) + .chain(temporary_store.written().iter().filter_map(|(id, _, _)| { + let owner = owner_index.get(id); + if owner.is_some() && *owner.unwrap() != temporary_store.objects()[id].owner { + Some((owner_index[id], *id)) + } else { + None + } + })) .collect(); // Update the database in an atomic manner @@ -301,9 +293,7 @@ impl AuthorityState { certificate, to_signed_effects, ) - .await - - // self.make_order_info(&transaction_digest).await + .await // Returns the OrderInfoResponse } pub async fn handle_account_info_request( diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index 418b79774ef49..b3fee4e847792 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -242,7 +242,7 @@ impl AuthorityStore { pub fn update_state( &self, temporary_store: AuthorityTemporaryStore, - _expired_object_owners: Vec<(FastPayAddress, ObjectID)>, + expired_object_owners: Vec<(FastPayAddress, ObjectID)>, certificate: CertifiedOrder, signed_effects: SignedOrderEffects, ) -> Result { @@ -284,12 +284,7 @@ impl AuthorityStore { // Delete the old owner index entries write_batch = write_batch - .delete_batch( - &self.owner_index, - _expired_object_owners - .into_iter() - .map(|(owner, id)| (owner, id)), - ) + .delete_batch(&self.owner_index, expired_object_owners.into_iter()) .map_err(|_| FastPayError::StorageError)?; // Index the certificate by the objects created