Skip to content

Commit

Permalink
Persistent store and forward
Browse files Browse the repository at this point in the history
- Implemented sqlite persistence for store and forward (SAF).
- Nodes keep track of the timestamp at which they last requested messages
- Nodes use that timestamp when requesting messages to reduce the number
  of duplicate messages requested
- Request for SAF messages is broken up into 3 responses: Discovery, Join
  and ExplicitlyAddressed - respectively, discovery messages, join messages and
  messages that are explicitly addressed to this node.
- DB migrations run on node startup
- Sqlite operations run on tokio blocking threads
- Sqlite connection interface that allows usage of sqlite's in-memory
  database connection in addition to the file-system database
  • Loading branch information
sdbondi committed Apr 6, 2020
1 parent 41d9940 commit b1a19f7
Show file tree
Hide file tree
Showing 42 changed files with 1,869 additions and 353 deletions.
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tari_comms::{
ConnectionManagerEvent,
PeerManager,
};
use tari_comms_dht::Dht;
use tari_comms_dht::{DbConnectionUrl, Dht, DhtConfig};
use tari_core::{
base_node::{
chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
Expand Down Expand Up @@ -841,7 +841,10 @@ async fn setup_base_node_comms(
max_concurrent_inbound_tasks: 100,
outbound_buffer_size: 100,
// TODO - make this configurable
dht: Default::default(),
dht: DhtConfig {
database_url: DbConnectionUrl::File(config.data_dir.join("dht.db")),
..Default::default()
},
// TODO: This should be false unless testing locally - make this configurable
allow_test_addresses: true,
listener_liveness_whitelist_cidrs: config.listener_liveness_whitelist_cidrs.clone(),
Expand Down
3 changes: 3 additions & 0 deletions comms/dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ bitflags = "1.2.0"
bytes = "0.4.12"
chrono = "0.4.9"
derive-error = "0.0.4"
diesel = {version="1.4", features = ["sqlite", "serde_json", "chrono"]}
diesel_migrations = "1.4"
digest = "0.8.1"
futures= {version= "^0.3.1"}
log = "0.4.8"
Expand All @@ -34,6 +36,7 @@ serde_repr = "0.1.5"
tokio = {version="0.2.10", features=["rt-threaded", "blocking"]}
tower= "0.3.0"
ttl_cache = "0.5.1"

# tower-filter dependencies
pin-project = "0.4"

Expand Down
5 changes: 5 additions & 0 deletions comms/dht/diesel.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli

# `file` names the Rust source that holds the table definitions (src/schema.rs).
# NOTE(review): presumably this is the file the Diesel CLI regenerates — confirm against the guide above.
[print_schema]
file = "src/schema.rs"
2 changes: 1 addition & 1 deletion comms/dht/examples/memorynet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ async fn do_store_and_forward_discovery(
println!("Waiting a few seconds for discovery to propagate around the network...");
time::delay_for(Duration::from_secs(8)).await;

let mut total_messages = drain_messaging_events(messaging_rx, true).await;
let mut total_messages = drain_messaging_events(messaging_rx, false).await;

banner!("🤓 {} is coming back online", get_name(node_identity.node_id()));
let (tx, ims_rx) = mpsc::channel(1);
Expand Down
Empty file added comms/dht/migrations/.gitkeep
Empty file.
2 changes: 2 additions & 0 deletions comms/dht/migrations/2020-04-01-095825_initial/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Reverts the initial migration by dropping the two tables it creates.
-- `IF EXISTS` keeps each statement from failing when its table is already absent.
DROP TABLE IF EXISTS stored_messages;
DROP TABLE IF EXISTS dht_settings;
27 changes: 27 additions & 0 deletions comms/dht/migrations/2020-04-01-095825_initial/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Initial schema for the Dht database: persisted store-and-forward (SAF) messages
-- and a small key/value settings table.

-- One row per stored message.
CREATE TABLE stored_messages (
id INTEGER NOT NULL PRIMARY KEY,
version INT NOT NULL,
origin_pubkey TEXT NOT NULL,
origin_signature TEXT NOT NULL,
message_type INT NOT NULL,
-- Both destination columns are nullable; a message need not carry either.
destination_pubkey TEXT,
destination_node_id TEXT,
header BLOB NOT NULL,
body BLOB NOT NULL,
-- Boolean stored as an integer; the CHECK restricts it to 0 or 1.
is_encrypted BOOLEAN NOT NULL CHECK (is_encrypted IN (0,1)),
priority INT NOT NULL,
-- Filled with the insertion time when the writer supplies no value.
stored_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Indexes on the columns used to address, age and prioritise messages.
-- NOTE(review): presumably these back the SAF lookup/expiry queries — confirm against the storage module.
CREATE INDEX idx_stored_messages_destination_pubkey ON stored_messages (destination_pubkey);
CREATE INDEX idx_stored_messages_destination_node_id ON stored_messages (destination_node_id);
CREATE INDEX idx_stored_messages_stored_at ON stored_messages (stored_at);
CREATE INDEX idx_stored_messages_priority ON stored_messages (priority);

-- Key/value settings; values are opaque blobs.
CREATE TABLE dht_settings (
id INTEGER PRIMARY KEY NOT NULL,
key TEXT NOT NULL,
value BLOB NOT NULL
);

-- At most one row per setting key.
CREATE UNIQUE INDEX idx_dht_settings_key ON dht_settings (key);
99 changes: 76 additions & 23 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@ use crate::{
discovery::DhtDiscoveryError,
outbound::{OutboundMessageRequester, SendMessageParams},
proto::{dht::JoinMessage, envelope::DhtMessageType, store_forward::StoredMessagesRequest},
storage::{DbConnection, DhtDatabase, DhtSettingKey, StorageError},
DhtConfig,
};
use chrono::{DateTime, Utc};
use derive_error::Error;
use futures::{
channel::{mpsc, mpsc::SendError, oneshot},
future,
future::BoxFuture,
stream::{Fuse, FuturesUnordered},
FutureExt,
SinkExt,
StreamExt,
};
Expand All @@ -60,7 +59,10 @@ use tari_comms::{
},
types::CommsPublicKey,
};
use tari_crypto::tari_utilities::ByteArray;
use tari_crypto::tari_utilities::{
message_format::{MessageFormat, MessageFormatError},
ByteArray,
};
use tari_shutdown::ShutdownSignal;
use tari_storage::IterationResult;
use ttl_cache::TtlCache;
Expand All @@ -80,6 +82,11 @@ pub enum DhtActorError {
SendFailed(String),
DiscoveryError(DhtDiscoveryError),
BlockingJoinError(tokio::task::JoinError),
StorageError(StorageError),
#[error(no_from)]
StoredValueFailedToDeserialize(MessageFormatError),
#[error(no_from)]
FailedToSerializeValue(MessageFormatError),
}

impl From<SendError> for DhtActorError {
Expand All @@ -98,23 +105,27 @@ impl From<SendError> for DhtActorError {
pub enum DhtRequest {
/// Send a Join request to the network
SendJoin,
/// Send a request for stored messages, optionally specifying a date time that the foreign node should
/// use to filter the returned messages.
SendRequestStoredMessages(Option<DateTime<Utc>>),
/// Send requests to neighbours for stored messages
SendRequestStoredMessages,
/// Inserts a message signature to the msg hash cache. This operation replies with a boolean
/// which is true if the signature already exists in the cache, otherwise false
MsgHashCacheInsert(Vec<u8>, oneshot::Sender<bool>),
/// Fetch selected peers according to the broadcast strategy
SelectPeers(BroadcastStrategy, oneshot::Sender<Vec<Peer>>),
/// Read the raw bytes stored under the given setting key in the Dht database. The lookup result
/// (or the storage error, converted into a `DhtActorError`) is sent back on the reply channel.
GetSetting(DhtSettingKey, oneshot::Sender<Result<Option<Vec<u8>>, DhtActorError>>),
/// Write the given raw bytes under the given setting key in the Dht database. No reply is sent;
/// the handler only logs whether the write succeeded.
SetSetting(DhtSettingKey, Vec<u8>),
}

// Short, human-readable label for each request; the actor logs it when a request is received.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so the `DhtRequest::`-qualified
// arms (apparently the pre-change lines) appear next to the unqualified arms that replace them.
impl Display for DhtRequest {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use DhtRequest::*;
        match self {
            DhtRequest::SendJoin => f.write_str("SendJoin"),
            DhtRequest::SendRequestStoredMessages(d) => f.write_str(&format!("SendRequestStoredMessages ({:?})", d)),
            DhtRequest::MsgHashCacheInsert(_, _) => f.write_str("MsgHashCacheInsert"),
            DhtRequest::SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
            SendJoin => f.write_str("SendJoin"),
            SendRequestStoredMessages => f.write_str("SendRequestStoredMessages"),
            MsgHashCacheInsert(_, _) => f.write_str("MsgHashCacheInsert"),
            SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
            // Each label names its own variant, so a logged setting request cannot be mistaken
            // for a `SelectPeers` request or for a differently named operation.
            GetSetting(key, _) => f.write_str(&format!("GetSetting (key={})", key)),
            SetSetting(key, value) => f.write_str(&format!("SetSetting (key={}, value={} bytes)", key, value.len())),
        }
    }
}
Expand Down Expand Up @@ -151,10 +162,25 @@ impl DhtRequester {
}

/// Queue a `SendRequestStoredMessages` request on the actor's request channel.
///
/// Returns an error (converted from the channel's `SendError`) if the request could not be sent.
/// No reply is awaited.
pub async fn send_request_stored_messages(&mut self) -> Result<(), DhtActorError> {
self.sender
.send(DhtRequest::SendRequestStoredMessages(None))
.await
.map_err(Into::into)
self.sender.send(DhtRequest::SendRequestStoredMessages).await?;
Ok(())
}

/// Fetch the value stored under `key` and decode it as `T`.
///
/// Sends a `DhtRequest::GetSetting` to the actor and waits for its reply. Returns `Ok(None)` when the
/// actor replies with no stored bytes, otherwise the bytes decoded with `T::from_binary`.
///
/// # Errors
/// - the request could not be sent to the actor (converted from `SendError`);
/// - `DhtActorError::ReplyCanceled` if the reply channel was dropped before a reply arrived;
/// - whatever error the actor replied with;
/// - `DhtActorError::StoredValueFailedToDeserialize` if the stored bytes do not decode as `T`.
pub async fn get_setting<T: MessageFormat>(&mut self, key: DhtSettingKey) -> Result<Option<T>, DhtActorError> {
    let (reply_tx, reply_rx) = oneshot::channel();
    self.sender.send(DhtRequest::GetSetting(key, reply_tx)).await?;

    // First `?` unwraps the channel result, second `?` unwraps the actor's own result.
    let maybe_bytes = reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)??;

    maybe_bytes
        .map(|raw| T::from_binary(&raw).map_err(DhtActorError::StoredValueFailedToDeserialize))
        .transpose()
}

/// Serialize `value` and ask the actor to store it under `key`.
///
/// Only queues a `DhtRequest::SetSetting`; no reply is awaited, so `Ok(())` means the request was
/// handed to the actor, not that the value has been written.
///
/// # Errors
/// - `DhtActorError::FailedToSerializeValue` if `value.to_binary()` fails;
/// - the request could not be sent to the actor (converted from `SendError`).
pub async fn set_setting<T: MessageFormat>(&mut self, key: DhtSettingKey, value: T) -> Result<(), DhtActorError> {
    let serialized = value.to_binary().map_err(DhtActorError::FailedToSerializeValue)?;
    let request = DhtRequest::SetSetting(key, serialized);
    self.sender.send(request).await.map_err(Into::into)
}
}

Expand Down Expand Up @@ -191,18 +217,22 @@ impl<'a> DhtActor<'a> {
}
}

pub async fn run(mut self) {
pub async fn run(mut self) -> Result<(), DhtActorError> {
let conn = DbConnection::connect_url(self.config.database_url.clone()).await?;
let output = conn.migrate().await?;
info!(target: LOG_TARGET, "Dht database migration:\n{}", output);
let db = DhtDatabase::new(conn);

let mut shutdown_signal = self
.shutdown_signal
.take()
.expect("DhtActor initialized without shutdown_signal")
.fuse();
.expect("DhtActor initialized without shutdown_signal");

loop {
futures::select! {
request = self.request_rx.select_next_some() => {
debug!(target: LOG_TARGET, "DhtActor received message: {}", request);
let handler = self.request_handler(request);
let handler = self.request_handler(db.clone(), request);
self.pending_jobs.push(handler);
},

Expand All @@ -227,9 +257,11 @@ impl<'a> DhtActor<'a> {
}
}
}

Ok(())
}

fn request_handler(&mut self, request: DhtRequest) -> BoxFuture<'a, Result<(), DhtActorError>> {
fn request_handler(&mut self, db: DhtDatabase, request: DhtRequest) -> BoxFuture<'a, Result<(), DhtActorError>> {
use DhtRequest::*;
match request {
SendJoin => {
Expand Down Expand Up @@ -265,16 +297,31 @@ impl<'a> DhtActor<'a> {
}
})
},
SendRequestStoredMessages(maybe_since) => {
SendRequestStoredMessages => {
let node_identity = Arc::clone(&self.node_identity);
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::request_stored_messages(
node_identity,
outbound_requester,
db,
self.config.num_neighbouring_nodes,
maybe_since,
))
},
GetSetting(key, reply_tx) => Box::pin(async move {
let _ = reply_tx.send(db.get_value(key).await.map_err(Into::into));
Ok(())
}),
SetSetting(key, value) => Box::pin(async move {
match db.set_value(key, value).await {
Ok(_) => {
info!(target: LOG_TARGET, "Dht setting '{}' set", key);
},
Err(err) => {
error!(target: LOG_TARGET, "set_setting failed because {:?}", err);
},
}
Ok(())
}),
}
}

