feat: implement dht pooled db connection #3596

Merged
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

4 changes: 2 additions & 2 deletions base_layer/core/tests/base_node_rpc.rs
@@ -45,6 +45,7 @@
use std::{convert::TryFrom, sync::Arc, time::Duration};

use randomx_rs::RandomXFlag;
use tari_crypto::tari_utilities::epoch_time::EpochTime;
use tempfile::{tempdir, TempDir};

use tari_common::configuration::Network;
@@ -303,8 +304,7 @@ async fn test_get_height_at_time() {
let (_, service, base_node, request_mock, consensus_manager, block0, _utxo0, _temp_dir) = setup().await;

let mut prev_block = block0.clone();
let mut times = Vec::new();
times.push(prev_block.header().timestamp);
let mut times: Vec<EpochTime> = vec![prev_block.header().timestamp];
for _ in 0..10 {
tokio::time::sleep(Duration::from_secs(2)).await;
let new_block = base_node
4 changes: 2 additions & 2 deletions base_layer/key_manager/src/mnemonic.rs
@@ -350,7 +350,7 @@ mod test {
"abandon".to_string(),
"tipico".to_string(),
];
assert_eq!(MnemonicLanguage::detect_language(&words2).is_err(), true);
assert!(MnemonicLanguage::detect_language(&words2).is_err());

// bounds check (last word is invalid)
let words3 = vec![
@@ -360,7 +360,7 @@
"abandon".to_string(),
"topazio".to_string(),
];
assert_eq!(MnemonicLanguage::detect_language(&words3).is_err(), true);
assert!(MnemonicLanguage::detect_language(&words3).is_err());

// building up a word list: English/French + French -> French
let mut words = Vec::with_capacity(3);
4 changes: 2 additions & 2 deletions base_layer/wallet/src/contacts_service/error.rs
@@ -25,7 +25,7 @@ use diesel::result::Error as DieselError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;

#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
pub enum ContactsServiceError {
#[error("Contact is not found")]
@@ -38,7 +38,7 @@ pub enum ContactsServiceError {
TransportChannelError(#[from] TransportChannelError),
}

#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error)]
pub enum ContactsServiceStorageError {
#[error("This write operation is not supported for provided DbKey")]
OperationNotSupported,
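
With `PartialEq` no longer derived on these enums, downstream code can no longer compare the errors with `assert_eq!`; the contacts service test further down in this diff switches to an explicit `match` instead. Where only the variant matters, `matches!` is a compact alternative. A minimal sketch, with the import path assumed rather than taken from this PR:

```rust
use tari_wallet::contacts_service::error::{ContactsServiceError, ContactsServiceStorageError};

#[test]
fn asserts_on_variant_without_partial_eq() {
    // A storage failure wrapped by the service-level error, as in the enums above.
    let result: Result<(), ContactsServiceError> = Err(ContactsServiceError::ContactsServiceStorageError(
        ContactsServiceStorageError::OperationNotSupported,
    ));
    // `matches!` checks the shape of the value, so no PartialEq bound is required.
    assert!(matches!(
        result,
        Err(ContactsServiceError::ContactsServiceStorageError(
            ContactsServiceStorageError::OperationNotSupported
        ))
    ));
}
```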
6 changes: 0 additions & 6 deletions base_layer/wallet/src/error.rs
@@ -170,9 +170,3 @@ impl From<WalletStorageError> for ExitCodes {
}
}
}

impl PartialEq for WalletStorageError {
fn eq(&self, other: &Self) -> bool {
self == other
}
}
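
The impl deleted above defined `eq` as `self == other`, which calls the very `PartialEq` it is implementing, so any comparison would recurse until the stack overflowed. Nothing in this PR needs equality on `WalletStorageError`, so dropping the impl is the right fix. If variant-level equality were ever wanted again, a non-recursive sketch (an assumption, not part of this change) could compare discriminants:

```rust
use std::mem::discriminant;

// Hypothetical replacement, not in this PR: two errors compare equal when they
// are the same enum variant, ignoring any wrapped payload.
impl PartialEq for WalletStorageError {
    fn eq(&self, other: &Self) -> bool {
        discriminant(self) == discriminant(other)
    }
}
```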
2 changes: 1 addition & 1 deletion base_layer/wallet/src/output_manager_service/error.rs
@@ -120,7 +120,7 @@ pub enum OutputManagerError {
InvalidMessageError(String),
}

#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error)]
pub enum OutputManagerStorageError {
#[error("Tried to insert an output that already exists in the database")]
DuplicateOutput,
27 changes: 15 additions & 12 deletions base_layer/wallet/tests/contacts_service/mod.rs
@@ -87,18 +87,21 @@ pub fn test_contacts_service() {
let (_secret_key, public_key) = PublicKey::random_keypair(&mut OsRng);

let contact = runtime.block_on(contacts_service.get_contact(public_key.clone()));
assert_eq!(
contact,
Err(ContactsServiceError::ContactsServiceStorageError(
ContactsServiceStorageError::ValueNotFound(DbKey::Contact(public_key.clone()))
))
);
assert_eq!(
runtime.block_on(contacts_service.remove_contact(public_key.clone())),
Err(ContactsServiceError::ContactsServiceStorageError(
ContactsServiceStorageError::ValueNotFound(DbKey::Contact(public_key))
))
);
match contact {
Ok(_) => panic!("There should be an error here"),
Err(ContactsServiceError::ContactsServiceStorageError(ContactsServiceStorageError::ValueNotFound(val))) => {
assert_eq!(val, DbKey::Contact(public_key.clone()))
},
_ => panic!("There should be a specific error here"),
}
let result = runtime.block_on(contacts_service.remove_contact(public_key.clone()));
match result {
Ok(_) => panic!("There should be an error here"),
Err(ContactsServiceError::ContactsServiceStorageError(ContactsServiceStorageError::ValueNotFound(val))) => {
assert_eq!(val, DbKey::Contact(public_key))
},
_ => panic!("There should be a specific error here"),
}

let _ = runtime
.block_on(contacts_service.remove_contact(contacts[0].public_key.clone()))
1 change: 0 additions & 1 deletion base_layer/wallet/tests/wallet/mod.rs
@@ -69,7 +69,6 @@ use tari_wallet::{
handle::TransactionEvent,
storage::sqlite_db::TransactionServiceSqliteDatabase,
},
utxo_scanner_service::utxo_scanning::UtxoScannerService,
Wallet,
WalletConfig,
WalletSqlite,
6 changes: 0 additions & 6 deletions common_sqlite/src/error.rs
@@ -27,9 +27,3 @@ pub enum SqliteStorageError {
#[error("Diesel R2d2 error")]
DieselR2d2Error(String),
}

impl PartialEq for SqliteStorageError {
fn eq(&self, other: &Self) -> bool {
self == other
}
}
1 change: 1 addition & 0 deletions comms/dht/Cargo.toml
@@ -16,6 +16,7 @@ tari_crypto = { git = "https://github.com/tari-project/tari-crypto.git", branch
tari_utilities = { version = "^0.3" }
tari_shutdown = { version = "^0.21", path = "../../infrastructure/shutdown" }
tari_storage = { version = "^0.21", path = "../../infrastructure/storage" }
tari_common_sqlite = { path = "../../common_sqlite" }

anyhow = "1.0.32"
bitflags = "1.2.0"
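
This new dependency is where the "pooled db connection" in the PR title comes from. The crate's internals are not part of this diff; as a rough sketch of the underlying idea, assuming it builds on diesel's r2d2 integration (the type alias, pool size, and function name below are illustrative, not tari_common_sqlite's API):

```rust
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::sqlite::SqliteConnection;

/// Illustrative alias for a small SQLite connection pool.
pub type SqlitePool = Pool<ConnectionManager<SqliteConnection>>;

/// Build a pool for the given database path; callers then check connections
/// out of the pool instead of funnelling every query through one async worker.
pub fn build_pool(database_url: &str) -> SqlitePool {
    let manager = ConnectionManager::<SqliteConnection>::new(database_url);
    Pool::builder()
        .max_size(8) // a handful of connections is plenty for DHT metadata and dedup tables
        .build(manager)
        .expect("failed to build SQLite connection pool")
}
```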
26 changes: 11 additions & 15 deletions comms/dht/src/actor.rs
@@ -254,7 +254,6 @@ impl DhtActor {
let offline_ts = self
.database
.get_metadata_value::<DateTime<Utc>>(DhtMetadataKey::OfflineTimestamp)
.await
.ok()
.flatten();
info!(
@@ -284,25 +283,24 @@
},

_ = dedup_cache_trim_ticker.tick() => {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries().await {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries() {
error!(target: LOG_TARGET, "Error when trimming message dedup cache: {:?}", err);
}
},

_ = self.shutdown_signal.wait() => {
info!(target: LOG_TARGET, "DhtActor is shutting down because it received a shutdown signal.");
self.mark_shutdown_time().await;
self.mark_shutdown_time();
break Ok(());
},
}
}
}

async fn mark_shutdown_time(&self) {
fn mark_shutdown_time(&self) {
if let Err(err) = self
.database
.set_metadata_value(DhtMetadataKey::OfflineTimestamp, Utc::now())
.await
{
warn!(target: LOG_TARGET, "Failed to mark offline time: {:?}", err);
}
@@ -323,7 +321,7 @@ impl DhtActor {
} => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
match msg_hash_cache.add_body_hash(message_hash, received_from).await {
match msg_hash_cache.add_body_hash(message_hash, received_from) {
Ok(hit_count) => {
let _ = reply_tx.send(hit_count);
},
@@ -341,7 +339,7 @@
GetMsgHashHitCount(hash, reply_tx) => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
let hit_count = msg_hash_cache.get_hit_count(hash).await?;
let hit_count = msg_hash_cache.get_hit_count(hash)?;
let _ = reply_tx.send(hit_count);
Ok(())
})
@@ -366,14 +364,14 @@
GetMetadata(key, reply_tx) => {
let db = self.database.clone();
Box::pin(async move {
let _ = reply_tx.send(db.get_metadata_value_bytes(key).await.map_err(Into::into));
let _ = reply_tx.send(db.get_metadata_value_bytes(key).map_err(Into::into));
Ok(())
})
},
SetMetadata(key, value, reply_tx) => {
let db = self.database.clone();
Box::pin(async move {
match db.set_metadata_value_bytes(key, value).await {
match db.set_metadata_value_bytes(key, value) {
Ok(_) => {
debug!(target: LOG_TARGET, "Dht metadata '{}' set", key);
let _ = reply_tx.send(Ok(()));
@@ -727,8 +725,8 @@ mod test {
use tari_test_utils::random;

async fn db_connection() -> DbConnection {
let conn = DbConnection::connect_memory(random::string(8)).await.unwrap();
conn.migrate().await.unwrap();
let conn = DbConnection::connect_memory(random::string(8)).unwrap();
conn.migrate().unwrap();
conn
}

@@ -838,7 +836,6 @@
let num_hits = actor
.msg_hash_dedup_cache
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert_eq!(num_hits, 1);
}
@@ -847,15 +844,14 @@
let num_hits = actor
.msg_hash_dedup_cache
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert_eq!(num_hits, 2);
}

let dedup_cache_db = actor.msg_hash_dedup_cache.clone();
// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire fairly soon after the
// task is running on a thread. To remove this race condition, we trim the cache in the test.
let num_trimmed = dedup_cache_db.trim_entries().await.unwrap();
let num_trimmed = dedup_cache_db.trim_entries().unwrap();
assert_eq!(num_trimmed, 10);
actor.spawn();

@@ -877,7 +873,7 @@
}

// Trim the database of excess entries
dedup_cache_db.trim_entries().await.unwrap();
dedup_cache_db.trim_entries().unwrap();

// Verify that the last half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
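
The recurring change in this file is that `.await` disappears from every dedup-cache and metadata call: with a pooled connection the storage layer checks a connection out of the pool and runs the query on the calling thread, so those methods stop being `async`, and `mark_shutdown_time` becomes a plain `fn` for the same reason. A hedged sketch of such a synchronous accessor, with illustrative names rather than the crate's actual API:

```rust
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::sqlite::SqliteConnection;

type SqlitePool = Pool<ConnectionManager<SqliteConnection>>;

/// Run a closure against a pooled connection. `get()` blocks the calling
/// thread briefly until a connection is free, which is why callers such as
/// DhtActor can invoke these helpers directly instead of awaiting them.
fn with_connection<T>(pool: &SqlitePool, f: impl FnOnce(&mut SqliteConnection) -> T) -> T {
    let mut conn = pool.get().expect("connection pool exhausted");
    f(&mut conn)
}
```

The SQLite queries involved here are small, so running them inline in the actor's event loop is reasonable; if a query ever became slow, `tokio::task::spawn_blocking` would be the usual escape hatch to keep it off the async executor.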