Skip to content

Commit

Permalink
Add debug logs to header proposing and voting (#6101)
Browse files Browse the repository at this point in the history
Before these components are deleted, it seems it is still useful to have more visibility into their logic, especially there are known issues.
  • Loading branch information
mwtian committed Nov 14, 2022
1 parent 673a040 commit 067600e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
2 changes: 1 addition & 1 deletion narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl<Network: SubscriberNetwork> Fetcher<Network> {
return Some(batch);
}
Ok(None) => debug!("Payload {} not found locally", digest),
Err(err) => error!("Error communicating with out own worker: {}", err),
Err(err) => error!("Error communicating with own worker: {}", err),
}
None
}
Expand Down
22 changes: 11 additions & 11 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Core {
self
}

#[instrument(level = "debug", skip_all, fields(header_digest = ?header.digest()))]
#[instrument(level = "debug", skip_all, fields(header_digest = ?header.digest(), round = ?header.round))]
async fn process_own_header(&mut self, header: Header) -> DagResult<()> {
if header.epoch < self.committee.epoch() {
debug!("Proposer outdated");
Expand Down Expand Up @@ -272,12 +272,15 @@ impl Core {
#[async_recursion]
#[instrument(level = "debug", skip_all, fields(header_digest = ?header.digest()))]
async fn process_header_internal(&mut self, header: &Header, signed: bool) -> DagResult<()> {
debug!("Processing {:?} round:{:?}", header, header.round);
let header_source = if self.name.eq(&header.author) {
"own"
} else {
"other"
};
debug!(
"Processing {header_source} header {:?} round:{:?}",
header, header.round
);

// Indicate that we are processing this header.
let inserted = self
Expand Down Expand Up @@ -316,10 +319,7 @@ impl Core {
.headers_suspended
.with_label_values(&[&header.epoch.to_string(), "missing_parents"])
.inc();
debug!(
"Processing of {} suspended: missing parent(s)",
header.digest()
);
debug!("Processing header {header} suspended: missing parent(s)");
return Ok(());
}

Expand All @@ -344,7 +344,7 @@ impl Core {
.headers_suspended
.with_label_values(&[&header.epoch.to_string(), "missing_payload"])
.inc();
debug!("Processing of {header} suspended: missing payload");
debug!("Processing header {header} suspended: missing payload");
return Ok(());
}

Expand Down Expand Up @@ -449,9 +449,9 @@ impl Core {
}

#[async_recursion]
#[instrument(level = "debug", skip_all, fields(vote_digest = ?vote.digest()))]
#[instrument(level = "debug", skip_all, fields(vote_digest = ?vote.digest(), round = ?vote.round, from = ?vote.author))]
async fn process_vote(&mut self, vote: Vote) -> DagResult<()> {
debug!("Processing {:?}", vote);
debug!("Processing vote {:?}", vote);

// Add it to the votes' aggregator and try to make a new certificate.
if let Some(certificate) =
Expand Down Expand Up @@ -513,7 +513,7 @@ impl Core {
}

debug!(
"Processing {:?} round:{:?}",
"Processing certificate {:?} round:{:?}",
certificate,
certificate.round()
);
Expand Down Expand Up @@ -562,7 +562,7 @@ impl Core {
&& !self.synchronizer.check_parents(&certificate).await?
{
debug!(
"Processing of {:?} suspended: missing ancestors",
"Processing certificate {:?} suspended: missing ancestors",
certificate
);
self.metrics
Expand Down
22 changes: 17 additions & 5 deletions narwhal/primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::{
task::JoinHandle,
time::Instant,
};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};
use types::{
metered_channel::{Receiver, Sender},
BatchDigest, CertificateDigest, GetCertificatesRequest, GetCertificatesResponse, Header,
Expand Down Expand Up @@ -189,17 +189,25 @@ impl HeaderWaiter {
let certificates = result
.map_err(|e| HeaderError::NetworkError(format!("{e:?}"), deliver.clone()))?
.certificates;
debug!("HeaderWaiter: GetCertificates returned successfully for parents of {} num={}", deliver.clone(), certificates.len());
for certificate in certificates {
tx_primary_messages
.send(PrimaryMessage::Certificate(certificate))
.await
.map_err(|_| HeaderError::ChannelFull(deliver.clone()))?;
}
} else {
debug!("HeaderWaiter: No GetCertificates call needed for {}", deliver.clone());
}
},
_ = &mut cancel => return Ok(deliver.clone()),
}
// Wait on certificates to show up in the store so we know they're processed by Core.
debug!(
"HeaderWaiter: Waiting for parents of {} to become available in store. num_missing={}",
deliver.clone(),
missing.len()
);
let waiting: Vec<_> = missing.into_iter().map(|x| store.notify_read(x)).collect();
tokio::select! {
result = try_join_all(waiting) => {
Expand Down Expand Up @@ -250,7 +258,6 @@ impl HeaderWaiter {
.worker(&self.name, &worker_id)
.expect("Author of valid header is not in the worker cache")
.name;

let message = WorkerSynchronizeMessage{digests, target: author.clone()};
synchronize_handles.push(self.network.send(worker_name, &message).await);
}
Expand All @@ -272,7 +279,7 @@ impl HeaderWaiter {
}

WaiterMessage::SyncParents(missing, header) => {
debug!("Synching the parents of {header}");
debug!("HeaderWaiter: Synching the parents of {header}");
let header_digest = header.digest();
let round = header.round;
let author = header.author.clone();
Expand All @@ -295,8 +302,10 @@ impl HeaderWaiter {
});
}
let network_future: OptionFuture<_> = if requires_sync.is_empty() {
debug!("HeaderWaiter: sync requests inflight for missing parents of {header}");
None
} else {
debug!("HeaderWaiter: sending GetCertificates to {author}, to fetch missing parents {:?} of {header}", requires_sync);
let network = self.network.network();
let target = self.committee.network_key(&author).unwrap();
let message = GetCertificatesRequest{digests: requires_sync};
Expand Down Expand Up @@ -333,9 +342,12 @@ impl HeaderWaiter {
},
Ok(header) => {
self.cleanup_pending_requests(&header);

// Ok to drop the header if core is overloaded.
let _ = self.tx_headers_loopback.try_send(header);
if let Err(e) = self.tx_headers_loopback.try_send(header.clone()) {
warn!("Failed to loop back ready header {header} {e}");
} else {
debug!("Header looped back to core: {header}");
}
},
};
}, // This request has been canceled when result is None.
Expand Down

0 comments on commit 067600e

Please sign in to comment.