Skip to content

Commit

Permalink
Merge development
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Nov 28, 2024
1 parent 8e3a9ae commit 3beaa5f
Show file tree
Hide file tree
Showing 7 changed files with 491 additions and 45 deletions.
18 changes: 7 additions & 11 deletions .github/workflows/build_binaries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,8 @@ jobs:
- name: Install Windows dependencies
if: startsWith(runner.os,'Windows')
run: |
vcpkg.exe install sqlite3:x64-windows zlib:x64-windows
# Bug in choco - need to install each package individually
choco upgrade llvm -y
# psutils is out of date
# choco upgrade psutils -y
choco upgrade openssl -y
# Should already be installed
# choco upgrade strawberryperl -y
vcpkg install openssl:x64-windows-static
choco upgrade protoc -y
rustup target add ${{ matrix.builds.target }}
Expand Down Expand Up @@ -282,10 +276,8 @@ jobs:
echo "SHELL_EXT=.bat" >> $GITHUB_ENV
echo "TS_DIST=\dist" >> $GITHUB_ENV
echo "PLATFORM_SPECIFIC_DIR=windows" >> $GITHUB_ENV
echo "SQLITE3_LIB_DIR=C:\vcpkg\installed\x64-windows\lib" >> $GITHUB_ENV
echo "OPENSSL_DIR=C:\Program Files\OpenSSL-Win64" >> $GITHUB_ENV
echo "LIBCLANG_PATH=C:\Program Files\LLVM\bin" >> $GITHUB_ENV
echo "C:\Strawberry\perl\bin" >> $GITHUB_PATH
echo "VCPKG_ROOT=C:\vcpkg" >> $GITHUB_ENV
echo "OPENSSL_DIR=C:\vcpkg\packages\openssl_x64-windows-static" >> $GITHUB_ENV
- name: Cache cargo files and outputs
if: ${{ ( ! startsWith(github.ref, 'refs/tags/v') ) && ( ! matrix.builds.cross ) && ( env.CARGO_CACHE ) }}
Expand Down Expand Up @@ -325,6 +317,10 @@ jobs:
echo "cargo options is: ${{ env.CARGO_OPTIONS }}"
echo "cross flag: ${{ matrix.builds.cross }}"
- name: Debug environment variables - Windows
if: startsWith(runner.os,'Windows')
run: printenv

- name: Build release binaries
shell: bash
run: |
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ base_node = [
"base_node_proto",
"monero",
"randomx-rs",
"hickory-client",
]
base_node_proto = []
benches = ["base_node"]
Expand Down Expand Up @@ -96,7 +97,7 @@ tiny-keccak = { package = "tari-tiny-keccak", version = "2.0.2", features = [
"keccak",
] }
dirs-next = "1.0.2"
hickory-client = { version = "0.25.0-alpha.2", features = ["dns-over-rustls", "dnssec-openssl"] }
hickory-client = { version = "0.25.0-alpha.2", features = ["dns-over-rustls", "dnssec-openssl"], optional = true }
anyhow = "1.0.53"

[dev-dependencies]
Expand Down
59 changes: 35 additions & 24 deletions base_layer/core/src/base_node/tari_pulse_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{str::FromStr, time::Duration};
use std::{cmp::min, str::FromStr, time::Duration};

use futures::future;
use hickory_client::{
Expand All @@ -33,13 +33,13 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
use log::{error, info, warn};
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time};
use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time, time::MissedTickBehavior};

