Skip to content

Commit

Permalink
fix node bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jul 1, 2024
1 parent a641d2f commit e669a0f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 42 deletions.
2 changes: 2 additions & 0 deletions bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ xmtp_mls = { path = "../xmtp_mls", features = ["grpc", "native"] }
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] }
xmtp_id = { path = "../xmtp_id" }
rand = "0.8.5"
log = { version = "0.4", features = ["release_max_level_debug"] }

[build-dependencies]
napi-build = "2.0.1"
Expand Down
19 changes: 5 additions & 14 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,31 +196,22 @@ impl NapiConversations {
ThreadsafeFunctionCallMode::Blocking,
);
},
|| {}, // on_close_callback
)
.map_err(|e| Error::from_reason(format!("{}", e)))?;
);

Ok(NapiStreamCloser::new(
stream_closer.close_fn,
stream_closer.is_closed_atomic,
))
Ok(NapiStreamCloser::new(stream_closer))
}

#[napi(ts_args_type = "callback: (err: null | Error, result: NapiMessage) => void")]
pub fn stream_all_messages(&self, callback: JsFunction) -> Result<NapiStreamCloser> {
let tsfn: ThreadsafeFunction<NapiMessage, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let stream_closer = RustXmtpClient::stream_all_messages_with_callback_sync(
let stream_closer = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
move |message| {
tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking);
},
)
.map_err(|e| Error::from_reason(format!("{}", e)))?;
);

Ok(NapiStreamCloser::new(
stream_closer.close_fn,
stream_closer.is_closed_atomic,
))
Ok(NapiStreamCloser::new(stream_closer))
}
}
8 changes: 2 additions & 6 deletions bindings_node/src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,13 +511,9 @@ impl NapiGroup {
move |message| {
tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking);
},
)
.map_err(|e| Error::from_reason(format!("{}", e)))?;
);

Ok(NapiStreamCloser::new(
stream_closer.close_fn,
stream_closer.is_closed_atomic,
))
Ok(stream_closer.into())
}

#[napi]
Expand Down
74 changes: 52 additions & 22 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,65 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use tokio::sync::oneshot::Sender;
use std::sync::Arc;
use tokio::{sync::Mutex, task::{JoinHandle, AbortHandle}};
use xmtp_mls::client::ClientError;
use napi::bindgen_prelude::Error;

use napi_derive::napi;

#[napi]
pub struct NapiStreamCloser {
close_fn: Arc<Mutex<Option<Sender<()>>>>,
is_closed_atomic: Arc<AtomicBool>,
handle: Arc<Mutex<Option<JoinHandle<Result<(), ClientError>>>>>,
// for convenience, does not require locking mutex.
abort_handle: Arc<AbortHandle>,
}

#[napi]
impl NapiStreamCloser {
pub fn new(close_fn: Arc<Mutex<Option<Sender<()>>>>, is_closed_atomic: Arc<AtomicBool>) -> Self {
Self {
close_fn,
is_closed_atomic,
pub fn new(handle: JoinHandle<Result<(), ClientError>>) -> Self {
Self {
abort_handle: Arc::new(handle.abort_handle()),
handle: Arc::new(Mutex::new(Some(handle))),
}
}
}
}

impl From<JoinHandle<Result<(), ClientError>>> for NapiStreamCloser {
fn from(handle: JoinHandle<Result<(), ClientError>>) -> Self {
NapiStreamCloser::new(handle)
}
}

#[napi]
pub fn end(&self) {
if let Ok(mut close_fn_option) = self.close_fn.lock() {
let _ = close_fn_option.take().map(|close_fn| close_fn.send(()));
#[napi]
impl NapiStreamCloser {
/// Signal the stream to end
/// Does not wait for the stream to end.
pub fn end(&self) {
self.abort_handle.abort();
}
}

#[napi]
pub fn is_closed(&self) -> bool {
self.is_closed_atomic.load(Ordering::Relaxed)
}
/// End the stream and `await` for it to shutdown
/// Returns the `Result` of the task.
pub async fn end_and_wait(&self) -> Result<(), Error> {
if self.abort_handle.is_finished() {
return Ok(());
}

let mut handle = self.handle.lock().await;
let handle = handle.take();
if let Some(h) = handle {
h.abort();
let join_result = h.await;
if matches!(join_result, Err(ref e) if !e.is_cancelled()) {
return Err(Error::from_reason(
format!("subscription event loop join error {}", join_result.unwrap_err())
));
}
} else {
log::warn!("subscription already closed");
}
Ok(())
}

/// Checks if this stream is closed
pub fn is_closed(&self) -> bool {
self.abort_handle.is_finished()
}
}

0 comments on commit e669a0f

Please sign in to comment.