Skip to content

Commit

Permalink
reset trait change
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 27, 2024
1 parent ed9266e commit f71fccc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 16 deletions.
9 changes: 2 additions & 7 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use futures::future::try_join_all;
use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
Expand Down Expand Up @@ -43,19 +42,15 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
self.scheduled_barriers.next_scheduled().await
}

fn abort_and_mark_blocked(
&self,
database_id: Option<DatabaseId>,
recovery_reason: RecoveryReason,
) {
fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason) {
self.set_status(BarrierManagerStatus::Recovering(recovery_reason));

// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering");
}

fn mark_ready(&self, database_id: Option<DatabaseId>) {
fn mark_ready(&self) {
self.scheduled_barriers.mark_ready();
self.set_status(BarrierManagerStatus::Running);
}
Expand Down
9 changes: 2 additions & 7 deletions src/meta/src/barrier/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::future::Future;
use std::sync::Arc;

use arc_swap::ArcSwap;
use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
Expand All @@ -43,12 +42,8 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;

async fn next_scheduled(&self) -> Scheduled;
fn abort_and_mark_blocked(
&self,
database_id: Option<DatabaseId>,
recovery_reason: RecoveryReason,
);
fn mark_ready(&self, database_id: Option<DatabaseId>);
fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason);
fn mark_ready(&self);

fn post_collect_command<'a>(
&'a self,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
err: Option<MetaError>,
recovery_reason: RecoveryReason,
) {
self.context.abort_and_mark_blocked(None, recovery_reason);
self.context.abort_and_mark_blocked(recovery_reason);
// Clear all control streams to release resources (connections to compute nodes) first.
self.control_stream_manager.clear();

self.recovery_inner(paused_reason, err).await;
self.context.mark_ready(None);
self.context.mark_ready();
}

async fn recovery_inner(
Expand Down

0 comments on commit f71fccc

Please sign in to comment.