Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent store and forward #1680

Merged
merged 1 commit into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tari_comms::{
ConnectionManagerEvent,
PeerManager,
};
use tari_comms_dht::Dht;
use tari_comms_dht::{DbConnectionUrl, Dht, DhtConfig};
use tari_core::{
base_node::{
chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
Expand Down Expand Up @@ -841,7 +841,10 @@ async fn setup_base_node_comms(
max_concurrent_inbound_tasks: 100,
outbound_buffer_size: 100,
// TODO - make this configurable
dht: Default::default(),
dht: DhtConfig {
database_url: DbConnectionUrl::File(config.data_dir.join("dht.db")),
..Default::default()
},
// TODO: This should be false unless testing locally - make this configurable
allow_test_addresses: true,
listener_liveness_whitelist_cidrs: config.listener_liveness_whitelist_cidrs.clone(),
Expand Down
3 changes: 3 additions & 0 deletions comms/dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ bitflags = "1.2.0"
bytes = "0.4.12"
chrono = "0.4.9"
derive-error = "0.0.4"
diesel = {version="1.4", features = ["sqlite", "serde_json", "chrono"]}
diesel_migrations = "1.4"
digest = "0.8.1"
futures= {version= "^0.3.1"}
log = "0.4.8"
Expand All @@ -34,6 +36,7 @@ serde_repr = "0.1.5"
tokio = {version="0.2.10", features=["rt-threaded", "blocking"]}
tower= "0.3.0"
ttl_cache = "0.5.1"

# tower-filter dependencies
pin-project = "0.4"

Expand Down
5 changes: 5 additions & 0 deletions comms/dht/diesel.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli

[print_schema]
file = "src/schema.rs"
2 changes: 1 addition & 1 deletion comms/dht/examples/memorynet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ async fn do_store_and_forward_discovery(
println!("Waiting a few seconds for discovery to propagate around the network...");
time::delay_for(Duration::from_secs(8)).await;

let mut total_messages = drain_messaging_events(messaging_rx, true).await;
let mut total_messages = drain_messaging_events(messaging_rx, false).await;

banner!("🤓 {} is coming back online", get_name(node_identity.node_id()));
let (tx, ims_rx) = mpsc::channel(1);
Expand Down
Empty file added comms/dht/migrations/.gitkeep
Empty file.
2 changes: 2 additions & 0 deletions comms/dht/migrations/2020-04-01-095825_initial/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS stored_messages;
DROP TABLE IF EXISTS dht_settings;
27 changes: 27 additions & 0 deletions comms/dht/migrations/2020-04-01-095825_initial/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
CREATE TABLE stored_messages (
id INTEGER NOT NULL PRIMARY KEY,
version INT NOT NULL,
origin_pubkey TEXT NOT NULL,
origin_signature TEXT NOT NULL,
message_type INT NOT NULL,
destination_pubkey TEXT,
destination_node_id TEXT,
header BLOB NOT NULL,
body BLOB NOT NULL,
is_encrypted BOOLEAN NOT NULL CHECK (is_encrypted IN (0,1)),
priority INT NOT NULL,
stored_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_stored_messages_destination_pubkey ON stored_messages (destination_pubkey);
CREATE INDEX idx_stored_messages_destination_node_id ON stored_messages (destination_node_id);
CREATE INDEX idx_stored_messages_stored_at ON stored_messages (stored_at);
CREATE INDEX idx_stored_messages_priority ON stored_messages (priority);

CREATE TABLE dht_settings (
id INTEGER PRIMARY KEY NOT NULL,
key TEXT NOT NULL,
value BLOB NOT NULL
);

CREATE UNIQUE INDEX idx_dht_settings_key ON dht_settings (key);
99 changes: 76 additions & 23 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@ use crate::{
discovery::DhtDiscoveryError,
outbound::{OutboundMessageRequester, SendMessageParams},
proto::{dht::JoinMessage, envelope::DhtMessageType, store_forward::StoredMessagesRequest},
storage::{DbConnection, DhtDatabase, DhtSettingKey, StorageError},
DhtConfig,
};
use chrono::{DateTime, Utc};
use derive_error::Error;
use futures::{
channel::{mpsc, mpsc::SendError, oneshot},
future,
future::BoxFuture,
stream::{Fuse, FuturesUnordered},
FutureExt,
SinkExt,
StreamExt,
};
Expand All @@ -60,7 +59,10 @@ use tari_comms::{
},
types::CommsPublicKey,
};
use tari_crypto::tari_utilities::ByteArray;
use tari_crypto::tari_utilities::{
message_format::{MessageFormat, MessageFormatError},
ByteArray,
};
use tari_shutdown::ShutdownSignal;
use tari_storage::IterationResult;
use ttl_cache::TtlCache;
Expand All @@ -80,6 +82,11 @@ pub enum DhtActorError {
SendFailed(String),
DiscoveryError(DhtDiscoveryError),
BlockingJoinError(tokio::task::JoinError),
StorageError(StorageError),
#[error(no_from)]
StoredValueFailedToDeserialize(MessageFormatError),
#[error(no_from)]
FailedToSerializeValue(MessageFormatError),
}

impl From<SendError> for DhtActorError {
Expand All @@ -98,23 +105,27 @@ impl From<SendError> for DhtActorError {
pub enum DhtRequest {
/// Send a Join request to the network
SendJoin,
/// Send a request for stored messages, optionally specifying a date time that the foreign node should
/// use to filter the returned messages.
SendRequestStoredMessages(Option<DateTime<Utc>>),
/// Send requests to neighbours for stored messages
SendRequestStoredMessages,
/// Inserts a message signature to the msg hash cache. This operation replies with a boolean
/// which is true if the signature already exists in the cache, otherwise false
MsgHashCacheInsert(Vec<u8>, oneshot::Sender<bool>),
/// Fetch selected peers according to the broadcast strategy
SelectPeers(BroadcastStrategy, oneshot::Sender<Vec<Peer>>),
GetSetting(DhtSettingKey, oneshot::Sender<Result<Option<Vec<u8>>, DhtActorError>>),
SetSetting(DhtSettingKey, Vec<u8>),
}

impl Display for DhtRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use DhtRequest::*;
match self {
DhtRequest::SendJoin => f.write_str("SendJoin"),
DhtRequest::SendRequestStoredMessages(d) => f.write_str(&format!("SendRequestStoredMessages ({:?})", d)),
DhtRequest::MsgHashCacheInsert(_, _) => f.write_str("MsgHashCacheInsert"),
DhtRequest::SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
SendJoin => f.write_str("SendJoin"),
SendRequestStoredMessages => f.write_str("SendRequestStoredMessages"),
MsgHashCacheInsert(_, _) => f.write_str("MsgHashCacheInsert"),
SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
GetSetting(key, _) => f.write_str(&format!("GetStoreItem (key={})", key)),
SetSetting(key, value) => f.write_str(&format!("SelectPeers (key={}, value={} bytes)", key, value.len())),
}
}
}
Expand Down Expand Up @@ -151,10 +162,25 @@ impl DhtRequester {
}

pub async fn send_request_stored_messages(&mut self) -> Result<(), DhtActorError> {
self.sender
.send(DhtRequest::SendRequestStoredMessages(None))
.await
.map_err(Into::into)
self.sender.send(DhtRequest::SendRequestStoredMessages).await?;
Ok(())
}

pub async fn get_setting<T: MessageFormat>(&mut self, key: DhtSettingKey) -> Result<Option<T>, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender.send(DhtRequest::GetSetting(key, reply_tx)).await?;
match reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)?? {
Some(bytes) => T::from_binary(&bytes)
.map(Some)
.map_err(DhtActorError::StoredValueFailedToDeserialize),
None => Ok(None),
}
}

pub async fn set_setting<T: MessageFormat>(&mut self, key: DhtSettingKey, value: T) -> Result<(), DhtActorError> {
let bytes = value.to_binary().map_err(DhtActorError::FailedToSerializeValue)?;
self.sender.send(DhtRequest::SetSetting(key, bytes)).await?;
Ok(())
}
}

Expand Down Expand Up @@ -191,18 +217,22 @@ impl<'a> DhtActor<'a> {
}
}

