Skip to content

Commit

Permalink
migrate to tikv/minitrace-rust
Browse files (browse the repository at this point in the history)
Signed-off-by: Andy Lok <[email protected]>
  • Loading branch information
andylokandy committed Dec 18, 2023
1 parent 87f5c5c commit 5695e2e
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 113 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ either = "1.6"
fail = "0.4"
futures = { version = "0.3" }
lazy_static = "1"
log = "0.4"
log = { version = "0.4", features = ["kv_unstable"] }
minitrace = "0.6.2"
pin-project = "1"
prometheus = { version = "0.13", default-features = false }
prost = "0.12"
Expand All @@ -44,7 +45,6 @@ serde_derive = "1.0"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.10", features = ["tls"] }
tracing = "0.1.40"

[dev-dependencies]
clap = "2"
Expand Down
2 changes: 1 addition & 1 deletion src/common/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;

use log::info;
use regex::Regex;
use tonic::transport::Certificate;
use tonic::transport::Channel;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity;
use tracing::info;

use crate::internal_err;
use crate::Result;
Expand Down
2 changes: 1 addition & 1 deletion src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::prelude::*;
use futures::stream::BoxStream;
use log::info;
use tokio::sync::RwLock;
use tracing::info;

use crate::compat::stream_fn;
use crate::kv::codec;
Expand Down
11 changes: 5 additions & 6 deletions src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use log::error;
use log::info;
use log::warn;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::Request;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::warn;

use super::timestamp::TimestampOracle;
use crate::internal_err;
Expand Down Expand Up @@ -104,7 +103,7 @@ impl Connection {
Connection { security_mgr }
}

