Skip to content

Commit

Permalink
temp meta change
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 27, 2024
1 parent c2490bb commit ed9266e
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 50 deletions.
9 changes: 7 additions & 2 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use futures::future::try_join_all;
use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
Expand Down Expand Up @@ -42,15 +43,19 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
self.scheduled_barriers.next_scheduled().await
}

// Switches the barrier manager into the `Recovering` status and then aborts the
// buffered schedules, marking the scheduler as blocked.
//
// NOTE(review): this hunk renders both sides of the diff without +/- markers.
// The one-line signature appears to be the pre-commit form and the multi-line
// signature (with `database_id`) its replacement — confirm against the raw diff.
fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason) {
fn abort_and_mark_blocked(
&self,
// NOTE(review): `database_id` is accepted but not read anywhere in this body yet.
database_id: Option<DatabaseId>,
recovery_reason: RecoveryReason,
) {
// Publish the new status first, carrying the reason for the recovery.
self.set_status(BarrierManagerStatus::Recovering(recovery_reason));

// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering");
}

// Marks the scheduled-barrier queue as ready and then sets the manager status
// back to `Running`.
//
// NOTE(review): both sides of the diff are shown here; the first signature
// appears to be the pre-commit form and the second (with `database_id`) its
// replacement. `database_id` is not read in this body yet.
fn mark_ready(&self) {
fn mark_ready(&self, database_id: Option<DatabaseId>) {
self.scheduled_barriers.mark_ready();
self.set_status(BarrierManagerStatus::Running);
}
Expand Down
9 changes: 7 additions & 2 deletions src/meta/src/barrier/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::future::Future;
use std::sync::Arc;

use arc_swap::ArcSwap;
use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
Expand All @@ -42,8 +43,12 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;

async fn next_scheduled(&self) -> Scheduled;
// NOTE(review): the next two one-line declarations appear to be the pre-commit
// forms; the declarations that follow them add an `Option<DatabaseId>`
// parameter to each method — confirm against the raw diff.
fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason);
fn mark_ready(&self);
// Takes an optional database id together with the reason for the recovery.
fn abort_and_mark_blocked(
&self,
database_id: Option<DatabaseId>,
recovery_reason: RecoveryReason,
);
// Takes the same optional database id as `abort_and_mark_blocked`.
fn mark_ready(&self, database_id: Option<DatabaseId>);

fn post_collect_command<'a>(
&'a self,
Expand Down
72 changes: 32 additions & 40 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::future::poll_fn;
use std::task::Poll;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
Expand All @@ -35,11 +35,10 @@ use risingwave_pb::stream_service::streaming_control_stream_request::{
RemovePartialGraphRequest,
};
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
InjectBarrierRequest, StreamingControlStreamRequest,
streaming_control_stream_request, streaming_control_stream_response, InjectBarrierRequest,
StreamingControlStreamRequest,
};
use risingwave_rpc_client::StreamingControlHandle;
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::time::{sleep, timeout};
use tokio_retry::strategy::ExponentialBackoff;
Expand Down Expand Up @@ -167,20 +166,22 @@ impl ControlStreamManager {
*self = Self::new(self.env.clone());
}

async fn next_response(
fn poll_next_response(
&mut self,
) -> Option<(
cx: &mut Context<'_>,
) -> Poll<(
WorkerId,
MetaResult<streaming_control_stream_response::Response>,
)> {
if self.nodes.is_empty() {
return None;
return Poll::Pending;
}
let (worker_id, result) = poll_fn(|cx| {
let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
{
for (worker_id, node) in &mut self.nodes {
match node.handle.response_stream.poll_next_unpin(cx) {
Poll::Ready(result) => {
return Poll::Ready((
poll_result = Poll::Ready((
*worker_id,
result
.ok_or_else(|| anyhow!("end of stream").into())
Expand All @@ -201,47 +202,35 @@ impl ControlStreamManager {
resp => Ok(resp),
}
})
}),
})
));
break;
}
Poll::Pending => {
continue;
}
}
}
Poll::Pending
})
.await;
};

if let Err(err) = &result {
if let Poll::Ready((worker_id, Err(err))) = &poll_result {
let node = self
.nodes
.remove(&worker_id)
.remove(worker_id)
.expect("should exist when get shutdown resp");
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
}

Some((worker_id, result))
poll_result
}

