Skip to content

Commit

Permalink
[Store and forward] Private message storage and retrieval
Browse files Browse the repository at this point in the history
Storage and retreival of messages without an origin (PR #1686)
All private/anonymous messages are stored as low priority (short TTL)
messages.

- New response type `Anonymous` for store and forward query responses
- Added some more logging
- DHT store and forward integration test
- SAF messages can never be stored in store and forward
  • Loading branch information
sdbondi committed Apr 14, 2020
1 parent a45872d commit 132bd58
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 60 deletions.
16 changes: 15 additions & 1 deletion comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,11 @@ impl<'a> DhtActor<'a> {
},
SendRequestStoredMessages => {
let node_identity = Arc::clone(&self.node_identity);
let peer_manager = Arc::clone(&self.peer_manager);
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::request_stored_messages(
node_identity,
peer_manager,
outbound_requester,
db,
self.config.num_neighbouring_nodes,
Expand Down Expand Up @@ -359,17 +361,29 @@ impl<'a> DhtActor<'a> {

async fn request_stored_messages(
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
mut outbound_requester: OutboundMessageRequester,
db: DhtDatabase,
num_neighbouring_nodes: usize,
) -> Result<(), DhtActorError>
{
let request = db
let mut request = db
.get_value(DhtSettingKey::SafLastRequestTimestamp)
.await?
.map(StoredMessagesRequest::since)
.unwrap_or_else(StoredMessagesRequest::new);

// Calculate the network region threshold for our node id.
// i.e. "Give me all messages that are this close to my node ID"
let threshold = peer_manager
.calc_region_threshold(
node_identity.node_id(),
num_neighbouring_nodes,
PeerFeatures::DHT_STORE_FORWARD,
)
.await?;
request.dist_threshold = threshold.to_vec();

outbound_requester
.send_message_no_header(
SendMessageParams::new()
Expand Down
8 changes: 4 additions & 4 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub const SAF_MSG_CACHE_STORAGE_CAPACITY: usize = 10_000;
/// The default time-to-live duration used for storage of low priority messages by the Store-and-forward middleware
pub const SAF_LOW_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(6 * 60 * 60); // 6 hours
/// The default time-to-live duration used for storage of high priority messages by the Store-and-forward middleware
pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(2 * 24 * 60 * 60); // 2 days
pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(3 * 24 * 60 * 60); // 3 days
/// The default number of peer nodes that a message has to be closer to, to be considered a neighbour
pub const DEFAULT_NUM_NEIGHBOURING_NODES: usize = 10;

Expand All @@ -55,9 +55,9 @@ pub struct DhtConfig {
/// Default: 6 hours
pub saf_low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 2 days
/// Default: 3 days
pub saf_high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500kb
/// The limit on the message size to store in SAF storage in bytes. Default 1 MiB
pub saf_max_message_size: usize,
/// The max capacity of the message hash cache
/// Default: 1000
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Default for DhtConfig {
saf_msg_cache_storage_capacity: SAF_MSG_CACHE_STORAGE_CAPACITY,
saf_low_priority_msg_storage_ttl: SAF_LOW_PRIORITY_MSG_STORAGE_TTL,
saf_high_priority_msg_storage_ttl: SAF_HIGH_PRIORITY_MSG_STORAGE_TTL,
saf_max_message_size: 512 * 1024, // 512 kb
saf_max_message_size: 1024 * 1024, // 1 MiB
msg_hash_cache_capacity: 10_000,
msg_hash_cache_ttl: Duration::from_secs(300),
broadcast_cooldown_max_attempts: 3,
Expand Down
4 changes: 4 additions & 0 deletions comms/dht/src/inbound/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use futures::{task::Context, Future};
use log::*;
use std::task::Poll;
use tari_comms::{pipeline::PipelineError, types::Challenge};
use tari_crypto::tari_utilities::hex::Hex;
use tower::{layer::Layer, Service, ServiceExt};

const LOG_TARGET: &str = "comms::dht::dedup";
Expand Down Expand Up @@ -77,6 +78,7 @@ where S: Service<DhtInboundMessage, Response = (), Error = PipelineError>
{
trace!(target: LOG_TARGET, "Checking inbound message cache for duplicates");
let hash = Self::hash_message(&message);
trace!(target: LOG_TARGET, "Inserting message hash {}", hash.to_hex());
if dht_requester
.insert_message_hash(hash)
.await
Expand All @@ -88,6 +90,8 @@ where S: Service<DhtInboundMessage, Response = (), Error = PipelineError>
);
return Ok(());
}

trace!(target: LOG_TARGET, "Passing message onto next service");
next_service.oneshot(message).await
}

Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/inbound/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Display for DhtInboundMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
write!(
f,
"\n---- DhtInboundMessage ---- \nSize: {} byte(s)\nType: {}\nPeer: {}\nHeader: {}\n----",
"\n---- Inbound Message ---- \nSize: {} byte(s)\nType: {}\nPeer: {}\nHeader: {}\n----",
self.body.len(),
self.dht_header.message_type,
self.source_peer,
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ where S: Service<DhtOutboundMessage, Response = (), Error = PipelineError>
reply_tx: oneshot::Sender<SendMessageResponse>,
) -> Result<Vec<DhtOutboundMessage>, DhtOutboundError>
{
trace!(target: LOG_TARGET, "Send params: {:?}", params);
if params
.broadcast_strategy
.direct_public_key()
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/outbound/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl fmt::Display for DhtOutboundMessage {
let header_str = self
.custom_header
.as_ref()
.and_then(|h| Some(format!("{} (Propagated)", h)))
.map(|h| format!("{} (Propagated)", h))
.unwrap_or_else(|| {
format!(
"Network: {:?}, Flags: {:?}, Destination: {}",
Expand Down
19 changes: 11 additions & 8 deletions comms/dht/src/proto/store_forward.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package tari.dht.store_forward;
message StoredMessagesRequest {
google.protobuf.Timestamp since = 1;
uint32 request_id = 2;
bytes dist_threshold = 3;
}

// Storage for a single message envelope, including the date and time when the element was stored
Expand All @@ -27,14 +28,16 @@ message StoredMessagesResponse {
repeated StoredMessage messages = 1;
uint32 request_id = 2;
enum SafResponseType {
// All applicable messages
General = 0;
// Send messages explicitly addressed to the requesting node or within the requesting node's region
ExplicitlyAddressed = 1;
// Send Discovery messages that could be for the requester
Discovery = 2;
// Send Join messages that the requester could be interested in
Join = 3;
// Messages for the requested public key or node ID
ForMe = 0;
// Discovery messages that could be for the requester
Discovery = 1;
// Join messages that the requester could be interested in
Join = 2;
// Messages without an explicit destination and with an unidentified encrypted source
Anonymous = 3;
// Messages within the requesting node's region
InRegion = 4;
}
SafResponseType response_type = 3;
}
20 changes: 12 additions & 8 deletions comms/dht/src/proto/tari.dht.store_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub struct StoredMessagesRequest {
pub since: ::std::option::Option<::prost_types::Timestamp>,
#[prost(uint32, tag = "2")]
pub request_id: u32,
#[prost(bytes, tag = "3")]
pub dist_threshold: std::vec::Vec<u8>,
}
/// Storage for a single message envelope, including the date and time when the element was stored
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -34,13 +36,15 @@ pub mod stored_messages_response {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SafResponseType {
/// All applicable messages
General = 0,
/// Send messages explicitly addressed to the requesting node or within the requesting node's region
ExplicitlyAddressed = 1,
/// Send Discovery messages that could be for the requester
Discovery = 2,
/// Send Join messages that the requester could be interested in
Join = 3,
/// Messages for the requested public key
ForMe = 0,
/// Discovery messages that could be for the requester
Discovery = 1,
/// Join messages that the requester could be interested in
Join = 2,
/// Messages without an explicit destination and with an unidentified encrypted source
Anonymous = 3,
/// Messages within the requesting node's region
InRegion = 4,
}
}
100 changes: 95 additions & 5 deletions comms/dht/src/store_forward/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use crate::{
};
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};
use tari_comms::types::CommsPublicKey;
use tari_comms::{
peer_manager::{node_id::NodeDistance, NodeId},
types::CommsPublicKey,
};
use tari_crypto::tari_utilities::hex::Hex;

pub struct StoreAndForwardDatabase {
Expand All @@ -54,19 +57,106 @@ impl StoreAndForwardDatabase {
.await
}

pub async fn find_messages_for_public_key(
pub async fn find_messages_for_peer(
&self,
public_key: &CommsPublicKey,
node_id: &NodeId,
since: Option<DateTime<Utc>>,
limit: i64,
) -> Result<Vec<StoredMessage>, StorageError>
{
let pk_hex = public_key.to_hex();
let node_id_hex = node_id.to_hex();
self.connection
.with_connection_async::<_, Vec<StoredMessage>>(move |conn| {
let mut query = stored_messages::table
.select(stored_messages::all_columns)
.filter(
stored_messages::destination_pubkey
.eq(pk_hex)
.or(stored_messages::destination_node_id.eq(node_id_hex)),
)
.into_boxed();

if let Some(since) = since {
query = query.filter(stored_messages::stored_at.ge(since.naive_utc()));
}

query
.order_by(stored_messages::stored_at.asc())
.limit(limit)
.get_results(conn)
.map_err(Into::into)
})
.await
}

pub async fn find_regional_messages(
&self,
node_id: &NodeId,
dist_threshold: Option<Box<NodeDistance>>,
since: Option<DateTime<Utc>>,
limit: i64,
) -> Result<Vec<StoredMessage>, StorageError>
{
let node_id_hex = node_id.to_hex();
let results = self
.connection
.with_connection_async::<_, Vec<StoredMessage>>(move |conn| {
let mut query = stored_messages::table
.select(stored_messages::all_columns)
.filter(stored_messages::destination_node_id.ne(node_id_hex))
.filter(stored_messages::destination_node_id.is_not_null())
.filter(stored_messages::message_type.eq(DhtMessageType::None as i32))
.into_boxed();

if let Some(since) = since {
query = query.filter(stored_messages::stored_at.ge(since.naive_utc()));
}

query
.order_by(stored_messages::stored_at.asc())
.limit(limit)
.get_results(conn)
.map_err(Into::into)
})
.await?;

match dist_threshold {
Some(dist_threshold) => {
// Filter node ids that are within the distance threshold from the source node id
let results = results
.into_iter()
// TODO: Investigate if we could do this in sqlite using XOR (^)
.filter(|message| match message.destination_node_id {
Some(ref dest_node_id) => match NodeId::from_hex(dest_node_id).ok() {
Some(dest_node_id) => {
&dest_node_id == node_id || &dest_node_id.distance(node_id) <= &*dist_threshold
},
None => false,
},
None => true,
})
.collect();
Ok(results)
},
None => Ok(results),
}
}

pub async fn find_anonymous_messages(
&self,
since: Option<DateTime<Utc>>,
limit: i64,
) -> Result<Vec<StoredMessage>, StorageError>
{
self.connection
.with_connection_async(move |conn| {
let mut query = stored_messages::table
.select(stored_messages::all_columns)
.filter(stored_messages::destination_pubkey.eq(pk_hex))
.filter(stored_messages::origin_pubkey.is_null())
.filter(stored_messages::destination_pubkey.is_null())
.filter(stored_messages::is_encrypted.eq(true))
.filter(stored_messages::message_type.eq(DhtMessageType::None as i32))
.into_boxed();

Expand All @@ -75,7 +165,7 @@ impl StoreAndForwardDatabase {
}

query
.order_by(stored_messages::stored_at.desc())
.order_by(stored_messages::stored_at.asc())
.limit(limit)
.get_results(conn)
.map_err(Into::into)
Expand Down Expand Up @@ -109,7 +199,7 @@ impl StoreAndForwardDatabase {
}

query
.order_by(stored_messages::stored_at.desc())
.order_by(stored_messages::stored_at.asc())
.limit(limit)
.get_results(conn)
.map_err(Into::into)
Expand Down
2 changes: 2 additions & 0 deletions comms/dht/src/store_forward/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ pub enum StoreAndForwardError {
/// The envelope version is invalid
InvalidEnvelopeVersion,
MalformedNodeId(ByteArrayError),
/// NodeDistance threshold was invalid
InvalidNodeDistanceThreshold,
}
2 changes: 2 additions & 0 deletions comms/dht/src/store_forward/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ impl StoredMessagesRequest {
Self {
since: None,
request_id: OsRng.next_u32(),
dist_threshold: Vec::new(),
}
}

pub fn since(since: DateTime<Utc>) -> Self {
Self {
since: Some(datetime_to_timestamp(since)),
request_id: OsRng.next_u32(),
dist_threshold: Vec::new(),
}
}
}
Expand Down
Loading

0 comments on commit 132bd58

Please sign in to comment.