diff --git a/narwhal/executor/src/subscriber.rs b/narwhal/executor/src/subscriber.rs index 0ce4c767ce3a1..0810b945fbd11 100644 --- a/narwhal/executor/src/subscriber.rs +++ b/narwhal/executor/src/subscriber.rs @@ -256,7 +256,7 @@ impl Fetcher { return Some(batch); } Ok(None) => debug!("Payload {} not found locally", digest), - Err(err) => error!("Error communicating with out own worker: {}", err), + Err(err) => error!("Error communicating with own worker: {}", err), } None } diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index bd9a781e72b39..92e825efbc05f 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -229,7 +229,7 @@ impl Core { self } - #[instrument(level = "debug", skip_all, fields(header_digest = ?header.digest()))] + #[instrument(level = "debug", skip_all, fields(header_digest = ?header.digest(), round = ?header.round))] async fn process_own_header(&mut self, header: Header) -> DagResult<()> { if header.epoch < self.committee.epoch() { debug!("Proposer outdated"); @@ -272,12 +272,15 @@ impl Core { #[async_recursion] #[instrument(level = "debug", skip_all, fields(header_digest = ?header.digest()))] async fn process_header_internal(&mut self, header: &Header, signed: bool) -> DagResult<()> { - debug!("Processing {:?} round:{:?}", header, header.round); let header_source = if self.name.eq(&header.author) { "own" } else { "other" }; + debug!( + "Processing {header_source} header {:?} round:{:?}", + header, header.round + ); // Indicate that we are processing this header. let inserted = self @@ -316,10 +319,7 @@ impl Core { .headers_suspended .with_label_values(&[&header.epoch.to_string(), "missing_parents"]) .inc(); - debug!( - "Processing of {} suspended: missing parent(s)", - header.digest() - ); + debug!("Processing header {header} suspended: missing parent(s)"); return Ok(()); } @@ -344,7 +344,7 @@ impl Core { .headers_suspended .with_label_values(&[&header.epoch.to_string(), "missing_payload"]) .inc(); - debug!("Processing of {header} suspended: missing payload"); + debug!("Processing header {header} suspended: missing payload"); return Ok(()); } @@ -449,9 +449,9 @@ impl Core { } #[async_recursion] - #[instrument(level = "debug", skip_all, fields(vote_digest = ?vote.digest()))] + #[instrument(level = "debug", skip_all, fields(vote_digest = ?vote.digest(), round = ?vote.round, from = ?vote.author))] async fn process_vote(&mut self, vote: Vote) -> DagResult<()> { - debug!("Processing {:?}", vote); + debug!("Processing vote {:?}", vote); // Add it to the votes' aggregator and try to make a new certificate. if let Some(certificate) = @@ -513,7 +513,7 @@ impl Core { } debug!( - "Processing {:?} round:{:?}", + "Processing certificate {:?} round:{:?}", certificate, certificate.round() ); @@ -562,7 +562,7 @@ impl Core { && !self.synchronizer.check_parents(&certificate).await? { debug!( - "Processing of {:?} suspended: missing ancestors", + "Processing certificate {:?} suspended: missing ancestors", certificate ); self.metrics diff --git a/narwhal/primary/src/header_waiter.rs b/narwhal/primary/src/header_waiter.rs index 0237b3623e48f..33b8525007841 100644 --- a/narwhal/primary/src/header_waiter.rs +++ b/narwhal/primary/src/header_waiter.rs @@ -24,7 +24,7 @@ use tokio::{ task::JoinHandle, time::Instant, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use types::{ metered_channel::{Receiver, Sender}, BatchDigest, CertificateDigest, GetCertificatesRequest, GetCertificatesResponse, Header, @@ -189,17 +189,25 @@ impl HeaderWaiter { let certificates = result .map_err(|e| HeaderError::NetworkError(format!("{e:?}"), deliver.clone()))? .certificates; + debug!("HeaderWaiter: GetCertificates returned successfully for parents of {} num={}", deliver.clone(), certificates.len()); for certificate in certificates { tx_primary_messages .send(PrimaryMessage::Certificate(certificate)) .await .map_err(|_| HeaderError::ChannelFull(deliver.clone()))?; } + } else { + debug!("HeaderWaiter: No GetCertificates call needed for {}", deliver.clone()); } }, _ = &mut cancel => return Ok(deliver.clone()), } // Wait on certificates to show up in the store so we know they're processed by Core. + debug!( + "HeaderWaiter: Waiting for parents of {} to become available in store. num_missing={}", + deliver.clone(), + missing.len() + ); let waiting: Vec<_> = missing.into_iter().map(|x| store.notify_read(x)).collect(); tokio::select! { result = try_join_all(waiting) => { @@ -250,7 +258,6 @@ impl HeaderWaiter { .worker(&self.name, &worker_id) .expect("Author of valid header is not in the worker cache") .name; - let message = WorkerSynchronizeMessage{digests, target: author.clone()}; synchronize_handles.push(self.network.send(worker_name, &message).await); } @@ -272,7 +279,7 @@ impl HeaderWaiter { } WaiterMessage::SyncParents(missing, header) => { - debug!("Synching the parents of {header}"); + debug!("HeaderWaiter: Synching the parents of {header}"); let header_digest = header.digest(); let round = header.round; let author = header.author.clone(); @@ -295,8 +302,10 @@ impl HeaderWaiter { }); } let network_future: OptionFuture<_> = if requires_sync.is_empty() { + debug!("HeaderWaiter: sync requests inflight for missing parents of {header}"); None } else { + debug!("HeaderWaiter: sending GetCertificates to {author}, to fetch missing parents {:?} of {header}", requires_sync); let network = self.network.network(); let target = self.committee.network_key(&author).unwrap(); let message = GetCertificatesRequest{digests: requires_sync}; @@ -333,9 +342,12 @@ impl HeaderWaiter { }, Ok(header) => { self.cleanup_pending_requests(&header); - // Ok to drop the header if core is overloaded. - let _ = self.tx_headers_loopback.try_send(header); + if let Err(e) = self.tx_headers_loopback.try_send(header.clone()) { + warn!("Failed to loop back ready header {header} {e}"); + } else { + debug!("Header looped back to core: {header}"); + } }, }; }, // This request has been canceled when result is None.