// NOTE(review): this hunk interleaves two functions without +/- markers.
// `next_collect_barrier_response` (returning `BarrierCompleteResponse`) appears
// to be the removed pre-commit function, and `next_response` (returning the raw
// `streaming_control_stream_response::Response`) its replacement — confirm
// against the raw diff.
pub(super) async fn next_collect_barrier_response(
pub(super) async fn next_response(
&mut self,
) -> (WorkerId, MetaResult<BarrierCompleteResponse>) {
use streaming_control_stream_response::Response;

{
// Awaits the next response through `pending_on_none`.
let (worker_id, result) = pending_on_none(self.next_response()).await;

(
worker_id,
// Unwrap the `CompleteBarrier` payload; `Shutdown` and `Init` hit
// `unreachable!` in this arm.
result.map(|resp| match resp {
Response::CompleteBarrier(resp) => resp,
Response::Shutdown(_) | Response::Init(_) => {
unreachable!("should be treated as error")
}
}),
)
}
) -> (
WorkerId,
MetaResult<streaming_control_stream_response::Response>,
) {
// Drive `poll_next_response` through `poll_fn` and return the pair it yields.
poll_fn(|cx| self.poll_next_response(cx)).await
}

pub(super) async fn collect_errors(
Expand All @@ -252,14 +241,17 @@ impl ControlStreamManager {
let mut errors = vec![(worker_id, first_err)];
#[cfg(not(madsim))]
{
let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
while let Some((worker_id, result)) = self.next_response().await {
if let Err(e) = result {
errors.push((worker_id, e));
{
let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
while !self.nodes.is_empty() {
let (worker_id, result) = self.next_response().await;
if let Err(e) = result {
errors.push((worker_id, e));
}
}
}
})
.await;
})
.await;
}
}
tracing::debug!(?errors, "collected stream errors");
errors
Expand Down
38 changes: 32 additions & 6 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::mem::replace;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
Expand All @@ -27,6 +28,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{PausedReason, Recovery};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
Expand Down Expand Up @@ -317,8 +319,25 @@ impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
}
}
},
(worker_id, resp_result) = self.control_stream_manager.next_collect_barrier_response() => {
if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) {
(worker_id, resp_result) = self.control_stream_manager.next_response() => {
if let Err(e) = resp_result.and_then(|resp| {
// Dispatch on the kind of response received from the worker's control stream.
match resp {
    // Barrier-completion responses are forwarded to the checkpoint control.
    Response::CompleteBarrier(resp) => {
        self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)
    }
    // Not implemented yet: reaching either of these arms panics via `todo!`.
    Response::ReportDatabaseFailure(_) => {
        todo!("handle Response::ReportDatabaseFailure")
    }
    Response::ResetDatabase(_) => {
        todo!("handle Response::ResetDatabase")
    }
    // `Init` and `Shutdown` are not expected here, so report them as an error.
    other @ (Response::Init(_) | Response::Shutdown(_)) => {
        Err(anyhow!("get unexpected response: {:?}", other).into())
    }
}
}) {
{

if self.checkpoint_control.is_failed_at_worker_err(worker_id) {
Expand Down Expand Up @@ -512,12 +531,12 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
err: Option<MetaError>,
recovery_reason: RecoveryReason,
) {
self.context.abort_and_mark_blocked(recovery_reason);
self.context.abort_and_mark_blocked(None, recovery_reason);
// Clear all control streams to release resources (connections to compute nodes) first.
self.control_stream_manager.clear();

self.recovery_inner(paused_reason, err).await;
self.context.mark_ready();
self.context.mark_ready(None);
}

async fn recovery_inner(
Expand Down Expand Up @@ -645,9 +664,16 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
let (worker_id, result) =
control_stream_manager.next_collect_barrier_response().await;
control_stream_manager.next_response().await;
let resp = result?;
assert_eq!(resp.epoch, barrier_info.prev_epoch());
// Only barrier-completion responses are accepted while collecting the initial
// barrier; any other response aborts with an error.
match resp {
    Response::CompleteBarrier(resp) => {
        assert_eq!(resp.epoch, barrier_info.prev_epoch());
    }
    other => {
        // The message names the variant actually matched above (`CompleteBarrier`).
        return Err(anyhow!("expect Response::CompleteBarrier but get {:?}", other).into());
    }
}
assert!(node_to_collect.remove(&worker_id));
}
debug!("collected initial barrier");
Expand Down

0 comments on commit ed9266e

Please sign in to comment.