Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Aug 13, 2024
1 parent 99bc97a commit 0d09eee
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 1 deletion.
38 changes: 38 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,44 @@ impl Node {
pub fn verify_signature(&self, msg: &[u8], sig: &str, pkey: &PublicKey) -> bool {
self.keys_manager.verify_signature(msg, sig, pkey)
}

/// TODO
pub fn lsps1_request_opening_parms(&self) -> Result<(), Error> {
let liquidity_source =
self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;

let (node_id, address) = liquidity_source
.get_lsps1_service_details()
.ok_or(Error::LiquiditySourceUnavailable)?;

let rt_lock = self.runtime.read().unwrap();
let runtime = rt_lock.as_ref().unwrap();

let peer_info = PeerInfo { node_id, address };

let con_node_id = peer_info.node_id;
let con_addr = peer_info.address.clone();
let con_cm = Arc::clone(&self.connection_manager);

// We need to use our main runtime here as a local runtime might not be around to poll
// connection futures going forward.
tokio::task::block_in_place(move || {
runtime.block_on(async move {
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})
})?;

log_info!(self.logger, "Connected to LSP {}@{}. ", peer_info.node_id, peer_info.address);

let liquidity_source = Arc::clone(&liquidity_source);
let response = tokio::task::block_in_place(move || {
runtime.block_on(async move { liquidity_source.lsps1_request_opening_params().await })
})?;

println!("RESPONSE: {:?}", response);

Ok(())
}
}

impl Drop for Node {
Expand Down
101 changes: 100 additions & 1 deletion src/liquidity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use lightning_invoice::{Bolt11Invoice, InvoiceBuilder, RoutingFees};
use lightning_liquidity::events::Event;
use lightning_liquidity::lsps0::ser::RequestId;
use lightning_liquidity::lsps1::client::LSPS1ClientConfig;
use lightning_liquidity::lsps1::event::LSPS1ClientEvent;
use lightning_liquidity::lsps1::msgs::LSPS1Options;
use lightning_liquidity::lsps2::client::LSPS2ClientConfig;
use lightning_liquidity::lsps2::event::LSPS2ClientEvent;
use lightning_liquidity::lsps2::msgs::OpeningFeeParams;
Expand All @@ -32,6 +34,8 @@ struct LSPS1Service {
address: SocketAddress,
token: Option<String>,
client_config: LSPS1ClientConfig,
pending_opening_params_requests:
Mutex<HashMap<RequestId, oneshot::Sender<LSPS1OpeningParamsResponse>>>,
}

struct LSPS2Service {
Expand Down Expand Up @@ -82,7 +86,14 @@ where
) -> &mut Self {
// TODO: allow to set max_channel_fees_msat
let client_config = LSPS1ClientConfig { max_channel_fees_msat: None };
self.lsps1_service = Some(LSPS1Service { node_id, address, token, client_config });
let pending_opening_params_requests = Mutex::new(HashMap::new());
self.lsps1_service = Some(LSPS1Service {
node_id,
address,
token,
client_config,
pending_opening_params_requests,
});
self
}

Expand Down Expand Up @@ -166,6 +177,59 @@ where

pub(crate) async fn handle_next_event(&self) {
match self.liquidity_manager.next_event_async().await {
Event::LSPS1Client(LSPS1ClientEvent::SupportedOptionsReady {
request_id,
counterparty_node_id,
supported_options,
}) => {
if let Some(lsps1_service) = self.lsps1_service.as_ref() {
if counterparty_node_id != lsps1_service.node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
self.logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}

if let Some(sender) = lsps1_service
.pending_opening_params_requests
.lock()
.unwrap()
.remove(&request_id)
{
let response = LSPS1OpeningParamsResponse { supported_options };

match sender.send(response) {
Ok(()) => (),
Err(e) => {
log_error!(
self.logger,
"Failed to handle response from liquidity service: {:?}",
e
);
},
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
self.logger,
"Received response from liquidity service for unknown request."
);
}
} else {
log_error!(
self.logger,
"Received unexpected LSPS1Client::SupportedOptionsReady event!"
);
}
},
Event::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady {
request_id,
counterparty_node_id,
Expand Down Expand Up @@ -274,6 +338,36 @@ where
}
}

pub(crate) async fn lsps1_request_opening_params(
&self,
) -> Result<LSPS1OpeningParamsResponse, Error> {
let lsps1_service = self.lsps1_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;

let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| {
log_error!(self.logger, "LSPS1 liquidity client was not configured.",);
Error::LiquiditySourceUnavailable
})?;

let (request_sender, request_receiver) = oneshot::channel();
{
let mut pending_opening_params_requests_lock =
lsps1_service.pending_opening_params_requests.lock().unwrap();
let request_id = client_handler.request_supported_options(lsps1_service.node_id);
pending_opening_params_requests_lock.insert(request_id, request_sender);
}

tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), request_receiver)
.await
.map_err(|e| {
log_error!(self.logger, "Liquidity request timed out: {}", e);
Error::LiquidityRequestFailed
})?
.map_err(|e| {
log_error!(self.logger, "Failed to handle response from liquidity service: {}", e);
Error::LiquidityRequestFailed
})
}

pub(crate) async fn lsps2_receive_to_jit_channel(
&self, amount_msat: u64, description: &str, expiry_secs: u32,
max_total_lsp_fee_limit_msat: Option<u64>,
Expand Down Expand Up @@ -507,6 +601,11 @@ where
}
}

#[derive(Debug, Clone)]
pub(crate) struct LSPS1OpeningParamsResponse {
supported_options: LSPS1Options,
}

#[derive(Debug, Clone)]
pub(crate) struct LSPS2FeeResponse {
opening_fee_params_menu: Vec<OpeningFeeParams>,
Expand Down

0 comments on commit 0d09eee

Please sign in to comment.