use super::LocalNodeCommsInterface;
use crate::base_node::comms_interface::CommsInterfaceError;
Expand Down Expand Up @@ -121,31 +121,37 @@ impl TariPulseService {
notify_passed_checkpoints: watch::Sender<bool>,
) {
let mut interval = time::interval(self.config.check_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
tokio::pin!(interval);
let mut shutdown_signal = self.shutdown_signal.clone();
let mut count = 0u64;
let mut skip_ticks = 0;
let mut skipped_ticks = 0;

loop {
tokio::select! {
_ = interval.tick() => {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
interval = time::interval(self.config.check_interval); // reset interval if back to healthy
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
let old_interval = interval.period().as_secs();
let new_interval = if old_interval > (60 * 30) {
warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes.");
old_interval
} else {
// increase interval if node repeatedly (up to 30 min) fails to fetch checkpoints
interval = time::interval(Duration::from_secs(old_interval * 2));
interval.tick().await;
interval.period().as_secs()
};
warn!(target: LOG_TARGET, "Retrying in {} seconds", new_interval);
continue;
},
count += 1;
trace!(target: LOG_TARGET, "Interval tick: {}", count);
if skipped_ticks < skip_ticks {
skipped_ticks += 1;
debug!(target: LOG_TARGET, "Skipping {} of {} ticks", skipped_ticks, skip_ticks);
continue;
}
let passed_checkpoints = {
match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
skip_ticks = 0;
skipped_ticks = 0;
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
skip_ticks = min(skip_ticks + 1, 30 * 60 / self.config.check_interval.as_secs());
skipped_ticks = 0;
continue;
},
}
};

notify_passed_checkpoints
Expand Down Expand Up @@ -174,7 +180,12 @@ impl TariPulseService {
.max_by(|a, b| a.0.cmp(&b.0))
.ok_or(CommsInterfaceError::InternalError("No checkpoints found".to_string()))?;
let local_checkpoints = self.get_node_block(base_node_service, max_height_block.0).await?;
Ok(local_checkpoints.1 == max_height_block.1)
let passed = local_checkpoints.1 == max_height_block.1;
trace!(
target: LOG_TARGET, "Passed checkpoints: {}, DNS: ({}, {}), Local: ({}, {})",
passed, max_height_block.0, max_height_block.1, local_checkpoints.0, local_checkpoints.1
);
Ok(passed)
}

async fn get_node_block(
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/connectivity_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub enum WalletConnectivityError {
PeerIndexOutOfBounds(String),
#[error("Rpc client pool error: {0}")]
RpcClientPoolError(#[from] RpcClientPoolError),
#[error("Client cancelled: '{0}'")]
ClientCancelled(String),
}

impl From<mpsc::SendError> for WalletConnectivityError {
Expand Down
32 changes: 25 additions & 7 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{mem, pin::pin, time::Duration};
use std::{
cmp::{max, min},
mem,
pin::pin,
time::Duration,
};

use futures::{future, future::Either};
use log::*;
Expand All @@ -41,7 +46,7 @@ use tari_rpc_framework::{
use tokio::{
sync::{mpsc, oneshot},
time,
time::MissedTickBehavior,
time::{timeout, Duration as TokioDuration, MissedTickBehavior},
};

use crate::{
Expand Down Expand Up @@ -305,9 +310,12 @@ impl WalletConnectivityService {
peer_manager.get_current_peer().clone()
};

let mut loop_count = 0;
let number_of_seeds = peer_manager.get_state().1.len();
loop {
loop_count += 1;
self.set_online_status(OnlineStatus::Connecting);
match self.try_setup_rpc_pool(&peer).await {
match self.try_setup_rpc_pool(&peer, loop_count / number_of_seeds + 1).await {
Ok(true) => {
self.base_node_watch.send(Some(peer_manager.clone()));
self.notify_pending_requests().await;
Expand Down Expand Up @@ -360,7 +368,7 @@ impl WalletConnectivityService {
self.online_status_watch.send(status);
}

async fn try_setup_rpc_pool(&mut self, peer: &Peer) -> Result<bool, WalletConnectivityError> {
async fn try_setup_rpc_pool(&mut self, peer: &Peer, dial_cycle: usize) -> Result<bool, WalletConnectivityError> {
self.last_attempted_peer = Some(peer.peer_id());
let peer_id = peer.peer_id();
let dial_wait = self
Expand All @@ -385,10 +393,20 @@ impl WalletConnectivityService {

// Create the first RPC session to ensure that we can connect.
{
// dial_timeout: 1 = 1s, 2 = 10s, 3 = 20s, 4 = 30s, 5 = 40s, 6 = 50s, 7 = 60s, 8 = 70s, 9 = 80s, 10 = 90s
let dial_timeout = TokioDuration::from_secs(min((max(1, 10 * (dial_cycle.saturating_sub(1)))) as u64, 90));
trace!(target: LOG_TARGET, "Attempt dial with client timeout {:?}", dial_timeout);

let mut bn_changed_fut = pin!(self.base_node_watch.changed());
match future::select(dial_wait, &mut bn_changed_fut).await {
Either::Left((result, _)) => result?,
Either::Right(_) => return Ok(false),
match timeout(dial_timeout, future::select(dial_wait, &mut bn_changed_fut)).await {
Ok(Either::Left((result, _))) => result?,
Ok(Either::Right(_)) => return Ok(false),
Err(_) => {
return Err(WalletConnectivityError::ClientCancelled(format!(
"Could not connect to '{}' in {:?}",
peer_id, dial_timeout
)))
},
};
debug!(target: LOG_TARGET, "Dial succeeded for {peer_id}");
let connect_fut = pin!(container.base_node_wallet_rpc_client.get());
Expand Down
Loading

0 comments on commit 3beaa5f

Please sign in to comment.