Skip to content

Commit

Permalink
Replace Send/Receive Stores with a Database table
Browse files Browse the repository at this point in the history
  • Loading branch information
DanGould committed Jun 4, 2024
1 parent 4df2d7a commit 134cdf7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 102 deletions.
2 changes: 1 addition & 1 deletion payjoin-cli/src/app/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl App {

// Receive Check 4: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
let payjoin = proposal.check_no_inputs_seen_before(|input| {
Ok(!self.db.insert_input_seen_before(*input).map_err(|e| Error::Server(e.into()))?)
Ok(self.db.insert_input_seen_before(*input).map_err(|e| Error::Server(e.into()))?)
})?;
log::trace!("check4");

Expand Down
114 changes: 13 additions & 101 deletions payjoin-cli/src/app/v2.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::fs::OpenOptions;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use bitcoincore_rpc::jsonrpc::serde_json;
use bitcoincore_rpc::RpcApi;
use payjoin::bitcoin::consensus::encode::serialize_hex;
use payjoin::bitcoin::psbt::Psbt;
use payjoin::bitcoin::Amount;
use payjoin::{base64, bitcoin, Error, PjUriBuilder};
use tokio::sync::Mutex as AsyncMutex;

use super::config::AppConfig;
use super::App as AppTrait;
Expand All @@ -18,18 +14,14 @@ use crate::db::Database;

pub(crate) struct App {
config: AppConfig,
receive_store: Arc<AsyncMutex<ReceiveStore>>,
send_store: Arc<AsyncMutex<SendStore>>,
db: Database,
}

#[async_trait::async_trait]
impl AppTrait for App {
fn new(config: AppConfig) -> Result<Self> {
let db = Database::create()?;
let receive_store = Arc::new(AsyncMutex::new(ReceiveStore::new()?));
let send_store = Arc::new(AsyncMutex::new(SendStore::new()?));
let app = Self { config, receive_store, send_store, db };
let app = Self { config, db };
app.bitcoind()?
.get_blockchain_info()
.context("Failed to connect to bitcoind. Check config RPC connection.")?;
Expand All @@ -54,21 +46,19 @@ impl AppTrait for App {
}

async fn send_payjoin(&self, bip21: &str, fee_rate: &f32, is_retry: bool) -> Result<()> {
let mut session = self.send_store.lock().await;
let req_ctx = if is_retry {
let mut req_ctx = if is_retry {
log::debug!("Resuming session");
// Get a reference to RequestContext
session.req_ctx.as_mut().expect("RequestContext is missing")
self.db.get_send_session()?.ok_or(anyhow!("No session found"))?
} else {
let req_ctx = self.create_pj_request(bip21, fee_rate)?;
session.write(req_ctx)?;
log::debug!("Writing req_ctx");
session.req_ctx.as_mut().expect("RequestContext is missing")
let mut req_ctx = self.create_pj_request(bip21, fee_rate)?;
self.db.insert_send_session(&mut req_ctx)?;
req_ctx
};
log::debug!("Awaiting response");
let res = self.long_poll_post(req_ctx).await?;
let res = self.long_poll_post(&mut req_ctx).await?;
self.process_pj_response(res)?;
session.clear()?;
self.db.clear_send_session()?;
Ok(())
}

Expand Down Expand Up @@ -97,12 +87,12 @@ impl AppTrait for App {
let enrolled = enroller
.process_res(ohttp_response.bytes().await?.to_vec().as_slice(), ctx)
.map_err(|_| anyhow!("Enrollment failed"))?;
self.receive_store.lock().await.write(enrolled.clone())?;
self.db.insert_recv_session(enrolled.clone())?;
enrolled
} else {
let session = self.receive_store.lock().await;
let session = self.db.get_recv_session()?;
println!("Resuming Payjoin session"); // TODO include session pubkey / payjoin directory
session.session.clone().ok_or(anyhow!("No session found"))?
session.ok_or(anyhow!("No session found"))?
};

println!("Receive session established");
Expand Down Expand Up @@ -137,7 +127,7 @@ impl AppTrait for App {
"Response successful. Watch mempool for successful Payjoin. TXID: {}",
payjoin_psbt.extract_tx().clone().txid()
);
self.receive_store.lock().await.clear()?;
self.db.clear_recv_session()?;
Ok(())
}
}
Expand Down Expand Up @@ -267,7 +257,7 @@ impl App {

// Receive Check 4: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
let payjoin = proposal.check_no_inputs_seen_before(|input| {
Ok(!self.db.insert_input_seen_before(*input).map_err(|e| Error::Server(e.into()))?)
Ok(self.db.insert_input_seen_before(*input).map_err(|e| Error::Server(e.into()))?)
})?;
log::trace!("check4");

Expand Down Expand Up @@ -339,81 +329,3 @@ fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
None => anyhow!("No HTTP response: {}", e),
}
}

struct SendStore {
req_ctx: Option<payjoin::send::RequestContext>,
file: std::fs::File,
}

impl SendStore {
fn new() -> Result<Self> {
let mut file =
OpenOptions::new().write(true).read(true).create(true).open("send_store.json")?;
let session = match serde_json::from_reader(&mut file) {
Ok(session) => Some(session),
Err(e) => {
log::debug!("error reading send session store: {}", e);
None
}
};

Ok(Self { req_ctx: session, file })
}

fn write(
&mut self,
session: payjoin::send::RequestContext,
) -> Result<&mut payjoin::send::RequestContext> {
use std::io::Write;

let session = self.req_ctx.insert(session);
let serialized = serde_json::to_string(session)?;
self.file.write_all(serialized.as_bytes())?;
Ok(session)
}

fn clear(&mut self) -> Result<()> {
let file = OpenOptions::new().write(true).open("send_store.json")?;
file.set_len(0)?;
Ok(())
}
}

struct ReceiveStore {
session: Option<payjoin::receive::v2::Enrolled>,
file: std::fs::File,
}

impl ReceiveStore {
fn new() -> Result<Self> {
let mut file =
OpenOptions::new().write(true).read(true).create(true).open("receive_store.json")?;
let session = match serde_json::from_reader(&mut file) {
Ok(session) => Some(session),
Err(e) => {
log::debug!("error reading receive session store: {}", e);
None
}
};

Ok(Self { session, file })
}

fn write(
&mut self,
session: payjoin::receive::v2::Enrolled,
) -> Result<&mut payjoin::receive::v2::Enrolled> {
use std::io::Write;

let session = self.session.insert(session);
let serialized = serde_json::to_string(session)?;
self.file.write_all(serialized.as_bytes())?;
Ok(session)
}

fn clear(&mut self) -> Result<()> {
let file = OpenOptions::new().write(true).open("receive_store.json")?;
file.set_len(0)?;
Ok(())
}
}
3 changes: 3 additions & 0 deletions payjoin-cli/src/db.rs → payjoin-cli/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ impl Database {
Ok(was_seen_before)
}
}

