Start and stop per timeline recovery task.
Slightly refactors init: load_tenant_timelines is now async so it can properly
init each timeline, but to keep the global map lock sync we simply acquire it
anew for each timeline.

Recovery task itself is just a stub here.

Part of #4875
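The init refactor described above boils down to a small locking pattern that is easy to show in isolation. The sketch below uses made-up types (GlobalMap, u64 ids, String values) rather than the real GlobalTimelines code: the loader is async, but the global map stays behind a sync Mutex whose guard is re-acquired for every timeline and never held across an .await point.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Default)]
struct GlobalMap {
    timelines: Mutex<HashMap<u64, String>>, // placeholder key/value types
}

impl GlobalMap {
    async fn load_tenant_timelines(&self, ids: Vec<u64>) {
        for id in ids {
            // Async per-timeline init (e.g. reading state from disk) happens
            // with no lock held.
            tokio::task::yield_now().await;
            let timeline = format!("timeline {id}");
            // The sync mutex is acquired anew for each timeline, so its guard
            // is never held across an .await point.
            self.timelines.lock().unwrap().insert(id, timeline);
        }
    }
}

#[tokio::main]
async fn main() {
    let map = GlobalMap::default();
    map.load_tenant_timelines(vec![1, 2, 3]).await;
    println!("{} timelines loaded", map.timelines.lock().unwrap().len());
}
```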
arssher committed Aug 28, 2023
1 parent 704519d commit 12310d9
Showing 6 changed files with 146 additions and 65 deletions.
37 changes: 20 additions & 17 deletions safekeeper/src/bin/safekeeper.rs
@@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {

let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;

// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
FuturesUnordered::new();

// Start the WAL backup launcher before loading timelines: we'll notify it
// through the channel about timelines which need offloading, and not draining
// the channel would cause a deadlock.
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;

let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
}
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));

let wal_service_handle = current_thread_rt
.as_ref()
@@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL remover".to_owned(), res));
tasks_handles.push(Box::pin(wal_remover_handle));

let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));

set_build_info_metric(GIT_VERSION);

// TODO: update tokio-stream, convert to real async Stream with
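The reordering in this file (spawning the WAL backup launcher before GlobalTimelines::init) follows from how bounded mpsc channels behave: once the buffer is full, send().await waits until the receiver drains it, so notifying timelines before the launcher task exists could stall startup indefinitely. A self-contained sketch of that property, assuming the capacity of 100 from the diff and dummy timeline ids instead of the real safekeeper types:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel; the capacity of 100 mirrors the one in the diff.
    let (tx, mut rx) = mpsc::channel::<u64>(100);

    // Spawn the receiving task first, mirroring the moved wal_backup_launcher spawn.
    let launcher = tokio::spawn(async move {
        while let Some(ttid) = rx.recv().await {
            println!("offload requested for timeline {ttid}");
        }
    });

    // "Loading timelines": each one may notify the launcher. Because the
    // receiver is already draining the channel, these sends never block forever.
    for ttid in 0..1000u64 {
        tx.send(ttid).await.expect("launcher task is alive");
    }

    drop(tx); // close the channel so the launcher loop ends
    launcher.await.unwrap();
}
```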
1 change: 1 addition & 0 deletions safekeeper/src/lib.rs
@@ -19,6 +19,7 @@ pub mod json_ctrl;
pub mod metrics;
pub mod pull_timeline;
pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
4 changes: 3 additions & 1 deletion safekeeper/src/pull_timeline.rs
@@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::rename(tli_dir_path, &timeline_path).await?;

let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
let tli = GlobalTimelines::load_timeline(ttid)
.await
.context("Failed to load timeline after copy")?;

info!(
"Loaded timeline {}, flush_lsn={}",
40 changes: 40 additions & 0 deletions safekeeper/src/recovery.rs
@@ -0,0 +1,40 @@
//! This module implements pulling WAL from peer safekeepers if the compute can't
//! provide it, i.e. the safekeeper lags too far behind.

use std::sync::Arc;

use tokio::{select, time::sleep, time::Duration};
use tracing::{info, instrument};

use crate::{timeline::Timeline, SafeKeeperConf};

/// Entrypoint for the per-timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};

select! {
_ = recovery_main_loop(tli) => { unreachable!() }
_ = cancellation_rx.changed() => {
info!("stopped");
}
}
}

const CHECK_INTERVAL_MS: u64 = 2000;

/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(_tli: Arc<Timeline>) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
loop {
sleep(check_duration).await;
}
}
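The stub relies on a standard tokio pattern: an endless worker loop raced in select! against a watch channel that flips when the timeline is cancelled. Below is a stripped-down sketch of that lifecycle outside the safekeeper codebase; task_main, check_loop and cancel_tx are invented names for illustration.

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

async fn check_loop() {
    loop {
        // The periodic "do we need to start recovery?" check would go here.
        sleep(Duration::from_millis(2000)).await;
    }
}

async fn task_main(mut cancellation_rx: watch::Receiver<bool>) {
    tokio::select! {
        _ = check_loop() => unreachable!(),
        _ = cancellation_rx.changed() => println!("stopped"),
    }
}

#[tokio::main]
async fn main() {
    let (cancel_tx, cancel_rx) = watch::channel(false);
    let handle = tokio::spawn(task_main(cancel_rx));
    sleep(Duration::from_millis(50)).await;
    // Cancel the "timeline": the task observes the change and exits cleanly.
    cancel_tx.send(true).unwrap();
    handle.await.unwrap();
}
```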
41 changes: 30 additions & 11 deletions safekeeper/src/timeline.rs
@@ -27,6 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;

use crate::receive_wal::WalReceivers;
use crate::recovery::recovery_main;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term,
@@ -327,13 +328,13 @@ pub struct Timeline {
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(
conf: SafeKeeperConf,
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();

let shared_state = SharedState::restore(&conf, &ttid)?;
let shared_state = SharedState::restore(conf, &ttid)?;
let rcl = shared_state.sk.state.remote_consistent_lsn;
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state.commit_lsn);
@@ -355,7 +356,7 @@ impl Timeline {

/// Create a new timeline, which is not yet persisted to disk.
pub fn create_empty(
conf: SafeKeeperConf,
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
server_info: ServerInfo,
@@ -371,7 +372,7 @@
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?),
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(Lsn(0)),
walreceivers: WalReceivers::new(),
cancellation_rx,
@@ -380,12 +381,16 @@
})
}

/// Initialize fresh timeline on disk and start background tasks. If bootstrap
/// Initialize fresh timeline on disk and start background tasks. If init
/// fails, timeline is cancelled and cannot be used anymore.
///
/// Bootstrap is transactional, so if it fails, created files will be deleted,
/// Init is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged.
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
pub async fn init_new(
self: &Arc<Timeline>,
shared_state: &mut MutexGuard<'_, SharedState>,
conf: &SafeKeeperConf,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
@@ -401,7 +406,7 @@
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;

// Write timeline to disk and TODO: start background tasks.
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.persist().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
@@ -415,12 +420,16 @@

return Err(e);
}

// TODO: add more initialization steps here
self.update_status(shared_state);
self.bootstrap(conf);
Ok(())
}

/// Bootstrap a new or existing timeline by starting background tasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
// Start recovery task which always runs on the timeline.
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}

/// Delete timeline from disk completely, by removing timeline directory. Background
/// timeline activities will stop eventually.
pub async fn delete_from_disk(
@@ -454,6 +463,16 @@
*self.cancellation_rx.borrow()
}

/// Returns a watch channel which gets a value when the timeline is cancelled. The
/// returned receiver is guaranteed to have observed a non-cancelled value (errors otherwise).
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
let rx = self.cancellation_rx.clone();
if *rx.borrow() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(rx)
}

/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await