pub async fn run(mut self) {
pub async fn run(mut self) -> Result<(), DhtActorError> {
let conn = DbConnection::connect_url(self.config.database_url.clone()).await?;
let output = conn.migrate().await?;
info!(target: LOG_TARGET, "Dht database migration:\n{}", output);
let db = DhtDatabase::new(conn);

let mut shutdown_signal = self
.shutdown_signal
.take()
.expect("DhtActor initialized without shutdown_signal")
.fuse();
.expect("DhtActor initialized without shutdown_signal");

loop {
futures::select! {
request = self.request_rx.select_next_some() => {
debug!(target: LOG_TARGET, "DhtActor received message: {}", request);
let handler = self.request_handler(request);
let handler = self.request_handler(db.clone(), request);
self.pending_jobs.push(handler);
},

Expand All @@ -227,9 +257,11 @@ impl<'a> DhtActor<'a> {
}
}
}

Ok(())
}

fn request_handler(&mut self, request: DhtRequest) -> BoxFuture<'a, Result<(), DhtActorError>> {
fn request_handler(&mut self, db: DhtDatabase, request: DhtRequest) -> BoxFuture<'a, Result<(), DhtActorError>> {
use DhtRequest::*;
match request {
SendJoin => {
Expand Down Expand Up @@ -265,16 +297,31 @@ impl<'a> DhtActor<'a> {
}
})
},
SendRequestStoredMessages(maybe_since) => {
SendRequestStoredMessages => {
let node_identity = Arc::clone(&self.node_identity);
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::request_stored_messages(
node_identity,
outbound_requester,
db,
self.config.num_neighbouring_nodes,
maybe_since,
))
},
GetSetting(key, reply_tx) => Box::pin(async move {
let _ = reply_tx.send(db.get_value(key).await.map_err(Into::into));
Ok(())
}),
SetSetting(key, value) => Box::pin(async move {
match db.set_value(key, value).await {
Ok(_) => {
info!(target: LOG_TARGET, "Dht setting '{}' set", key);
},
Err(err) => {
error!(target: LOG_TARGET, "set_setting failed because {:?}", err);
},
}
Ok(())
}),
}
}

