From f71fccc25250833dda2e162b7fe6f819f1306afa Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 27 Nov 2024 14:44:26 +0800 Subject: [PATCH] reset trait change --- src/meta/src/barrier/context/context_impl.rs | 9 ++------- src/meta/src/barrier/context/mod.rs | 9 ++------- src/meta/src/barrier/worker.rs | 4 ++-- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index b2d0ce9fe941a..a687312f29e16 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use futures::future::try_join_all; -use risingwave_common::catalog::DatabaseId; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; @@ -43,11 +42,7 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { self.scheduled_barriers.next_scheduled().await } - fn abort_and_mark_blocked( - &self, - database_id: Option, - recovery_reason: RecoveryReason, - ) { + fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason) { self.set_status(BarrierManagerStatus::Recovering(recovery_reason)); // Mark blocked and abort buffered schedules, they might be dirty already. @@ -55,7 +50,7 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { .abort_and_mark_blocked("cluster is under recovering"); } - fn mark_ready(&self, database_id: Option) { + fn mark_ready(&self) { self.scheduled_barriers.mark_ready(); self.set_status(BarrierManagerStatus::Running); } diff --git a/src/meta/src/barrier/context/mod.rs b/src/meta/src/barrier/context/mod.rs index 61136121c67af..7306c16171621 100644 --- a/src/meta/src/barrier/context/mod.rs +++ b/src/meta/src/barrier/context/mod.rs @@ -19,7 +19,6 @@ use std::future::Future; use std::sync::Arc; use arc_swap::ArcSwap; -use risingwave_common::catalog::DatabaseId; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest; @@ -43,12 +42,8 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static { ) -> impl Future> + Send + '_; async fn next_scheduled(&self) -> Scheduled; - fn abort_and_mark_blocked( - &self, - database_id: Option, - recovery_reason: RecoveryReason, - ); - fn mark_ready(&self, database_id: Option); + fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason); + fn mark_ready(&self); fn post_collect_command<'a>( &'a self, diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs index 57cea71ad4e7a..36edb92634419 100644 --- a/src/meta/src/barrier/worker.rs +++ b/src/meta/src/barrier/worker.rs @@ -531,12 +531,12 @@ impl GlobalBarrierWorker { err: Option, recovery_reason: RecoveryReason, ) { - self.context.abort_and_mark_blocked(None, recovery_reason); + self.context.abort_and_mark_blocked(recovery_reason); // Clear all control streams to release resources (connections to compute nodes) first. self.control_stream_manager.clear(); self.recovery_inner(paused_reason, err).await; - self.context.mark_ready(None); + self.context.mark_ready(); } async fn recovery_inner(