Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Nov 22, 2021
1 parent 66a523b commit 0163a2d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 55 deletions.
59 changes: 24 additions & 35 deletions comms/dht/src/dedup/dedup_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,13 @@ impl DedupCacheDatabase {
pub fn trim_entries(&self) -> Result<usize, StorageError> {
let capacity = self.capacity as i64;
let mut num_removed = 0;
let msg_count = {
let conn = self.connection.get_pooled_connection()?;
dedup_cache::table
.select(dsl::count(dedup_cache::id))
.first::<i64>(&conn)?
};
let conn = self.connection.get_pooled_connection()?;
let msg_count = dedup_cache::table
.select(dsl::count(dedup_cache::id))
.first::<i64>(&conn)?;
// Hysteresis added to minimize database impact
if msg_count > capacity {
let remove_count = msg_count - capacity;
let conn = self.connection.get_pooled_connection()?;
num_removed = diesel::sql_query(
"DELETE FROM dedup_cache WHERE id IN (SELECT id FROM dedup_cache ORDER BY last_hit_at ASC LIMIT $1)",
)
Expand All @@ -110,17 +107,15 @@ impl DedupCacheDatabase {

/// Insert new row into the table or updates an existing row. Returns the number of hits for this body hash.
fn insert_body_hash_or_update_stats(&self, body_hash: String, public_key: String) -> Result<u32, StorageError> {
let insert_result = {
let conn = self.connection.get_pooled_connection()?;
diesel::insert_into(dedup_cache::table)
.values((
dedup_cache::body_hash.eq(&body_hash),
dedup_cache::sender_public_key.eq(&public_key),
dedup_cache::number_of_hits.eq(1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(&conn)
};
let conn = self.connection.get_pooled_connection()?;
let insert_result = diesel::insert_into(dedup_cache::table)
.values((
dedup_cache::body_hash.eq(&body_hash),
dedup_cache::sender_public_key.eq(&public_key),
dedup_cache::number_of_hits.eq(1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(&conn);
match insert_result {
Ok(1) => Ok(1),
Ok(n) => Err(StorageError::UnexpectedResult(format!(
Expand All @@ -130,25 +125,19 @@ impl DedupCacheDatabase {
Err(diesel::result::Error::DatabaseError(kind, e_info)) => match kind {
DatabaseErrorKind::UniqueViolation => {
// Update hit stats for the message
{
let conn = self.connection.get_pooled_connection()?;
diesel::update(dedup_cache::table.filter(dedup_cache::body_hash.eq(&body_hash)))
.set((
dedup_cache::sender_public_key.eq(&public_key),
dedup_cache::number_of_hits.eq(dedup_cache::number_of_hits + 1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(&conn)?;
}
diesel::update(dedup_cache::table.filter(dedup_cache::body_hash.eq(&body_hash)))
.set((
dedup_cache::sender_public_key.eq(&public_key),
dedup_cache::number_of_hits.eq(dedup_cache::number_of_hits + 1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(&conn)?;
// TODO: Diesel support for RETURNING statements would remove this query, but is not
// TODO: available for Diesel + SQLite yet
let hits = {
let conn = self.connection.get_pooled_connection()?;
dedup_cache::table
.select(dedup_cache::number_of_hits)
.filter(dedup_cache::body_hash.eq(&body_hash))
.get_result::<i32>(&conn)?
};
let hits = dedup_cache::table
.select(dedup_cache::number_of_hits)
.filter(dedup_cache::body_hash.eq(&body_hash))
.get_result::<i32>(&conn)?;

Ok(hits as u32)
},
Expand Down
32 changes: 12 additions & 20 deletions comms/dht/src/store_forward/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,28 +203,20 @@ impl StoreAndForwardDatabase {

pub(crate) fn truncate_messages(&self, max_size: usize) -> Result<usize, StorageError> {
let mut num_removed = 0;
let msg_count = {
let conn = self.connection.get_pooled_connection()?;
stored_messages::table
.select(dsl::count(stored_messages::id))
.first::<i64>(&conn)? as usize
};
let conn = self.connection.get_pooled_connection()?;
let msg_count = stored_messages::table
.select(dsl::count(stored_messages::id))
.first::<i64>(&conn)? as usize;
if msg_count > max_size {
let remove_count = msg_count - max_size;
let message_ids: Vec<i32> = {
let conn = self.connection.get_pooled_connection()?;
stored_messages::table
.select(stored_messages::id)
.order_by(stored_messages::stored_at.asc())
.limit(remove_count as i64)
.get_results(&conn)?
};
num_removed = {
let conn = self.connection.get_pooled_connection()?;
diesel::delete(stored_messages::table)
.filter(stored_messages::id.eq_any(message_ids))
.execute(&conn)?
};
let message_ids: Vec<i32> = stored_messages::table
.select(stored_messages::id)
.order_by(stored_messages::stored_at.asc())
.limit(remove_count as i64)
.get_results(&conn)?;
num_removed = diesel::delete(stored_messages::table)
.filter(stored_messages::id.eq_any(message_ids))
.execute(&conn)?;
}
Ok(num_removed)
}
Expand Down

0 comments on commit 0163a2d

Please sign in to comment.