Skip to content

Commit

Permalink
feat: add more span on mito engine
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Sep 3, 2024
1 parent 5e4bac2 commit 66719de
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::time::Duration;

use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::tracing;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
use datatypes::arrow;
Expand Down Expand Up @@ -727,6 +728,7 @@ pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
#[tracing::instrument(skip_all)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
(**self).next_batch().await
}
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::{Duration, Instant};

use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
Expand Down Expand Up @@ -62,6 +62,7 @@ pub(crate) enum Scanner {

impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
#[tracing::instrument(skip_all)]
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
match self {
Scanner::Seq(seq_scan) => seq_scan.build_stream(),
Expand All @@ -70,6 +71,7 @@ impl Scanner {
}

/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(skip_all)]
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use smallvec::smallvec;
Expand Down Expand Up @@ -244,6 +244,7 @@ impl SeqScan {
maybe_reader
}

#[tracing::instrument(skip_all)]
async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/region_write_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::mem;
use std::sync::Arc;

use api::v1::{Mutation, OpType, Rows, WalEntry};
use common_telemetry::tracing;
use snafu::ResultExt;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
Expand Down Expand Up @@ -190,6 +191,7 @@ impl RegionWriteCtx {
}

/// Consumes mutations and writes them into mutable memtable.
#[tracing::instrument(skip_all)]
pub(crate) fn write_memtable(&mut self) {
debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());

Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;

use api::v1::WalEntry;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use prost::Message;
Expand Down Expand Up @@ -202,6 +203,7 @@ impl<S: LogStore> WalWriter<S> {
}

/// Write all buffered entries to the WAL.
#[tracing::instrument(skip_all)]
pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
// TODO(yingwen): metrics.

Expand Down

0 comments on commit 66719de

Please sign in to comment.