Expand Down Expand Up @@ -313,10 +360,16 @@ impl<'a> DhtActor<'a> {
async fn request_stored_messages(
node_identity: Arc<NodeIdentity>,
mut outbound_requester: OutboundMessageRequester,
db: DhtDatabase,
num_neighbouring_nodes: usize,
maybe_since: Option<DateTime<Utc>>,
) -> Result<(), DhtActorError>
{
let request = db
.get_value(DhtSettingKey::SafLastRequestTimestamp)
.await?
.map(StoredMessagesRequest::since)
.unwrap_or_else(StoredMessagesRequest::new);

outbound_requester
.send_message_no_header(
SendMessageParams::new()
Expand All @@ -328,7 +381,7 @@ impl<'a> DhtActor<'a> {
)
.with_dht_message_type(DhtMessageType::SafRequestMessages)
.finish(),
maybe_since.map(StoredMessagesRequest::since).unwrap_or_default(),
request,
)
.await
.map_err(|err| DhtActorError::SendFailed(format!("Failed to send request for stored messages: {}", err)))?;
Expand Down
7 changes: 6 additions & 1 deletion comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{outbound::DhtOutboundRequest, Dht, DhtConfig};
use crate::{outbound::DhtOutboundRequest, DbConnectionUrl, Dht, DhtConfig};
use futures::channel::mpsc;
use std::{sync::Arc, time::Duration};
use tari_comms::{
Expand Down Expand Up @@ -80,6 +80,11 @@ impl DhtBuilder {
self
}

pub fn with_database_url(mut self, database_url: DbConnectionUrl) -> Self {
self.config.database_url = database_url;
self
}

pub fn with_signature_cache_ttl(mut self, ttl: Duration) -> Self {
self.config.msg_hash_cache_ttl = ttl;
self
Expand Down
17 changes: 12 additions & 5 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::envelope::Network;
use crate::{envelope::Network, storage::DbConnectionUrl};
use std::time::Duration;

/// The default maximum number of messages that can be stored using the Store-and-forward middleware
pub const SAF_MSG_CACHE_STORAGE_CAPACITY: usize = 10_000;
/// The default time-to-live duration used for storage of low priority messages by the Store-and-forward middleware
pub const SAF_LOW_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(6 * 60 * 60);
pub const SAF_LOW_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(6 * 60 * 60); // 6 hours
/// The default time-to-live duration used for storage of high priority messages by the Store-and-forward middleware
pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(24 * 60 * 60);
pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(2 * 24 * 60 * 60); // 2 days
/// The default number of peer nodes that a message has to be closer to, to be considered a neighbour
pub const DEFAULT_NUM_NEIGHBOURING_NODES: usize = 10;

#[derive(Debug, Clone)]
pub struct DhtConfig {
/// The `DbConnectionUrl` for the Dht database. Default: In-memory database
pub database_url: DbConnectionUrl,
/// The size of the buffer (channel) which holds pending outbound message requests.
/// Default: 20
pub outbound_buffer_size: usize,
Expand All @@ -53,8 +55,10 @@ pub struct DhtConfig {
/// Default: 6 hours
pub saf_low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 24 hours
/// Default: 2 days
pub saf_high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500kb
pub saf_max_message_size: usize,
/// The max capacity of the message hash cache
/// Default: 1000
pub msg_hash_cache_capacity: usize,
Expand Down Expand Up @@ -92,6 +96,7 @@ impl DhtConfig {
pub fn default_local_test() -> Self {
Self {
network: Network::LocalTest,
database_url: DbConnectionUrl::Memory,
..Default::default()
}
}
Expand All @@ -102,14 +107,16 @@ impl Default for DhtConfig {
Self {
num_neighbouring_nodes: DEFAULT_NUM_NEIGHBOURING_NODES,
saf_num_closest_nodes: 10,
saf_max_returned_messages: 1000,
saf_max_returned_messages: 100,
outbound_buffer_size: 20,
saf_msg_cache_storage_capacity: SAF_MSG_CACHE_STORAGE_CAPACITY,
saf_low_priority_msg_storage_ttl: SAF_LOW_PRIORITY_MSG_STORAGE_TTL,
saf_high_priority_msg_storage_ttl: SAF_HIGH_PRIORITY_MSG_STORAGE_TTL,
saf_max_message_size: 512 * 1024, // 512 kb
msg_hash_cache_capacity: 10_000,
msg_hash_cache_ttl: Duration::from_secs(300),
broadcast_cooldown_max_attempts: 3,
database_url: DbConnectionUrl::Memory,
broadcast_cooldown_period: Duration::from_secs(60 * 30),
discovery_request_timeout: Duration::from_secs(2 * 60),
network: Network::TestNet,
Expand Down
Loading