Skip to content

Commit

Permalink
Queue PayJoins in 'Scheduler'
Browse files Browse the repository at this point in the history
Co-authored-by: 志宇 <[email protected]>
  • Loading branch information
DanGould and evanlinjin committed Oct 10, 2022
1 parent 19046bc commit 0324eaa
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 138 deletions.
2 changes: 1 addition & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,5 @@ where
}
};

Ok(Some(ScheduledPayJoin { wallet_amount, channels, fee_rate }))
Ok(Some(ScheduledPayJoin::new(wallet_amount, channels, fee_rate)))
}
47 changes: 46 additions & 1 deletion src/lnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ use bitcoin::psbt::PartiallySignedTransaction;
use bitcoin::Address;
use ln_types::P2PAddress;
use tokio::sync::Mutex as AsyncMutex;
use tonic_lnd::rpc::{FundingTransitionMsg, OpenChannelRequest, OpenStatusUpdate};
use tonic_lnd::rpc::funding_transition_msg::Trigger;
use tonic_lnd::rpc::{
FundingPsbtVerify, FundingTransitionMsg, OpenChannelRequest, OpenStatusUpdate,
};

use crate::scheduler::ChannelId;

#[derive(Debug)]
pub enum LndError {
Expand Down Expand Up @@ -62,6 +67,18 @@ impl From<tonic_lnd::ConnectError> for LndError {
pub struct LndClient(Arc<AsyncMutex<tonic_lnd::Client>>);

impl LndClient {
/// New [LndClient] from [Config].
pub async fn from_config(config: &crate::config::Config) -> Result<Self, LndError> {
let raw_client = tonic_lnd::connect(
config.lnd_address.clone(),
&config.lnd_cert_path,
&config.lnd_macaroon_path,
)
.await?;

Self::new(raw_client).await
}

pub async fn new(mut client: tonic_lnd::Client) -> Result<Self, LndError> {
let response = client
.get_info(tonic_lnd::rpc::GetInfoRequest {})
Expand Down Expand Up @@ -161,6 +178,34 @@ impl LndClient {
Ok(None)
}

/// Sends the `FundingPsbtVerify` message to remote lnd nodes to finalize channels of given
/// channel ids.
pub async fn verify_funding<I>(&self, funded_psbt: &[u8], chan_ids: I) -> Result<(), LndError>
where
I: IntoIterator<Item = ChannelId>,
{
let handles = chan_ids
.into_iter()
.map(|chan_id| {
let client = self.clone();
let req = FundingTransitionMsg {
trigger: Some(Trigger::PsbtVerify(FundingPsbtVerify {
pending_chan_id: chan_id.into(),
funded_psbt: funded_psbt.to_vec(),
skip_finalize: true,
})),
};
tokio::spawn(async move { client.funding_state_step(req).await })
})
.collect::<Vec<_>>();

for handle in handles {
handle.await.unwrap()?;
}

Ok(())
}

pub async fn funding_state_step(&self, req: FundingTransitionMsg) -> Result<(), LndError> {
let client = &mut *self.0.lock().await;
client.funding_state_step(req).await?;
Expand Down
144 changes: 19 additions & 125 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use bip78::receiver::*;
use bitcoin::util::address::Address;
use bitcoin::util::psbt::PartiallySignedTransaction;
use bitcoin::{Script, TxOut};
use bitcoin::{Address, Script, TxOut};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use scheduler::ScheduledPayJoin;

use crate::args::parse_args;
use crate::lnd::*;
use crate::scheduler::Scheduler;

#[macro_use]
extern crate serde_derive;
Expand Down Expand Up @@ -69,36 +68,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let scheduled_pj = parse_args(args).expect("failed to parse remaining arguments");

let client =
tonic_lnd::connect(config.lnd_address, &config.lnd_cert_path, &config.lnd_macaroon_path)
.await
.expect("failed to connect");

let client = LndClient::new(client).await?;
let mut handler = Handler::new(client).await;
let scheduler = Scheduler::new(LndClient::from_config(&config).await?);

if let Some(payjoin) = scheduled_pj {
payjoin.test_connections(&mut handler.client).await;
let address = handler.client.get_new_bech32_address().await?;
let address = scheduler.schedule_payjoin(&payjoin).await?;

// TODO: Don't hardcode pj endpoint
// * Optional cli flag or ENV for pj endpoint (in the case of port forwarding), otherwise
// we should determine the bip21 string using `api::ServeOptions`
println!(
"bitcoin:{}?amount={}&pj=https://example.com/pj",
"bitcoin:{}?amount={}&pj=https://localhost:3010/pj",
address,
payjoin.total_amount().to_string_in(bitcoin::Denomination::Bitcoin)
);

handler.payjoins.insert(&address, payjoin).expect("new handler should be empty");
}

let addr = ([127, 0, 0, 1], config.bind_port).into();

let service = make_service_fn(move |_| {
let handler = handler.clone();

let sched = scheduler.clone(); // TODO Review this double clone. Wataf is going on here, are we referencing the same scheduler or do we need a container?
async move {
Ok::<_, hyper::Error>(service_fn(move |request| {
handle_web_req(handler.clone(), request)
}))
Ok::<_, hyper::Error>(service_fn(move |request| handle_web_req(sched.clone(), request)))
}
});

Expand All @@ -117,13 +107,11 @@ impl bip78::receiver::Headers for Headers {
}

async fn handle_web_req(
handler: Handler,
scheduler: Scheduler,
req: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
use std::path::Path;

use bitcoin::consensus::Encodable;

match (req.method(), req.uri().path()) {
(&Method::GET, "/pj") => {
let index =
Expand All @@ -142,8 +130,6 @@ async fn handle_web_req(
(&Method::POST, "/pj") => {
dbg!(req.uri().query());

let lnd = handler.client;

let headers = Headers(req.headers().to_owned());
let query = {
let uri = req.uri();
Expand All @@ -157,117 +143,25 @@ async fn handle_web_req(
dbg!(&bytes); // this is correct by my accounts
let reader = &*bytes;
let original_request = UncheckedProposal::from_request(reader, query, headers).unwrap();
if original_request.is_output_substitution_disabled() {
// TODO handle error for output substitution properly, don't panic
panic!("Output substitution must be enabled");
}

let proposal = original_request
// This is interactive, NOT a Payment Processor, so we don't save original tx.
// Humans can solve the failure case out of band by trying again.
.assume_interactive_receive_endpoint()
.assume_no_inputs_owned() // TODO Check
.assume_no_mixed_input_scripts() // This check is silly and could be ignored
.assume_no_inputs_seen_before(); // TODO

let mut psbt = proposal.psbt().clone();
eprintln!("Received transaction: {:#?}", psbt);
{
for input in &mut psbt.unsigned_tx.input {
// clear signature
input.script_sig = bitcoin::blockdata::script::Script::new();
}
}
// TODO: Handle with payjoin crate. Support multiple receiver outputs.
let (our_output, scheduled_payjoin) = handler
.payjoins
.find(&mut psbt.unsigned_tx.output)
.expect("the transaction doesn't contain our output");
// TODO: replace with scheduled_payjoin.total_channel_amount()
let total_channel_amount: bitcoin::Amount = scheduled_payjoin
.channels
.iter()
.map(|channel| channel.amount)
.fold(bitcoin::Amount::ZERO, std::ops::Add::add);
// TODO: replace with sheduled_payjoin.fees()
let fees = scheduler::calculate_fees(
scheduled_payjoin.channels.len() as u64,
scheduled_payjoin.fee_rate,
scheduled_payjoin.wallet_amount != bitcoin::Amount::ZERO,
);
let proposal_psbt = scheduler.propose_payjoin(original_request).await.unwrap();

// FIXME we shouldn't have anything that panics when handling an http request
assert_eq!(
our_output.value,
(total_channel_amount + scheduled_payjoin.wallet_amount + fees).as_sat()
);
// TODO handle error
let open_chan_results = scheduled_payjoin.multi_open_channel(&lnd).await.unwrap();

let mut txouts = open_chan_results.iter().map(|(_, txo)| txo.clone());
let chids = open_chan_results.iter().map(|(id, _)| *id);

let channel_output = txouts.next().expect("no channels");

if scheduled_payjoin.wallet_amount == bitcoin::Amount::ZERO {
assert_eq!(channel_output.value, scheduled_payjoin.channels[0].amount.as_sat());
*our_output = channel_output;
} else {
our_output.value = scheduled_payjoin.wallet_amount.as_sat();
psbt.unsigned_tx.output.push(channel_output)
}

psbt.unsigned_tx.output.extend(txouts);
psbt.outputs.resize_with(psbt.unsigned_tx.output.len(), Default::default);

eprintln!("PSBT to be given to LND: {:#?}", psbt);
let mut psbt_bytes = Vec::new();
psbt.consensus_encode(&mut psbt_bytes).unwrap();

for chid in chids {
let psbt_verify = tonic_lnd::rpc::FundingPsbtVerify {
pending_chan_id: Vec::from(&chid as &[_]),
funded_psbt: psbt_bytes.clone(),
skip_finalize: true,
};

let transition_msg = tonic_lnd::rpc::FundingTransitionMsg {
trigger: Some(tonic_lnd::rpc::funding_transition_msg::Trigger::PsbtVerify(
psbt_verify,
)),
};

lnd.funding_state_step(transition_msg)
.await
.expect("failed to execute funding state step");
}

// Reset transaction state to be non-finalized
let psbt = PartiallySignedTransaction::from_unsigned_tx(psbt.unsigned_tx.clone())
.expect("resetting tx failed");
let mut psbt_bytes = Vec::new();
eprintln!("PSBT that will be returned: {:#?}", psbt);
psbt.consensus_encode(&mut psbt_bytes).unwrap();
let psbt_bytes = base64::encode(psbt_bytes);

Ok(Response::new(Body::from(psbt_bytes)))
Ok(Response::new(Body::from(proposal_psbt)))
}

(&Method::POST, "/pj/schedule") => {
let bytes = hyper::body::to_bytes(req.into_body()).await?;
let request =
serde_json::from_slice::<ScheduledPayJoin>(&bytes).expect("invalid request");
request.test_connections(&handler.client).await;
let address = handler
.client
.get_new_bech32_address()
.await
.expect("lnd returned a bech32 address");

let address = scheduler.schedule_payjoin(&request).await.unwrap();
let total_amount = request.total_amount();
handler.payjoins.insert(&address, request).expect("address reuse");

// TODO: Don't hardcode pj endpoint
// * Optional cli flag or ENV for pj endpoint (in the case of port forwarding), otherwise
// we should determine the bip21 string using `api::ServeOptions`
let uri = format!(
"bitcoin:{}?amount={}&pj=https://example.com/pj",
"bitcoin:{}?amount={}&pj=https://localhost:3010/pj",
address,
total_amount.to_string_in(bitcoin::Denomination::Bitcoin)
);
Expand Down
Loading

0 comments on commit 0324eaa

Please sign in to comment.