Skip to content

Commit

Permalink
Introduce safekeeper peer recovery.
Browse files Browse the repository at this point in the history
Implements fetching of WAL by safekeeper from another safekeeper by imitating
behaviour of last elected leader. This allows to avoid WAL accumulation on
compute and facilitates faster compute startup as it doesn't need to download
any WAL. Actually removing WAL download in walproposer is a matter of another
patch though.

There is a per timeline task which always runs, checking regularly if it should
start recovery frome someone, meaning there is something to fetch and there is
no streaming compute. It then proceeds with fetching, finishing when there is
nothing more to receive.

Implements #4875
  • Loading branch information
arssher committed Oct 20, 2023
1 parent 76c7022 commit b332268
Show file tree
Hide file tree
Showing 10 changed files with 797 additions and 53 deletions.
6 changes: 5 additions & 1 deletion safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
Expand Down Expand Up @@ -105,6 +105,9 @@ struct Args {
/// it during this period passed as a human readable duration.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
heartbeat_timeout: Duration,
/// Enable/disable peer recovery.
#[arg(long, default_value = "false", action=ArgAction::Set)]
peer_recovery: bool,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
Expand Down Expand Up @@ -265,6 +268,7 @@ async fn main() -> anyhow::Result<()> {
broker_endpoint: args.broker_endpoint,
broker_keepalive_interval: args.broker_keepalive_interval,
heartbeat_timeout: args.heartbeat_timeout,
peer_recovery_enabled: args.peer_recovery,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
wal_backup_enabled: !args.disable_wal_backup,
Expand Down
9 changes: 8 additions & 1 deletion safekeeper/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ impl SafekeeperPostgresHandler {
/// from a walproposer recovery function. This connection gets a special handling:
/// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
pub fn is_walproposer_recovery(&self) -> bool {
self.appname == Some("wal_proposer_recovery".to_string())
match &self.appname {
None => false,
Some(appname) => {
appname == "wal_proposer_recovery" ||
// set by safekeeper peer recovery
appname.starts_with("safekeeper")
}
}
}
}
15 changes: 12 additions & 3 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use tokio::io::AsyncReadExt;
use utils::http::endpoint::request_span;

use crate::receive_wal::WalReceiverState;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{debug_dump, pull_timeline};
Expand Down Expand Up @@ -60,16 +60,25 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}

/// Same as TermSwitchEntry, but serializes LSN using display serializer
/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
#[serde_as(as = "DisplayFromStr")]
pub lsn: Lsn,
}

impl From<TermSwitchApiEntry> for TermLsn {
fn from(api_val: TermSwitchApiEntry) -> Self {
TermLsn {
term: api_val.term,
lsn: api_val.lsn,
}
}
}

/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {
Expand Down
2 changes: 2 additions & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct SafeKeeperConf {
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub heartbeat_timeout: Duration,
pub peer_recovery_enabled: bool,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
pub backup_parallel_jobs: usize,
Expand Down Expand Up @@ -100,6 +101,7 @@ impl SafeKeeperConf {
.parse()
.expect("failed to parse default broker endpoint"),
broker_keepalive_interval: Duration::from_secs(5),
peer_recovery_enabled: true,
wal_backup_enabled: true,
backup_parallel_jobs: 1,
pg_auth: None,
Expand Down
60 changes: 46 additions & 14 deletions safekeeper/src/receive_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ impl WalReceivers {

/// Register new walreceiver. Returned guard provides access to the slot and
/// automatically deregisters in Drop.
pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
let slots = &mut self.mutex.lock().slots;
let walreceiver = WalReceiverState::Voting;
let walreceiver = WalReceiverState {
conn_id,
status: WalReceiverStatus::Voting,
};
// find empty slot or create new one
let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
slots[pos] = Some(walreceiver);
Expand Down Expand Up @@ -96,6 +99,18 @@ impl WalReceivers {
self.mutex.lock().slots.iter().flatten().cloned().collect()
}

/// Get number of streaming walreceivers (normally 0 or 1) from compute.
pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
self.mutex
.lock()
.slots
.iter()
.flatten()
// conn_id.is_none skips recovery which also registers here
.filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
.count()
}

/// Unregister walsender.
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
let mut shared = self.mutex.lock();
Expand All @@ -108,10 +123,17 @@ struct WalReceiversShared {
slots: Vec<Option<WalReceiverState>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
pub conn_id: Option<ConnectionId>,
pub status: WalReceiverStatus,
}

/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverState {
pub enum WalReceiverStatus {
Voting,
Streaming,
}
Expand All @@ -136,8 +158,8 @@ impl Drop for WalReceiverGuard {
}
}

const MSG_QUEUE_SIZE: usize = 256;
const REPLY_QUEUE_SIZE: usize = 16;
pub const MSG_QUEUE_SIZE: usize = 256;
pub const REPLY_QUEUE_SIZE: usize = 16;

impl SafekeeperPostgresHandler {
/// Wrapper around handle_start_wal_push_guts handling result. Error is
Expand Down Expand Up @@ -261,7 +283,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
tli.clone(),
msg_rx,
reply_tx,
self.conn_id,
Some(self.conn_id),
));

// Forward all messages to WalAcceptor
Expand Down Expand Up @@ -317,31 +339,41 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
pub struct WalAcceptor {
tli: Arc<Timeline>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: Option<ConnectionId>,
}

impl WalAcceptor {
/// Spawn thread with WalAcceptor running, return handle to it.
fn spawn(
/// Spawn task with WalAcceptor running, return handle to it. Task returns
/// Ok(()) if either of channels has closed, and Err if any error during
/// message processing is encountered.
///
/// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
pub fn spawn(
tli: Arc<Timeline>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: ConnectionId,
conn_id: Option<ConnectionId>,
) -> JoinHandle<anyhow::Result<()>> {
task::spawn(async move {
let mut wa = WalAcceptor {
tli,
msg_rx,
reply_tx,
conn_id,
};

let span_ttid = wa.tli.ttid; // satisfy borrow checker
wa.run()
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
.instrument(
info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
)
.await
})
}
Expand All @@ -355,7 +387,7 @@ impl WalAcceptor {
let _compute_conn_guard = ComputeConnectionGuard {
timeline: Arc::clone(&self.tli),
};
let walreceiver_guard = self.tli.get_walreceivers().register();
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
self.tli.update_status_notify().await?;

// After this timestamp we will stop processing AppendRequests and send a response
Expand All @@ -372,7 +404,7 @@ impl WalAcceptor {

// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
*walreceiver_guard.get() = WalReceiverState::Streaming;
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
}

let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
Expand Down
Loading

0 comments on commit b332268

Please sign in to comment.