Skip to content

Commit

Permalink
feat: get-peer supports partial node id lookup
Browse files Browse the repository at this point in the history
Add support for short/partial node ID hex strings to `get-peer`
  • Loading branch information
sdbondi committed Sep 22, 2021
1 parent 6ccb20f commit 9582e3c
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 53 deletions.
10 changes: 7 additions & 3 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,16 @@ impl CommandHandler {
});
}

pub fn get_peer(&self, node_id: NodeId) {
pub fn get_peer(&self, partial: Vec<u8>, original_str: String) {
let peer_manager = self.peer_manager.clone();

self.executor.spawn(async move {
match peer_manager.find_by_node_id(&node_id).await {
Ok(peer) => {
match peer_manager.find_all_starts_with(&partial).await {
Ok(peers) if peers.is_empty() => {
println!("No peer matching '{}'", original_str);
},
Ok(peers) => {
let peer = peers.first().unwrap();
let eid = EmojiId::from_pubkey(&peer.public_key);
println!("Emoji ID: {}", eid);
println!("Public Key: {}", peer.public_key);
Expand Down
24 changes: 18 additions & 6 deletions applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ use tari_app_utilities::utilities::{
parse_emoji_id_or_public_key_or_node_id,
};
use tari_common_types::types::{Commitment, PrivateKey, PublicKey, Signature};
use tari_core::{crypto::tari_utilities::hex::from_hex, proof_of_work::PowAlgorithm, tari_utilities::hex::Hex};
use tari_core::{
crypto::tari_utilities::hex::from_hex,
proof_of_work::PowAlgorithm,
tari_utilities::{hex::Hex, ByteArray},
};
use tari_crypto::tari_utilities::hex;
use tari_shutdown::Shutdown;

/// Enum representing commands used by the basenode
Expand Down Expand Up @@ -515,20 +520,27 @@ impl Parser {
}

fn process_get_peer<'a, I: Iterator<Item = &'a str>>(&mut self, mut args: I) {
let node_id = match args
let (original_str, partial) = match args
.next()
.map(parse_emoji_id_or_public_key_or_node_id)
.map(|s| {
parse_emoji_id_or_public_key_or_node_id(s)
.map(either_to_node_id)
.map(|n| (s.to_string(), n.to_vec()))
.or_else(|| {
let bytes = hex::from_hex(&s[..s.len() - (s.len() % 2)]).unwrap_or_default();
Some((s.to_string(), bytes))
})
})
.flatten()
.map(either_to_node_id)
{
Some(n) => n,
None => {
println!("Usage: get-peer [NodeId|PublicKey|EmojiId]");
println!("Usage: get-peer [Partial NodeId | PublicKey | EmojiId]");
return;
},
};

self.command_handler.get_peer(node_id)
self.command_handler.get_peer(partial, original_str)
}

/// Function to process the list-peers command
Expand Down
5 changes: 5 additions & 0 deletions comms/src/peer_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ impl PeerManager {
self.peer_storage.read().await.find_by_node_id(node_id)
}

/// Find the peer with the provided substring. This currently only compares the given bytes to the NodeId
pub async fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
self.peer_storage.read().await.find_all_starts_with(partial)
}

/// Find the peer with the provided PublicKey
pub async fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
self.peer_storage.read().await.find_by_public_key(public_key)
Expand Down
28 changes: 15 additions & 13 deletions comms/src/peer_manager/node_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,20 @@ use tari_crypto::tari_utilities::{
};
use thiserror::Error;

const NODE_ID_ARRAY_SIZE: usize = 13; // 104-bit as per RFC-0151
type NodeIdArray = [u8; NODE_ID_ARRAY_SIZE];
type NodeIdArray = [u8; NodeId::BYTE_SIZE];

pub type NodeDistance = XorDistance; // or HammingDistance

#[derive(Debug, Error, Clone)]
pub enum NodeIdError {
#[error("Incorrect byte count (expected {} bytes)", NODE_ID_ARRAY_SIZE)]
#[error("Incorrect byte count (expected {} bytes)", NodeId::BYTE_SIZE)]
IncorrectByteCount,
#[error("Invalid digest output size")]
InvalidDigestOutputSize,
}

//------------------------------------- XOR Metric -----------------------------------------------//
const NODE_XOR_DISTANCE_ARRAY_SIZE: usize = NODE_ID_ARRAY_SIZE;
const NODE_XOR_DISTANCE_ARRAY_SIZE: usize = NodeId::BYTE_SIZE;
type NodeXorDistanceArray = [u8; NODE_XOR_DISTANCE_ARRAY_SIZE];

#[derive(Clone, Debug, Eq, PartialOrd, Ord, Default)]
Expand Down Expand Up @@ -145,7 +144,7 @@ impl HammingDistance {

/// Returns the maximum distance.
pub const fn max_distance() -> Self {
Self([NODE_ID_ARRAY_SIZE as u8 * 8; NODE_HAMMING_DISTANCE_ARRAY_SIZE])
Self([NodeId::BYTE_SIZE as u8 * 8; NODE_HAMMING_DISTANCE_ARRAY_SIZE])
}
}

Expand All @@ -172,7 +171,7 @@ impl PartialEq for HammingDistance {

/// Calculate the Exclusive OR between the node_id x and y.
fn xor(x: &NodeIdArray, y: &NodeIdArray) -> NodeIdArray {
let mut nd = [0u8; NODE_ID_ARRAY_SIZE];
let mut nd = [0u8; NodeId::BYTE_SIZE];
for i in 0..nd.len() {
nd[i] = x[i] ^ y[i];
}
Expand Down Expand Up @@ -224,6 +223,9 @@ impl fmt::Display for NodeDistance {
pub struct NodeId(NodeIdArray);

impl NodeId {
/// 104-bit/13 byte as per RFC-0151
pub const BYTE_SIZE: usize = 13;

/// Construct a new node id on the origin
pub fn new() -> Self {
Default::default()
Expand All @@ -232,9 +234,9 @@ impl NodeId {
/// Derive a node id from a public key: node_id=hash(public_key)
pub fn from_key<K: ByteArray>(key: &K) -> Self {
let bytes = key.as_bytes();
let mut buf = [0u8; NODE_ID_ARRAY_SIZE];
VarBlake2b::new(NODE_ID_ARRAY_SIZE)
.expect("NODE_ID_ARRAY_SIZE is invalid")
let mut buf = [0u8; NodeId::BYTE_SIZE];
VarBlake2b::new(NodeId::BYTE_SIZE)
.expect("NodeId::NODE_ID_ARRAY_SIZE is invalid")
.chain(bytes)
.finalize_variable(|hash| {
// Safety: output size and buf size are equal
Expand Down Expand Up @@ -347,9 +349,9 @@ impl TryFrom<&[u8]> for NodeId {

/// Construct a node id from 32 bytes
fn try_from(elements: &[u8]) -> Result<Self, Self::Error> {
if elements.len() >= NODE_ID_ARRAY_SIZE {
let mut bytes = [0; NODE_ID_ARRAY_SIZE];
bytes.copy_from_slice(&elements[0..NODE_ID_ARRAY_SIZE]);
if elements.len() >= NodeId::BYTE_SIZE {
let mut bytes = [0; NodeId::BYTE_SIZE];
bytes.copy_from_slice(&elements[0..NodeId::BYTE_SIZE]);
Ok(NodeId(bytes))
} else {
Err(NodeIdError::IncorrectByteCount)
Expand Down Expand Up @@ -577,7 +579,7 @@ mod test {
let hamming_dist = HammingDistance::from_node_ids(&node_id1, &node_id2);
assert_eq!(hamming_dist, HammingDistance([18]));

let node_max = NodeId::from_bytes(&[255; NODE_ID_ARRAY_SIZE]).unwrap();
let node_max = NodeId::from_bytes(&[255; NodeId::BYTE_SIZE]).unwrap();
let node_min = NodeId::default();

let hamming_dist = HammingDistance::from_node_ids(&node_max, &node_min);
Expand Down
18 changes: 18 additions & 0 deletions comms/src/peer_manager/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use log::*;
use multiaddr::Multiaddr;
use rand::{rngs::OsRng, seq::SliceRandom};
use std::{collections::HashMap, time::Duration};
use tari_crypto::tari_utilities::ByteArray;
use tari_storage::{IterationResult, KeyValueStore};

const LOG_TARGET: &str = "comms::peer_manager::peer_storage";
Expand Down Expand Up @@ -216,6 +217,23 @@ where DS: KeyValueStore<PeerId, Peer>
})
}

pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
if partial.is_empty() || partial.len() > NodeId::BYTE_SIZE {
return Ok(Vec::new());
}

let keys = self
.node_id_index
.iter()
.filter(|(k, _)| {
let l = partial.len();
&k.as_bytes()[..l] == partial
})
.map(|(_, id)| *id)
.collect::<Vec<_>>();
self.peer_db.get_many(&keys).map_err(PeerManagerError::DatabaseError)
}

/// Find the peer with the provided PublicKey
pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
let peer_key = self
Expand Down
7 changes: 7 additions & 0 deletions comms/src/peer_manager/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ where T: KeyValueStore<PeerId, Peer>
self.inner.get(key)
}

fn get_many(&self, keys: &[PeerId]) -> Result<Vec<Peer>, KeyValStoreError> {
if keys.iter().any(|k| k == &MIGRATION_VERSION_KEY) {
return Ok(Vec::new());
}
self.inner.get_many(keys)
}

fn size(&self) -> Result<usize, KeyValStoreError> {
self.inner.size().map(|s| s.saturating_sub(1))
}
Expand Down
7 changes: 7 additions & 0 deletions infrastructure/storage/src/key_val_store/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::lmdb_store::LMDBError;
use thiserror::Error;

#[derive(Debug, Error, Clone)]
Expand All @@ -35,3 +36,9 @@ pub enum KeyValStoreError {
#[error("The specified key did not exist in the key-val store")]
KeyNotFound,
}

impl From<LMDBError> for KeyValStoreError {
fn from(e: LMDBError) -> Self {
KeyValStoreError::DatabaseError(format!("{:?}", e))
}
}
11 changes: 11 additions & 0 deletions infrastructure/storage/src/key_val_store/hmap_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ impl<K: Clone + Eq + Hash, V: Clone> KeyValueStore<K, V> for HashmapDatabase<K,
self.get(key)
}

/// Get the values corresponding to the provided keys from the key-value database.
fn get_many(&self, keys: &[K]) -> Result<Vec<V>, KeyValStoreError> {
keys.iter()
.filter_map(|k| match self.get(k) {
Ok(Some(v)) => Some(Ok(v)),
Ok(None) => None,
Err(e) => Some(Err(e)),
})
.collect()
}

/// Returns the total number of entries recorded in the key-value database.
fn size(&self) -> Result<usize, KeyValStoreError> {
self.len()
Expand Down
3 changes: 3 additions & 0 deletions infrastructure/storage/src/key_val_store/key_val_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub trait KeyValueStore<K, V> {
/// Get the value corresponding to the provided key from the key-value database.
fn get(&self, key: &K) -> Result<Option<V>, KeyValStoreError>;

/// Get the value corresponding to the provided key from the key-value database.
fn get_many(&self, keys: &[K]) -> Result<Vec<V>, KeyValStoreError>;

/// Returns the total number of entries recorded in the key-value database.
fn size(&self) -> Result<usize, KeyValStoreError>;

Expand Down
38 changes: 21 additions & 17 deletions infrastructure/storage/src/key_val_store/lmdb_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,50 @@ where
{
/// Inserts a key-value pair into the key-value database.
fn insert(&self, key: K, value: V) -> Result<(), KeyValStoreError> {
self.inner
.insert::<K, V>(&key, &value)
.map_err(|e| KeyValStoreError::DatabaseError(format!("{:?}", e)))
self.inner.insert::<K, V>(&key, &value).map_err(Into::into)
}

/// Get the value corresponding to the provided key from the key-value database.
fn get(&self, key: &K) -> Result<Option<V>, KeyValStoreError>
where for<'t> V: serde::de::DeserializeOwned {
self.inner.get::<K, V>(key).map_err(Into::into)
}

/// Get the values corresponding to the provided keys from the key-value database.
fn get_many(&self, keys: &[K]) -> Result<Vec<V>, KeyValStoreError>
where for<'t> V: serde::de::DeserializeOwned {
self.inner
.get::<K, V>(key)
.map_err(|e| KeyValStoreError::DatabaseError(format!("{:?}", e)))
.with_read_transaction(|access| {
keys.iter()
.filter_map(|k| match access.get::<K, V>(k) {
Ok(Some(v)) => Some(Ok(v)),
Ok(None) => None,
Err(e) => Some(Err(e)),
})
.collect::<Result<Vec<_>, _>>()
})?
.map_err(Into::into)
}

/// Returns the total number of entries recorded in the key-value database.
fn size(&self) -> Result<usize, KeyValStoreError> {
self.inner
.len()
.map_err(|e| KeyValStoreError::DatabaseError(format!("{:?}", e)))
self.inner.len().map_err(Into::into)
}

/// Iterate over all the stored records and execute the function `f` for each pair in the key-value database.
fn for_each<F>(&self, f: F) -> Result<(), KeyValStoreError>
where F: FnMut(Result<(K, V), KeyValStoreError>) -> IterationResult {
self.inner
.for_each::<K, V, F>(f)
.map_err(|e| KeyValStoreError::DatabaseError(format!("{:?}", e)))
self.inner.for_each::<K, V, F>(f).map_err(Into::into)
}

/// Checks whether a record exist in the key-value database that corresponds to the provided `key`.
fn exists(&self, key: &K) -> Result<bool, KeyValStoreError> {
self.inner
.contains_key::<K>(key)
.map_err(|e| KeyValStoreError::DatabaseError(format!("{:?}", e)))
self.inner.contains_key::<K>(key).map_err(Into::into)
}

/// Remove the record from the key-value database that corresponds with the provided `key`.
fn delete(&self, key: &K) -> Result<(), KeyValStoreError> {
self.inner
.remove::<K>(key)
.map_err(|e| KeyValStoreError::DatabaseError(format!("{:?}", e)))
self.inner.remove::<K>(key).map_err(Into::into)
}
}

Expand Down
13 changes: 4 additions & 9 deletions infrastructure/storage/src/lmdb_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,18 +629,13 @@ impl LMDBDatabase {
}

/// Create a read-only transaction on the current database and execute the instructions given in the closure. The
/// transaction is automatically committed when the closure goes out of scope. You may provide the results of the
/// transaction to the calling scope by populating a `Vec<V>` with the results of `txn.get(k)`. Otherwise, if the
/// results are not needed, or you did not call `get`, just return `Ok(None)`.
pub fn with_read_transaction<F, V>(&self, f: F) -> Result<Option<Vec<V>>, LMDBError>
where
V: serde::de::DeserializeOwned,
F: FnOnce(LMDBReadTransaction) -> Result<Option<Vec<V>>, LMDBError>,
{
/// transaction is automatically committed when the closure goes out of scope.
pub fn with_read_transaction<F, R>(&self, f: F) -> Result<R, LMDBError>
where F: FnOnce(LMDBReadTransaction) -> R {
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
let wrapper = LMDBReadTransaction { db: &self.db, access };
f(wrapper)
Ok(f(wrapper))
}

/// Create a transaction with write access on the current table.
Expand Down
8 changes: 3 additions & 5 deletions infrastructure/storage/tests/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,14 @@ fn transactions() {
{
let (users, db) = insert_all_users("transactions");
// Test the `exists` and value retrieval functions
let res = db.with_read_transaction::<_, User>(|txn| {
db.with_read_transaction(|txn| {
for user in users.iter() {
assert!(txn.exists(&user.id).unwrap());
let check: User = txn.get(&user.id).unwrap().unwrap();
assert_eq!(check, *user);
}
Ok(None)
});
println!("{:?}", res);
assert!(res.unwrap().is_none());
})
.unwrap();
}
clean_up("transactions"); // In Windows file handles must be released before files can be deleted
}
Expand Down

0 comments on commit 9582e3c

Please sign in to comment.