#[cfg(feature = "v2")]
mod v2;
63 changes: 63 additions & 0 deletions payjoin-cli/src/db/v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use bitcoincore_rpc::jsonrpc::serde_json;
use payjoin::receive::v2::Enrolled;
use payjoin::send::RequestContext;
use redb::ReadableTable;

use super::*;

pub const SEND_SESSIONS: TableDefinition<&[u8], &str> = TableDefinition::new("send_sessions");
pub const RECV_SESSIONS: TableDefinition<&[u8], &str> = TableDefinition::new("recv_sessions");

impl Database {
pub fn insert_recv_session(&self, session: Enrolled) -> Result<()> {
let write_txn = self.0.begin_write()?;
let key = &session.public_key().serialize();
let value = serde_json::to_string(&session)?;
write_txn.open_table(RECV_SESSIONS)?.insert(key.as_slice(), value.as_str())?;
write_txn.commit()?;
Ok(())
}

pub fn get_recv_session(&self) -> Result<Option<Enrolled>> {
let read_txn = self.0.begin_read()?;
if let Some((_, session)) = read_txn.open_table(RECV_SESSIONS)?.first()? {
let session: Enrolled = serde_json::from_str(session.value())?;
Ok(Some(session))
} else {
Ok(None)
}
}

pub fn clear_recv_session(&self) -> Result<()> {
let write_txn = self.0.begin_write()?;
let _ = write_txn.open_table(RECV_SESSIONS)?.pop_first()?;
write_txn.commit()?;
Ok(())
}

pub fn insert_send_session(&self, session: &mut RequestContext) -> Result<()> {
let write_txn = self.0.begin_write()?;
let key = &session.public_key().serialize();
let value = serde_json::to_string(session)?;
write_txn.open_table(SEND_SESSIONS)?.insert(key.as_slice(), value.as_str())?;
write_txn.commit()?;
Ok(())
}

pub fn get_send_session(&self) -> Result<Option<RequestContext>> {
let read_txn = self.0.begin_read()?;
if let Some((_, session)) = read_txn.open_table(SEND_SESSIONS)?.first()? {
let session: RequestContext = serde_json::from_str(session.value())?;
Ok(Some(session))
} else {
Ok(None)
}
}

pub fn clear_send_session(&self) -> Result<()> {
let write_txn = self.0.begin_write()?;
let _ = write_txn.open_table(SEND_SESSIONS)?.pop_first()?;
write_txn.commit()?;
Ok(())
}
}
6 changes: 6 additions & 0 deletions payjoin/src/send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ impl RequestContext {
Ok(bitcoin::secp256k1::PublicKey::from_slice(&pubkey_bytes)
.map_err(InternalCreateRequestError::SubdirectoryInvalidPubkey)?)
}

#[cfg(feature = "v2")]
pub fn public_key(&self) -> PublicKey {
let secp = bitcoin::secp256k1::Secp256k1::new();
self.e.public_key(&secp)
}
}

#[cfg(feature = "v2")]
Expand Down

0 comments on commit 134cdf7

Please sign in to comment.