Skip to content

Commit

Permalink
feat: broadcast one sided (#6568)
Browse files Browse the repository at this point in the history
Description
---
Broadcast one-sided transactions

Motivation and Context
---
When a wallet sends a 1-sided tx it will now send it across the network
to the peer as well. If the peer listens to the it, ti will register the
tx immediately and show it to the user.

How Has This Been Tested?
---
manual
  • Loading branch information
SWvheerden authored Sep 20, 2024
1 parent 0d5e09c commit a954383
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 35 deletions.
5 changes: 5 additions & 0 deletions base_layer/common_types/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ impl Display for TransactionStatus {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImportStatus {
/// Special case where we import a tx received from broadcast
Broadcast,
/// This transaction import status is used when importing a spendable UTXO
Imported,
/// This transaction import status is used when a one-sided transaction has been scanned but is unconfirmed
Expand All @@ -183,6 +185,7 @@ impl TryFrom<ImportStatus> for TransactionStatus {

fn try_from(value: ImportStatus) -> Result<Self, Self::Error> {
match value {
ImportStatus::Broadcast => Ok(TransactionStatus::Broadcast),
ImportStatus::Imported => Ok(TransactionStatus::Imported),
ImportStatus::OneSidedUnconfirmed => Ok(TransactionStatus::OneSidedUnconfirmed),
ImportStatus::OneSidedConfirmed => Ok(TransactionStatus::OneSidedConfirmed),
Expand All @@ -197,6 +200,7 @@ impl TryFrom<TransactionStatus> for ImportStatus {

fn try_from(value: TransactionStatus) -> Result<Self, Self::Error> {
match value {
TransactionStatus::Broadcast => Ok(ImportStatus::Broadcast),
TransactionStatus::Imported => Ok(ImportStatus::Imported),
TransactionStatus::OneSidedUnconfirmed => Ok(ImportStatus::OneSidedUnconfirmed),
TransactionStatus::OneSidedConfirmed => Ok(ImportStatus::OneSidedConfirmed),
Expand All @@ -210,6 +214,7 @@ impl TryFrom<TransactionStatus> for ImportStatus {
impl fmt::Display for ImportStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
ImportStatus::Broadcast => write!(f, "Broadcast"),
ImportStatus::Imported => write!(f, "Imported"),
ImportStatus::OneSidedUnconfirmed => write!(f, "OneSidedUnconfirmed"),
ImportStatus::OneSidedConfirmed => write!(f, "OneSidedConfirmed"),
Expand Down
8 changes: 4 additions & 4 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ pub enum OutputManagerRequest {
num_outputs: usize,
},

ScanForRecoverableOutputs(Vec<TransactionOutput>),
ScanOutputs(Vec<TransactionOutput>),
ScanForRecoverableOutputs(Vec<(TransactionOutput, Option<TxId>)>),
ScanOutputs(Vec<(TransactionOutput, Option<TxId>)>),
AddKnownOneSidedPaymentScript(KnownOneSidedPaymentScript),
CreateOutputWithFeatures {
value: MicroMinotari,
Expand Down Expand Up @@ -747,7 +747,7 @@ impl OutputManagerHandle {

pub async fn scan_for_recoverable_outputs(
&mut self,
outputs: Vec<TransactionOutput>,
outputs: Vec<(TransactionOutput, Option<TxId>)>,
) -> Result<Vec<RecoveredOutput>, OutputManagerError> {
match self
.handle
Expand All @@ -761,7 +761,7 @@ impl OutputManagerHandle {

pub async fn scan_outputs_for_one_sided_payments(
&mut self,
outputs: Vec<TransactionOutput>,
outputs: Vec<(TransactionOutput, Option<TxId>)>,
) -> Result<Vec<RecoveredOutput>, OutputManagerError> {
match self.handle.call(OutputManagerRequest::ScanOutputs(outputs)).await?? {
OutputManagerResponse::ScanOutputs(outputs) => Ok(outputs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ where
/// them to the database and increment the key manager index
pub async fn scan_and_recover_outputs(
&mut self,
outputs: Vec<TransactionOutput>,
outputs: Vec<(TransactionOutput, Option<TxId>)>,
) -> Result<Vec<RecoveredOutput>, OutputManagerError> {
let start = Instant::now();
let outputs_length = outputs.len();

let known_scripts = self.db.get_all_known_one_sided_payment_scripts()?;

let mut rewound_outputs: Vec<(WalletOutput, bool, FixedHash)> = Vec::new();
let mut rewound_outputs: Vec<(WalletOutput, bool, FixedHash, Option<TxId>)> = Vec::new();
let push_pub_key_script = script!(PushPubKey(Box::default()))?;
for output in outputs {
for (output, tx_id) in outputs {
let known_script_index = known_scripts.iter().position(|s| s.script == output.script);
if output.script != script!(Nop)? &&
known_script_index.is_none() &&
Expand Down Expand Up @@ -122,7 +122,7 @@ where
payment_id,
);

rewound_outputs.push((uo, known_script_index.is_some(), hash));
rewound_outputs.push((uo, known_script_index.is_some(), hash, tx_id));
}

let rewind_time = start.elapsed();
Expand All @@ -134,7 +134,7 @@ where
);

let mut rewound_outputs_with_tx_id: Vec<RecoveredOutput> = Vec::new();
for (output, has_known_script, hash) in &mut rewound_outputs {
for (output, has_known_script, hash, tx_id) in &mut rewound_outputs {
let db_output = DbWalletOutput::from_wallet_output(
output.clone(),
&self.master_key_manager,
Expand All @@ -144,7 +144,10 @@ where
None,
)
.await?;
let tx_id = TxId::new_random();
let tx_id = match tx_id {
Some(id) => *id,
None => TxId::new_random(),
};
let output_hex = db_output.commitment.to_hex();
if let Err(e) = self.db.add_unspent_output_with_tx_id(tx_id, db_output) {
match e {
Expand Down
14 changes: 7 additions & 7 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3109,7 +3109,7 @@ where
#[allow(clippy::too_many_lines)]
async fn scan_outputs_for_one_sided_payments(
&mut self,
outputs: Vec<TransactionOutput>,
outputs: Vec<(TransactionOutput, Option<TxId>)>,
) -> Result<Vec<RecoveredOutput>, OutputManagerError> {
let mut known_keys = Vec::new();
let known_scripts = self.resources.db.get_all_known_one_sided_payment_scripts()?;
Expand All @@ -3127,7 +3127,7 @@ where

let mut scanned_outputs = vec![];

for output in outputs {
for (output, tx_id) in outputs {
if let [Opcode::PushPubKey(scanned_pk)] = output.script.as_slice() {
if let Some(matched_key) = known_keys.iter().find(|x| &x.0 == scanned_pk.as_ref()) {
let shared_secret = self
Expand Down Expand Up @@ -3166,7 +3166,7 @@ where
payment_id,
);

scanned_outputs.push((rewound_output, OutputSource::OneSided));
scanned_outputs.push((rewound_output, OutputSource::OneSided, tx_id));
}
}
}
Expand Down Expand Up @@ -3232,7 +3232,7 @@ where
payment_id,
);

scanned_outputs.push((rewound_output, OutputSource::StealthOneSided));
scanned_outputs.push((rewound_output, OutputSource::StealthOneSided, tx_id));
}
}
}
Expand All @@ -3245,12 +3245,12 @@ where
// Import scanned outputs into the wallet
async fn import_onesided_outputs(
&self,
scanned_outputs: Vec<(WalletOutput, OutputSource)>,
scanned_outputs: Vec<(WalletOutput, OutputSource, Option<TxId>)>,
) -> Result<Vec<RecoveredOutput>, OutputManagerError> {
let mut rewound_outputs = Vec::with_capacity(scanned_outputs.len());

for (output, output_source) in scanned_outputs {
let tx_id = TxId::new_random();
for (output, output_source, tx_id) in scanned_outputs {
let tx_id = tx_id.unwrap_or(TxId::new_random());
let db_output = DbWalletOutput::from_wallet_output(
output.clone(),
&self.resources.key_manager,
Expand Down
78 changes: 76 additions & 2 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1855,7 +1855,7 @@ where
CompletedTransaction::new(
tx_id,
self.resources.one_sided_tari_address.clone(),
dest_address,
dest_address.clone(),
amount,
fee,
tx.clone(),
Expand All @@ -1870,6 +1870,15 @@ where
)
.await?;

tokio::spawn(send_finalized_transaction_message(
tx_id,
tx.clone(),
dest_address.comms_public_key().clone(),
self.resources.outbound_message_service.clone(),
self.resources.config.direct_send_timeout,
self.resources.config.transaction_routing_mechanism,
));

Ok(tx_id)
}

Expand Down Expand Up @@ -2930,6 +2939,7 @@ where
/// Accept the public reply from a recipient and apply the reply to the relevant transaction protocol
/// # Arguments
/// 'recipient_reply' - The public response from a recipient with data required to complete the transaction
#[allow(clippy::too_many_lines)]
pub async fn accept_finalized_transaction(
&mut self,
source_pubkey: CommsPublicKey,
Expand Down Expand Up @@ -2993,7 +3003,70 @@ where
Some(s) => s,
}
},
Err(_) => return Err(TransactionServiceError::TransactionDoesNotExistError),
Err(_) => {
// we dont currently know of this transaction, so lets see if we can recover funds from this
// transaction, and if so, lets add it to our pool of transactions.
let outputs = transaction.body.outputs();
let mut recovered = self
.resources
.output_manager_service
.scan_for_recoverable_outputs(outputs.iter().map(|o| (o.clone(), Some(tx_id))).collect())
.await?;
recovered.append(
&mut self
.resources
.output_manager_service
.scan_outputs_for_one_sided_payments(
outputs.iter().map(|o| (o.clone(), Some(tx_id))).collect(),
)
.await?,
);
if recovered.is_empty() {
return Err(TransactionServiceError::TransactionDoesNotExistError);
};
// we should only be able to recover 1 output per tx, but we use the vec here to be safe
let mut source_address = None;
let mut payment_id = None;
let mut amount = None;
for ro in recovered {
match &ro.output.payment_id {
PaymentId::AddressAndData(address, _) | PaymentId::Address(address) => {
if source_address.is_none() {
source_address = Some(address.clone());
payment_id = Some(ro.output.payment_id.clone());
amount = Some(ro.output.value);
}
},
_ => {},
};
}
let completed_transaction = CompletedTransaction::new(
tx_id,
source_address.clone().unwrap_or_default(),
self.resources.one_sided_tari_address.clone(),
amount.unwrap_or_default(),
transaction.body.get_total_fee()?,
transaction.clone(),
TransactionStatus::Completed,
"".to_string(),
Utc::now().naive_utc(),
TransactionDirection::Inbound,
None,
None,
payment_id,
)?;
self.db
.insert_completed_transaction(tx_id, completed_transaction.clone())?;
self.restart_receive_transaction_protocol(
tx_id,
source_address.unwrap_or_default(),
join_handles,
);
match self.finalized_transaction_senders.get_mut(&tx_id) {
None => return Err(TransactionServiceError::TransactionDoesNotExistError),
Some(s) => s,
}
},
}
},
Some(s) => s,
Expand Down Expand Up @@ -3439,6 +3512,7 @@ where
payment_id,
)?;
let transaction_event = match import_status {
ImportStatus::Broadcast => TransactionEvent::TransactionBroadcast(tx_id),
ImportStatus::Imported => TransactionEvent::DetectedTransactionUnconfirmed {
tx_id,
num_confirmations: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ where
&mut self
.resources
.output_manager_service
.scan_for_recoverable_outputs(outputs.clone())
.scan_for_recoverable_outputs(outputs.clone().into_iter().map(|o| (o, None)).collect())
.await?
.into_iter()
.map(|ro| -> Result<_, UtxoScannerError> {
Expand All @@ -593,7 +593,7 @@ where
&mut self
.resources
.output_manager_service
.scan_outputs_for_one_sided_payments(outputs.clone())
.scan_outputs_for_one_sided_payments(outputs.clone().into_iter().map(|o| (o, None)).collect())
.await?
.into_iter()
.map(|ro| -> Result<_, UtxoScannerError> {
Expand Down
14 changes: 10 additions & 4 deletions base_layer/wallet/tests/output_manager_service_tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2236,12 +2236,18 @@ async fn scan_for_recovery_test() {
}
let mut recoverable_outputs = Vec::new();
for output in &recoverable_wallet_outputs {
recoverable_outputs.push(output.to_transaction_output(&oms.key_manager_handle).await.unwrap());
recoverable_outputs.push((
output.to_transaction_output(&oms.key_manager_handle).await.unwrap(),
None,
));
}

let mut non_recoverable_outputs = Vec::new();
for output in non_recoverable_wallet_outputs {
non_recoverable_outputs.push(output.to_transaction_output(&oms.key_manager_handle).await.unwrap());
non_recoverable_outputs.push((
output.to_transaction_output(&oms.key_manager_handle).await.unwrap(),
None,
));
}

oms.output_manager_handle
Expand All @@ -2256,7 +2262,7 @@ async fn scan_for_recovery_test() {
.clone()
.into_iter()
.chain(non_recoverable_outputs.clone().into_iter())
.collect::<Vec<TransactionOutput>>(),
.collect::<Vec<(TransactionOutput, Option<TxId>)>>(),
)
.await
.unwrap();
Expand Down Expand Up @@ -2291,7 +2297,7 @@ async fn recovered_output_key_not_in_keychain() {

let result = oms
.output_manager_handle
.scan_for_recoverable_outputs(vec![rewindable_output])
.scan_for_recoverable_outputs(vec![(rewindable_output, None)])
.await;
assert!(
matches!(result.as_deref(), Ok([])),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl OutputManagerServiceMock {
.clone()
.into_iter()
.filter_map(|dbuo| {
if requested_outputs.iter().any(|ro| dbuo.commitment == ro.commitment) {
if requested_outputs.iter().any(|ro| dbuo.commitment == ro.0.commitment) {
Some(RecoveredOutput {
output: dbuo.wallet_output,
tx_id: TxId::new_random(),
Expand All @@ -127,7 +127,7 @@ impl OutputManagerServiceMock {
.clone()
.into_iter()
.filter_map(|dbuo| {
if requested_outputs.iter().any(|ro| dbuo.commitment == ro.commitment) {
if requested_outputs.iter().any(|ro| dbuo.commitment == ro.0.commitment) {
Some(RecoveredOutput {
output: dbuo.wallet_output,
tx_id: TxId::new_random(),
Expand Down
14 changes: 10 additions & 4 deletions base_layer/wallet/tests/transaction_service_tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1795,15 +1795,18 @@ async fn recover_one_sided_transaction() {
let outputs = completed_tx.transaction.body.outputs().clone();

let recovered_outputs_1 = bob_oms
.scan_outputs_for_one_sided_payments(outputs.clone())
.scan_outputs_for_one_sided_payments(outputs.iter().map(|o| (o.clone(), None)).collect())
.await
.unwrap();
// Bob should be able to claim 1 output.
assert_eq!(1, recovered_outputs_1.len());
assert_eq!(value, recovered_outputs_1[0].output.value);

// Should ignore already existing outputs
let recovered_outputs_2 = bob_oms.scan_outputs_for_one_sided_payments(outputs).await.unwrap();
let recovered_outputs_2 = bob_oms
.scan_outputs_for_one_sided_payments(outputs.into_iter().map(|o| (o, None)).collect())
.await
.unwrap();
assert!(recovered_outputs_2.is_empty());
}

Expand Down Expand Up @@ -1919,15 +1922,18 @@ async fn recover_stealth_one_sided_transaction() {
let outputs = completed_tx.transaction.body.outputs().clone();

let recovered_outputs_1 = bob_oms
.scan_outputs_for_one_sided_payments(outputs.clone())
.scan_outputs_for_one_sided_payments(outputs.iter().map(|o| (o.clone(), None)).collect())
.await
.unwrap();
// Bob should be able to claim 1 output.
assert_eq!(1, recovered_outputs_1.len());
assert_eq!(value, recovered_outputs_1[0].output.value);

// Should ignore already existing outputs
let recovered_outputs_2 = bob_oms.scan_outputs_for_one_sided_payments(outputs).await.unwrap();
let recovered_outputs_2 = bob_oms
.scan_outputs_for_one_sided_payments(outputs.into_iter().map(|o| (o, None)).collect())
.await
.unwrap();
assert!(recovered_outputs_2.is_empty());
}

Expand Down
Loading

0 comments on commit a954383

Please sign in to comment.