Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): extract common method to inject initial barrier #19622

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 114 additions & 3 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ use futures::future::try_join_all;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo};
use risingwave_pb::stream_plan::{
AddMutation, Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
RemovePartialGraphRequest,
Expand All @@ -43,14 +48,18 @@ use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::time::{sleep, timeout};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{Command, InflightSubscriptionInfo};
use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{BarrierWorkerState, DatabaseCheckpointControl};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamJobFragments};
use crate::stream::build_actor_connector_splits;
use crate::{MetaError, MetaResult};

const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);
Expand Down Expand Up @@ -283,6 +292,108 @@ impl ControlStreamManager {
}

impl ControlStreamManager {
/// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
///
/// Entries belonging to this database are consumed (`remove`d) from the mutable map
/// arguments (`state_table_committed_epochs`, `stream_actors`, `source_splits`,
/// `background_jobs`) so that the caller can detect leftover, unused entries afterwards.
///
/// Return:
/// - the worker nodes that need to wait for initial barrier collection
/// - the extracted database information
/// - the `prev_epoch` of the initial barrier
pub(super) fn inject_database_initial_barrier(
&mut self,
database_id: DatabaseId,
info: InflightDatabaseInfo,
state_table_committed_epochs: &mut HashMap<TableId, u64>,
stream_actors: &mut HashMap<ActorId, StreamActor>,
source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
subscription_info: InflightSubscriptionInfo,
paused_reason: Option<PausedReason>,
hummock_version_stats: &HummockVersionStats,
) -> MetaResult<(HashSet<WorkerId>, DatabaseCheckpointControl, u64)> {
// Take the source split assignments for every actor of this database out of
// `source_splits`; actors without splits are simply skipped.
let source_split_assignments = info
.fragment_infos()
.flat_map(|info| info.actors.keys())
.filter_map(|actor_id| {
let actor_id = *actor_id as ActorId;
source_splits
.remove(&actor_id)
.map(|splits| (actor_id, splits))
})
.collect();
// The initial barrier carries an `Add` mutation that only re-applies split
// assignments and the pause state.
let mutation = Mutation::Add(AddMutation {
// Actors built during recovery are not treated as newly added actors.
actor_dispatchers: Default::default(),
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
pause: paused_reason.is_some(),
subscriptions_to_add: Default::default(),
});

// Consume the committed epoch of every state table in this database.
// NOTE: the iterator is lazy, so entries are removed from
// `state_table_committed_epochs` as it is advanced below.
let mut epochs = info.existing_table_ids().map(|table_id| {
(
table_id,
state_table_committed_epochs
.remove(&table_id)
.expect("should exist"),
)
});
// All state tables of a database must have been committed at the same epoch;
// use the first table's epoch as the reference and assert the rest agree.
let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
for (table_id, epoch) in epochs {
assert_eq!(
prev_epoch, epoch,
"{} has different committed epoch to {}",
first_table_id, table_id
);
}
let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
// Use a different `curr_epoch` for each recovery attempt.
let curr_epoch = prev_epoch.next();
let barrier_info = BarrierInfo {
prev_epoch,
curr_epoch,
kind: BarrierKind::Initial,
};

// Group this database's stream actors by the worker node that hosts them,
// consuming them from `stream_actors`.
let mut node_actors: HashMap<_, Vec<_>> = HashMap::new();
for (actor_id, worker_id) in info.fragment_infos().flat_map(|info| info.actors.iter()) {
let worker_id = *worker_id as WorkerId;
let actor_id = *actor_id as ActorId;
let stream_actor = stream_actors.remove(&actor_id).expect("should exist");
node_actors.entry(worker_id).or_default().push(stream_actor);
}

// Recover the progress tracker for this database's background streaming jobs,
// consuming their entries from `background_jobs`.
let background_mviews = info
.job_ids()
.filter_map(|job_id| background_jobs.remove(&job_id).map(|mview| (job_id, mview)))
.collect();
let tracker = CreateMviewProgressTracker::recover(background_mviews, hummock_version_stats);

// Inject the initial barrier to all workers hosting actors of this database;
// the returned set is the workers whose collection responses must be awaited.
let node_to_collect = self.inject_barrier(
database_id,
None,
Some(mutation),
&barrier_info,
info.fragment_infos(),
info.fragment_infos(),
Some(node_actors),
vec![],
vec![],
)?;
debug!(
?node_to_collect,
database_id = database_id.database_id,
"inject initial barrier"
);

// Build the per-database checkpoint control seeded with the post-recovery state.
let new_epoch = barrier_info.curr_epoch;
let state = BarrierWorkerState::recovery(new_epoch, info, subscription_info, paused_reason);
Ok((
node_to_collect,
DatabaseCheckpointControl::recovery(database_id, tracker, state),
barrier_info.prev_epoch.value().0,
))
}

pub(super) fn inject_command_ctx_barrier(
&mut self,
database_id: DatabaseId,
Expand Down
150 changes: 38 additions & 112 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ use std::time::Duration;

use arc_swap::ArcSwap;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{PausedReason, Recovery};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
Expand All @@ -35,18 +32,14 @@ use tokio::time::Instant;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, error, info, warn, Instrument};

use crate::barrier::checkpoint::{
BarrierWorkerState, CheckpointControl, DatabaseCheckpointControl,
};
use crate::barrier::checkpoint::CheckpointControl;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::info::BarrierInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
use crate::barrier::schedule::PeriodicBarriers;
use crate::barrier::{
schedule, BarrierKind, BarrierManagerRequest, BarrierManagerStatus,
BarrierWorkerRuntimeInfoSnapshot, InflightSubscriptionInfo, RecoveryReason, TracedEpoch,
schedule, BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot,
InflightSubscriptionInfo, RecoveryReason,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
Expand All @@ -55,9 +48,8 @@ use crate::manager::{
ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
MetadataManager,
};
use crate::model::ActorId;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{build_actor_connector_splits, ScaleControllerRef, SourceManagerRef};
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
Expand Down Expand Up @@ -234,7 +226,9 @@ impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {

self.run_inner(shutdown_rx).await
}
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
Expand Down Expand Up @@ -571,110 +565,42 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
})?;
info!(elapsed=?reset_start_time.elapsed(), "control stream reset");

