From 8717c921d8e33e4b935573aadbffba7be0df5c6e Mon Sep 17 00:00:00 2001 From: lmatz Date: Fri, 10 Jan 2025 00:19:10 +0800 Subject: [PATCH] Revert "feat: batching telemetry event request avoid too many requests (#20000)" This reverts commit 1bc6bea26cd2c103b827837b007f3a1d5e2d6067. --- proto/telemetry.proto | 7 ++--- src/common/src/telemetry/report.rs | 37 +++------------------------ src/common/telemetry_event/src/lib.rs | 35 ++++++++----------------- 3 files changed, 16 insertions(+), 63 deletions(-) diff --git a/proto/telemetry.proto b/proto/telemetry.proto index 162ab80648aba..06e161506db15 100644 --- a/proto/telemetry.proto +++ b/proto/telemetry.proto @@ -7,7 +7,8 @@ option go_package = "risingwavelabs.com/risingwave/proto/telemetry"; enum MetaBackend { META_BACKEND_UNSPECIFIED = 0; META_BACKEND_MEMORY = 1; - META_BACKEND_ETCD = 2; + reserved 2; + reserved "META_BACKEND_ETCD"; META_BACKEND_RDB = 3; } @@ -166,7 +167,3 @@ message EventMessage { // mark the event is a test message bool is_test = 11; } - -message BatchEventMessage { - repeated EventMessage events = 1; -} diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index f4737038863b0..d4b1fd27335df 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -14,18 +14,13 @@ use std::sync::Arc; -use risingwave_pb::telemetry::PbEventMessage; +use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid; pub use risingwave_telemetry_event::{ - current_timestamp, do_telemetry_event_report, post_telemetry_report_pb, - TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID, -}; -use risingwave_telemetry_event::{ - get_telemetry_risingwave_cloud_uuid, TELEMETRY_EVENT_REPORT_STASH_SIZE, - TELEMETRY_EVENT_REPORT_TX, + current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID, }; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use tokio::time::{interval as tokio_interval_fn, Duration}; +use tokio::time::{interval, Duration}; use uuid::Uuid; use super::{Result, TELEMETRY_REPORT_INTERVAL}; @@ -65,13 +60,9 @@ where let begin_time = std::time::Instant::now(); let session_id = Uuid::new_v4().to_string(); - let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); + let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut event_interval = - tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL)); - event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - // fetch telemetry tracking_id from the meta node only at the beginning // There is only one case tracking_id updated at the runtime ---- metastore data has been // cleaned. There is no way that metastore has been cleaned but nodes are still running @@ -100,29 +91,9 @@ where ) }); - let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::(); - TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| { - tracing::warn!( - "Telemetry failed to set event reporting tx, event reporting will be disabled" - ); - }); - let mut event_stash = Vec::new(); - loop { tokio::select! { _ = interval.tick() => {}, - event = event_rx.recv() => { - debug_assert!(event.is_some()); - event_stash.push(event.unwrap()); - if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE { - do_telemetry_event_report(&mut event_stash).await; - } - continue; - } - _ = event_interval.tick() => { - do_telemetry_event_report(&mut event_stash).await; - continue; - }, _ = &mut shutdown_rx => { tracing::info!("Telemetry exit"); return; diff --git a/src/common/telemetry_event/src/lib.rs b/src/common/telemetry_event/src/lib.rs index 18874f4818627..0be5f40e0de1c 100644 --- a/src/common/telemetry_event/src/lib.rs +++ b/src/common/telemetry_event/src/lib.rs @@ -21,11 +21,9 @@ use std::sync::OnceLock; use prost::Message; use risingwave_pb::telemetry::{ - EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject, + EventMessage as PbEventMessage, PbTelemetryDatabaseObject, TelemetryEventStage as PbTelemetryEventStage, }; -use thiserror_ext::AsReport; -use tokio::sync::mpsc::UnboundedSender; pub use util::*; pub type TelemetryResult = core::result::Result; @@ -34,7 +32,6 @@ pub type TelemetryResult = core::result::Result; pub type TelemetryError = String; pub static TELEMETRY_TRACKING_ID: OnceLock = OnceLock::new(); -pub static TELEMETRY_EVENT_REPORT_TX: OnceLock> = OnceLock::new(); pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; @@ -45,21 +42,6 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option { env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok() } -pub async fn do_telemetry_event_report(event_stash: &mut Vec) { - const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url - let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); - let batch_message = PbBatchEventMessage { - events: std::mem::take(event_stash), - }; - - post_telemetry_report_pb(&url, batch_message.encode_to_vec()) - .await - .unwrap_or_else(|e| tracing::debug!("{}", e)); -} - -pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds -pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action - pub fn report_event_common( event_stage: PbTelemetryEventStage, event_name: &str, @@ -113,12 +95,15 @@ pub fn request_to_telemetry_event( node, is_test, }; - - if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() { - let _ = tx.send(event).inspect_err(|e| { - tracing::warn!("Failed to send telemetry event queue: {}", e.as_report()) - }); - } + let report_bytes = event.encode_to_vec(); + + tokio::spawn(async move { + const TELEMETRY_EVENT_REPORT_TYPE: &str = "event"; + let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); + post_telemetry_report_pb(&url, report_bytes) + .await + .unwrap_or_else(|e| tracing::info!("{}", e)) + }); } #[cfg(test)]