Skip to content

Commit

Permalink
fix sync worker err loop (#1378)
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx authored Dec 5, 2024
1 parent 76114b2 commit 3103e7b
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 216 deletions.
15 changes: 4 additions & 11 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,23 +440,16 @@ impl FfiXmtpClient {
.register_identity(signature_request.clone())
.await?;

self.maybe_start_sync_worker().await?;
self.maybe_start_sync_worker();

Ok(())
}

/// Starts the sync worker if the history sync url is present.
async fn maybe_start_sync_worker(&self) -> Result<(), GenericError> {
if self.inner_client.history_sync_url().is_none() {
return Ok(());
fn maybe_start_sync_worker(&self) {
if self.inner_client.history_sync_url().is_some() {
self.inner_client.start_sync_worker();
}

self.inner_client
.start_sync_worker()
.await
.map_err(GenericError::from_error)?;

Ok(())
}

pub async fn send_sync_request(&self, kind: FfiDeviceSyncKind) -> Result<(), GenericError> {
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.start_sync_worker().await.unwrap();
client.start_sync_worker();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await
Expand Down
8 changes: 4 additions & 4 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ harness = false
name = "identity"
required-features = ["bench"]

[[bench]]
harness = false
name = "sync"
required-features = ["bench"]
#[[bench]]
#harness = false
#name = "sync"
#required-features = ["bench"]

102 changes: 48 additions & 54 deletions xmtp_mls/benches/sync.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,49 @@
//! Benchmarking for syncing functions
use crate::tracing::Instrument;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::{Builder, Runtime};
use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN};
use xmtp_mls::utils::bench::{clients, init_logging};
// //! Benchmarking for syncing functions
// use crate::tracing::Instrument;
// use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
// use tokio::runtime::{Builder, Runtime};
// use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN};
// use xmtp_mls::utils::bench::{clients, init_logging};
//
// #[macro_use]
// extern crate tracing;
//
// fn setup() -> Runtime {
// Builder::new_multi_thread()
// .enable_time()
// .enable_io()
// .thread_name("xmtp-bencher")
// .build()
// .unwrap()
// }
//
// fn start_sync_worker(c: &mut Criterion) {
// init_logging();
//
// let runtime = setup();
// let mut benchmark_group = c.benchmark_group("start_sync_worker");
// benchmark_group.sample_size(10);
// benchmark_group.bench_function("start_sync_worker", |b| {
// let span = trace_span!(BENCH_ROOT_SPAN);
// b.to_async(&runtime).iter_batched(
// || {
// bench_async_setup(|| async {
// let client = clients::new_client(true).await;
// // set history sync URL
// (client, span.clone())
// })
// },
// |(client, span)| async move { client.start_sync_worker().instrument(span) },
// BatchSize::SmallInput,
// )
// });
//
// benchmark_group.finish();
// }

#[macro_use]
extern crate tracing;

fn setup() -> Runtime {
Builder::new_multi_thread()
.enable_time()
.enable_io()
.thread_name("xmtp-bencher")
.build()
.unwrap()
}

fn start_sync_worker(c: &mut Criterion) {
init_logging();

let runtime = setup();
let mut benchmark_group = c.benchmark_group("start_sync_worker");
benchmark_group.sample_size(10);
benchmark_group.bench_function("start_sync_worker", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
bench_async_setup(|| async {
let client = clients::new_client(true).await;
// set history sync URL
(client, span.clone())
})
},
|(client, span)| async move {
client
.start_sync_worker()
.instrument(span)
.await
.unwrap()
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

criterion_group!(
name = sync;
config = Criterion::default().sample_size(10);
targets = start_sync_worker
);
criterion_main!(sync);
// criterion_group!(
// name = sync;
// config = Criterion::default().sample_size(10);
// targets = start_sync_worker
// );
// criterion_main!(sync);
26 changes: 20 additions & 6 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ use crate::{
mutex_registry::MutexRegistry,
retry::Retry,
retry_async, retryable,
storage::wallet_addresses::WalletEntry,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
db_connection::DbConnection,
group::{GroupMembershipState, GroupQueryArgs, StoredGroup},
group_message::StoredGroupMessage,
refresh_state::EntityKind,
wallet_addresses::WalletEntry,
EncryptedMessageStore, StorageError,
},
subscriptions::{LocalEventError, LocalEvents},
Expand Down Expand Up @@ -253,6 +253,25 @@ where
}
}

impl<ApiClient, V> Client<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
/// Reconnect to the client's database if it has previously been released
pub fn reconnect_db(&self) -> Result<(), ClientError> {
self.context.store.reconnect()?;
// restart all the workers
// TODO: The only worker we have right now are the
// sync workers. if we have other workers we
// should create a better way to track them.
if self.history_sync_url.is_some() {
self.start_sync_worker();
}
Ok(())
}
}

impl<ApiClient, V> Client<ApiClient, V>
where
ApiClient: XmtpApi,
Expand Down Expand Up @@ -467,11 +486,6 @@ where
Ok(())
}

/// Reconnect to the client's database if it has previously been released
pub fn reconnect_db(&self) -> Result<(), ClientError> {
self.context.store.reconnect()?;
Ok(())
}
/// Get a reference to the client's identity struct
pub fn identity(&self) -> &Identity {
&self.context.identity
Expand Down
Loading

0 comments on commit 3103e7b

Please sign in to comment.