let mut databases = HashMap::new();

let recovery_result: MetaResult<_> = try {
let mut collected_databases = HashMap::new();
let mut collecting_databases = HashMap::new();
for (database_id, info) in database_fragment_infos {
let source_split_assignments = info
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deleted code is mostly moved into `inject_database_initial_barrier` in src/meta/src/barrier/rpc.rs, with only some variable names changed.

.fragment_infos()
.flat_map(|info| info.actors.keys())
.filter_map(|actor_id| {
let actor_id = *actor_id as ActorId;
source_splits
.remove(&actor_id)
.map(|splits| (actor_id, splits))
})
.collect();
let mutation = Mutation::Add(AddMutation {
// Actors built during recovery is not treated as newly added actors.
actor_dispatchers: Default::default(),
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
pause: paused_reason.is_some(),
subscriptions_to_add: Default::default(),
});

let new_epoch = {
let mut epochs = info.existing_table_ids().map(|table_id| {
(
table_id,
state_table_committed_epochs
.remove(&table_id)
.expect("should exist"),
)
});
let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
for (table_id, epoch) in epochs {
assert_eq!(
prev_epoch, epoch,
"{} has different committed epoch to {}",
first_table_id, table_id
);
}
let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
// Use a different `curr_epoch` for each recovery attempt.
let curr_epoch = prev_epoch.next();
let barrier_info = BarrierInfo {
prev_epoch,
curr_epoch,
kind: BarrierKind::Initial,
};

let mut node_actors: HashMap<_, Vec<_>> = HashMap::new();
for (actor_id, worker_id) in
info.fragment_infos().flat_map(|info| info.actors.iter())
{
let worker_id = *worker_id as WorkerId;
let actor_id = *actor_id as ActorId;
let stream_actor =
stream_actors.remove(&actor_id).expect("should exist");
node_actors.entry(worker_id).or_default().push(stream_actor);
}

let mut node_to_collect = control_stream_manager.inject_barrier(
database_id,
None,
Some(mutation),
&barrier_info,
info.fragment_infos(),
info.fragment_infos(),
Some(node_actors),
vec![],
vec![],
)?;
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
let (worker_id, result) =
control_stream_manager.next_collect_barrier_response().await;
let resp = result?;
assert_eq!(resp.epoch, barrier_info.prev_epoch());
assert!(node_to_collect.remove(&worker_id));
}
debug!("collected initial barrier");
barrier_info.curr_epoch
};

let background_mviews = info
.job_ids()
.filter_map(|job_id| {
background_jobs.remove(&job_id).map(|mview| (job_id, mview))
})
.collect();
let tracker = CreateMviewProgressTracker::recover(
background_mviews,
&hummock_version_stats,
);
let state = BarrierWorkerState::recovery(
new_epoch,
let (node_to_collect, database, prev_epoch) = control_stream_manager.inject_database_initial_barrier(
database_id,
info,
&mut state_table_committed_epochs,
&mut stream_actors,
&mut source_splits,
&mut background_jobs,
subscription_infos.remove(&database_id).unwrap_or_default(),
paused_reason,
);
databases.insert(
database_id,
DatabaseCheckpointControl::recovery(database_id, tracker, state),
);
&hummock_version_stats,
)?;
if !node_to_collect.is_empty() {
assert!(collecting_databases.insert(database_id, (node_to_collect, database, prev_epoch)).is_none());
} else {
warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
assert!(collected_databases.insert(database_id, database).is_none());
}
}
while !collecting_databases.is_empty() {
let (worker_id, result) =
control_stream_manager.next_collect_barrier_response().await;
let resp = result?;
let database_id = DatabaseId::new(resp.database_id);
let (node_to_collect, _, prev_epoch) = collecting_databases.get_mut(&database_id).expect("should exist");
assert_eq!(resp.epoch, *prev_epoch);
assert!(node_to_collect.remove(&worker_id));
if node_to_collect.is_empty() {
let (_, database, _) = collecting_databases.remove(&database_id).expect("should exist");
assert!(collected_databases.insert(database_id, database).is_none());
}
}
debug!("collected initial barrier");
if !stream_actors.is_empty() {
warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
}
Expand All @@ -694,7 +620,7 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
active_streaming_nodes,
control_stream_manager,
CheckpointControl::new(
databases,
collected_databases,
hummock_version_stats,
),
)
Expand Down