#[instrument(name = "pd::Connection::connect_cluster", skip_all)]
#[minitrace::trace]
pub async fn connect_cluster(
&self,
endpoints: &[String],
Expand All @@ -124,7 +123,7 @@ impl Connection {
}

// Re-establish connection with PD leader in asynchronous fashion.
#[instrument(name = "pd::Connection::reconnect", skip_all)]
#[minitrace::trace]
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
Expand Down
22 changes: 10 additions & 12 deletions src/pd/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use log::debug;
use minitrace::prelude::*;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::debug;
use tracing::info_span;
use tracing::instrument;

use crate::pd::Cluster;
use crate::pd::Connection;
Expand Down Expand Up @@ -78,8 +77,7 @@ macro_rules! retry_core {
let stats = pd_stats($tag);
let mut last_err = Ok(());
for retry in 0..LEADER_CHANGE_RETRY {
let span = info_span!("RetryClient::retry", retry);
let _enter = span.enter();
let _span = LocalSpan::enter_with_local_parent("RetryClient::retry");

let res = $call;

Expand All @@ -88,7 +86,7 @@ macro_rules! retry_core {
Err(e) => last_err = Err(e),
}

debug!("retry on last_err: {:?}", last_err);
debug!("retry {} on last_err: {:?}", retry, last_err);
let mut reconnect_count = MAX_REQUEST_COUNT;
while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
reconnect_count -= 1;
Expand Down Expand Up @@ -149,7 +147,7 @@ impl RetryClient<Cluster> {
impl RetryClientTrait for RetryClient<Cluster> {
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// It does not know about encoding. Caller should take care of it.
#[instrument(name = "RetryClient::get_region", skip_all)]
#[minitrace::trace]
async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
retry_mut!(self, "get_region", |cluster| {
let key = key.clone();
Expand All @@ -164,7 +162,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_region_by_id", skip(self))]
#[minitrace::trace]
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
retry_mut!(self, "get_region_by_id", |cluster| async {
cluster
Expand All @@ -176,7 +174,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_store", skip(self))]
#[minitrace::trace]
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
retry_mut!(self, "get_store", |cluster| async {
cluster
Expand All @@ -186,7 +184,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_all_stores", skip(self))]
#[minitrace::trace]
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
retry_mut!(self, "get_all_stores", |cluster| async {
cluster
Expand All @@ -196,12 +194,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_timestamp", skip(self))]
#[minitrace::trace]
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
}

#[instrument(name = "RetryClient::update_safepoint", skip(self))]
#[minitrace::trace]
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
retry_mut!(self, "update_gc_safepoint", |cluster| async {
cluster
Expand Down
16 changes: 7 additions & 9 deletions src/pd/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ use futures::prelude::*;
use futures::task::AtomicWaker;
use futures::task::Context;
use futures::task::Poll;
use log::debug;
use log::info;
use minitrace::prelude::*;
use pin_project::pin_project;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tracing::debug;
use tracing::info;
use tracing::info_span;
use tracing::instrument;

use crate::internal_err;
use crate::proto::pdpb::pd_client::PdClient;
Expand Down Expand Up @@ -65,7 +64,7 @@ impl TimestampOracle {
Ok(TimestampOracle { request_tx })
}

#[instrument(name = "TimestampOracle::get_timestamp", skip_all)]
#[minitrace::trace]
pub(crate) async fn get_timestamp(self) -> Result<Timestamp> {
debug!("getting current timestamp");
let (request, response) = oneshot::channel();
Expand All @@ -77,7 +76,7 @@ impl TimestampOracle {
}
}

#[instrument(name = "TimestampOracle::run_tso", skip_all)]
#[minitrace::trace]
async fn run_tso(
cluster_id: u64,
mut pd_client: PdClient<Channel>,
Expand All @@ -102,8 +101,7 @@ async fn run_tso(
let mut responses = pd_client.tso(request_stream).await?.into_inner();

while let Some(Ok(resp)) = responses.next().await {
let span = info_span!("handle_response");
let _enter = span.enter();
let _span = LocalSpan::enter_with_local_parent("handle_response");
debug!("got response: {:?}", resp);

{
Expand Down Expand Up @@ -136,7 +134,7 @@ struct TsoRequestStream {
impl Stream for TsoRequestStream {
type Item = TsoRequest;

#[instrument(name = "TsoRequestStream::poll_next", skip_all)]
#[minitrace::trace]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();

Expand Down
2 changes: 1 addition & 1 deletion src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use std::u32;

use futures::StreamExt;
use tracing::debug;
use log::debug;

use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::common::Error;
Expand Down
2 changes: 1 addition & 1 deletion src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod test {

impl HasLocks for MockRpcResponse {}

#[derive(Clone, Debug)]
#[derive(Debug, Clone)]
struct MockKvRequest {
test_invoking_count: Arc<AtomicUsize>,
}
Expand Down
59 changes: 28 additions & 31 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ use async_recursion::async_recursion;
use async_trait::async_trait;
use futures::future::try_join_all;
use futures::prelude::*;
use log::debug;
use log::info;
use minitrace::future::FutureExt;
use minitrace::prelude::*;
use tokio::sync::Semaphore;
use tokio::time::sleep;
use tracing::debug;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
// use tracing::span;
// use tracing::Instrument;

use crate::backoff::Backoff;
use crate::pd::PdClient;
Expand Down Expand Up @@ -61,7 +59,7 @@ pub struct Dispatch<Req: KvRequest> {
impl<Req: KvRequest> Plan for Dispatch<Req> {
type Result = Req::Response;

#[instrument(name = "Dispatch::execute", skip_all, fields(label = self.request.label()))]
#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
let stats = tikv_stats(self.request.label());
let result = self
Expand Down Expand Up @@ -109,7 +107,7 @@ where
{
// A plan may involve multiple shards
#[async_recursion]
// #[instrument(skip_all)]
#[minitrace::trace]
async fn single_plan_handler(
pd_client: Arc<PdC>,
current_plan: P,
Expand All @@ -121,20 +119,20 @@ where
let mut handles = Vec::new();
for shard in shards {
let (shard, region_store) = shard?;
// let span = span!(tracing::Level::INFO, "shard", ?region_store);

let mut clone = current_plan.clone();
clone.apply_shard(shard, &region_store)?;
let handle = tokio::spawn(Self::single_shard_handler(
pd_client.clone(),
clone,
region_store,
backoff.clone(),
permits.clone(),
preserve_region_results,
));
let handle = tokio::spawn(
Self::single_shard_handler(
pd_client.clone(),
clone,
region_store,
backoff.clone(),
permits.clone(),
preserve_region_results,
)
.in_span(Span::enter_with_local_parent("single_shard_handler")),
);
handles.push(handle);
// handles.push(handle.instrument(span));
}

let results = try_join_all(handles).await?;
Expand All @@ -158,7 +156,7 @@ where
}

#[async_recursion]
// #[instrument(skip_all, fields(region_store = ?region_store))]
#[minitrace::trace]
async fn single_shard_handler(
pd_client: Arc<PdC>,
plan: P,
Expand Down Expand Up @@ -220,7 +218,7 @@ where
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
// #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))]
#[minitrace::trace]
async fn handle_region_error(
pd_client: Arc<PdC>,
e: errorpb::Error,
Expand Down Expand Up @@ -282,7 +280,7 @@ where
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
// #[instrument(skip_all, fields(region_store = ?region_store, error = ?error))]
#[minitrace::trace]
async fn on_region_epoch_not_match(
pd_client: Arc<PdC>,
region_store: RegionStore,
Expand Down Expand Up @@ -319,7 +317,7 @@ where
Ok(false)
}

// #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))]
#[minitrace::trace]
async fn handle_grpc_error(
pd_client: Arc<PdC>,
plan: P,
Expand All @@ -329,6 +327,7 @@ where
preserve_region_results: bool,
e: Error,
) -> Result<<Self as Plan>::Result> {
debug!("handle grpc error: {:?}", e);
let ver_id = region_store.region_with_leader.ver_id();
pd_client.invalidate_region_cache(ver_id).await;
match backoff.next_delay_duration() {
Expand Down Expand Up @@ -366,7 +365,7 @@ where
{
type Result = Vec<Result<P::Result>>;

#[instrument(name = "RetryableMultiRegion::execute", skip_all)]
#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
// Limit the maximum concurrency of multi-region request. If there are
// too many concurrent requests, TiKV is more likely to return a "TiKV
Expand Down Expand Up @@ -487,7 +486,7 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
{
type Result = M::Out;

#[instrument(name = "MergeResponse::execute", skip_all)]
#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
self.merge.merge(self.inner.execute().await?)
}
Expand Down Expand Up @@ -584,17 +583,15 @@ where
{
type Result = P::Result;

#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
// let span = info_span!("ResolveLock::execute");
// let _enter = span.enter();

let mut result = self.inner.execute().await?;
let mut clone = self.clone();
let mut retry_cnt = 0;
loop {
retry_cnt += 1;
let span = info_span!("ResolveLock::execute::retry", retry_cnt);
let _enter = span.enter();
let _span = LocalSpan::enter_with_local_parent("ResolveLock::execute::retry")
.with_property(|| ("retry_count", retry_cnt.to_string()));

let locks = result.take_locks();
if locks.is_empty() {
Expand Down Expand Up @@ -704,7 +701,7 @@ where
{
type Result = CleanupLocksResult;

#[instrument(name = "CleanupLocks::execute", skip_all)]
#[minitrace::trace]
async fn execute(&self) -> Result<Self::Result> {
let mut result = CleanupLocksResult::default();
let mut inner = self.inner.clone();
Expand Down
Loading

0 comments on commit 5695e2e

Please sign in to comment.