Expand Down Expand Up @@ -313,10 +360,16 @@ impl<'a> DhtActor<'a> {
async fn request_stored_messages(
node_identity: Arc<NodeIdentity>,
mut outbound_requester: OutboundMessageRequester,
db: DhtDatabase,
num_neighbouring_nodes: usize,
maybe_since: Option<DateTime<Utc>>,
) -> Result<(), DhtActorError>
{
let request = db
.get_value(DhtSettingKey::SafLastRequestTimestamp)
.await?
.map(StoredMessagesRequest::since)
.unwrap_or_else(StoredMessagesRequest::new);

outbound_requester
.send_message_no_header(
SendMessageParams::new()
Expand All @@ -328,7 +381,7 @@ impl<'a> DhtActor<'a> {
)
.with_dht_message_type(DhtMessageType::SafRequestMessages)
.finish(),
maybe_since.map(StoredMessagesRequest::since).unwrap_or_default(),
request,
)
.await
.map_err(|err| DhtActorError::SendFailed(format!("Failed to send request for stored messages: {}", err)))?;
Expand Down
7 changes: 6 additions & 1 deletion comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{outbound::DhtOutboundRequest, Dht, DhtConfig};
use crate::{outbound::DhtOutboundRequest, DbConnectionUrl, Dht, DhtConfig};
use futures::channel::mpsc;
use std::{sync::Arc, time::Duration};
use tari_comms::{
Expand Down Expand Up @@ -80,6 +80,11 @@ impl DhtBuilder {
self
}

/// Set the `DbConnectionUrl` of the Dht database in the configuration being built.
pub fn with_database_url(self, database_url: DbConnectionUrl) -> Self {
    let mut builder = self;
    builder.config.database_url = database_url;
    builder
}

pub fn with_signature_cache_ttl(mut self, ttl: Duration) -> Self {
self.config.msg_hash_cache_ttl = ttl;
self
Expand Down
17 changes: 12 additions & 5 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::envelope::Network;
use crate::{envelope::Network, storage::DbConnectionUrl};
use std::time::Duration;

/// The default maximum number of messages that can be stored using the Store-and-forward middleware
pub const SAF_MSG_CACHE_STORAGE_CAPACITY: usize = 10_000;
/// The default time-to-live duration used for storage of low priority messages by the Store-and-forward middleware
pub const SAF_LOW_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(6 * 60 * 60);
pub const SAF_LOW_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(6 * 60 * 60); // 6 hours
/// The default time-to-live duration used for storage of high priority messages by the Store-and-forward middleware
pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(24 * 60 * 60);
pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(2 * 24 * 60 * 60); // 2 days
/// The default number of peer nodes that a message has to be closer to, to be considered a neighbour
pub const DEFAULT_NUM_NEIGHBOURING_NODES: usize = 10;

#[derive(Debug, Clone)]
pub struct DhtConfig {
/// The `DbConnectionUrl` for the Dht database. Default: In-memory database
pub database_url: DbConnectionUrl,
/// The size of the buffer (channel) which holds pending outbound message requests.
/// Default: 20
pub outbound_buffer_size: usize,
Expand All @@ -53,8 +55,10 @@ pub struct DhtConfig {
/// Default: 6 hours
pub saf_low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 24 hours
/// Default: 2 days
pub saf_high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default: 512 KiB (524,288 bytes)
pub saf_max_message_size: usize,
/// The max capacity of the message hash cache
/// Default: 1000
pub msg_hash_cache_capacity: usize,
Expand Down Expand Up @@ -92,6 +96,7 @@ impl DhtConfig {
pub fn default_local_test() -> Self {
Self {
network: Network::LocalTest,
database_url: DbConnectionUrl::Memory,
..Default::default()
}
}
Expand All @@ -102,14 +107,16 @@ impl Default for DhtConfig {
Self {
num_neighbouring_nodes: DEFAULT_NUM_NEIGHBOURING_NODES,
saf_num_closest_nodes: 10,
saf_max_returned_messages: 1000,
saf_max_returned_messages: 100,
outbound_buffer_size: 20,
saf_msg_cache_storage_capacity: SAF_MSG_CACHE_STORAGE_CAPACITY,
saf_low_priority_msg_storage_ttl: SAF_LOW_PRIORITY_MSG_STORAGE_TTL,
saf_high_priority_msg_storage_ttl: SAF_HIGH_PRIORITY_MSG_STORAGE_TTL,
saf_max_message_size: 512 * 1024, // 512 kb
msg_hash_cache_capacity: 10_000,
msg_hash_cache_ttl: Duration::from_secs(300),
broadcast_cooldown_max_attempts: 3,
database_url: DbConnectionUrl::Memory,
broadcast_cooldown_period: Duration::from_secs(60 * 30),
discovery_request_timeout: Duration::from_secs(2 * 60),
network: Network::TestNet,
Expand Down
Loading

0 comments on commit b1a19f7

Please sign in to comment.