
Commit

Merge branch 'main' into remote-compactor
LeslieKid authored Sep 27, 2024
2 parents 86cebbb + aa438ca commit e6cfb01
Showing 7 changed files with 238 additions and 122 deletions.
22 changes: 18 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/analytic_engine/Cargo.toml
@@ -43,6 +43,7 @@ anyhow = { workspace = true }
arc-swap = "1.4.0"
arena = { workspace = true }
arrow = { workspace = true }
async-scoped = { version = "0.9.0", features = ["use-tokio"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
atomic_enum = { workspace = true }
116 changes: 61 additions & 55 deletions src/analytic_engine/src/instance/wal_replayer.rs
@@ -30,14 +30,13 @@ use common_types::{
schema::{IndexInWriterSchema, Schema},
table::ShardId,
};
use futures::StreamExt;
use generic_error::BoxError;
use lazy_static::lazy_static;
use logger::{debug, error, info, trace, warn};
use prometheus::{exponential_buckets, register_histogram, Histogram};
use snafu::ResultExt;
use table_engine::table::TableId;
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::{Mutex, MutexGuard, Semaphore};
use wal::{
log_batch::LogEntry,
manager::{
@@ -74,6 +73,8 @@ lazy_static! {
.unwrap();
}

const MAX_REPLAY_TASK_NUM: usize = 20;

/// Wal replayer supporting both table based and region based
// TODO: limit the memory usage in `RegionBased` mode.
pub struct WalReplayer<'a> {
@@ -189,22 +190,23 @@ impl Replay for TableBasedReplay {
..Default::default()
};

let mut tasks = futures::stream::iter(
table_datas
.iter()
.map(|table_data| {
let table_id = table_data.id;
let read_ctx = &read_ctx;
async move {
let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
(table_id, ret)
}
})
.collect::<Vec<_>>(),
)
.buffer_unordered(20);
while let Some((table_id, ret)) = tasks.next().await {
if let Err(e) = ret {
let ((), results) = async_scoped::TokioScope::scope_and_block(|scope| {
// Limit the maximum number of concurrent tasks.
let semaphore = Arc::new(Semaphore::new(MAX_REPLAY_TASK_NUM));
for table_data in table_datas {
let table_id = table_data.id;
let read_ctx = &read_ctx;
let semaphore = semaphore.clone();
scope.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
(table_id, ret)
});
}
});

for result in results.into_iter().flatten() {
if let (table_id, Err(e)) = result {
// If an error occurs, mark this table as failed and store the cause.
failed_tables.insert(table_id, e);
}
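
The change above drops the `futures::stream::iter(..).buffer_unordered(20)` pipeline in favor of scoped tasks whose concurrency is capped by a `tokio::sync::Semaphore`. Below is a minimal, self-contained sketch of that pattern; `recover_one` and the table ids are hypothetical stand-ins, and `scope_and_block` is assumed to run inside a multi-threaded Tokio runtime, as the surrounding engine code presumably does.

use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_REPLAY_TASK_NUM: usize = 20;

// Hypothetical per-table recovery step standing in for `recover_table_logs`.
async fn recover_one(_table_id: u64) -> Result<(), String> {
    Ok(())
}

fn replay_tables(table_ids: &[u64]) {
    let ((), results) = async_scoped::TokioScope::scope_and_block(|scope| {
        // Limit the maximum number of concurrent tasks.
        let semaphore = Arc::new(Semaphore::new(MAX_REPLAY_TASK_NUM));
        for &table_id in table_ids {
            let semaphore = semaphore.clone();
            scope.spawn(async move {
                // Each task waits for a permit before doing any work.
                let _permit = semaphore.acquire().await.unwrap();
                (table_id, recover_one(table_id).await)
            });
        }
    });

    // `results` holds one join result per spawned task; `flatten` skips join errors.
    for (table_id, ret) in results.into_iter().flatten() {
        if let Err(e) = ret {
            eprintln!("replay of table {table_id} failed: {e}");
        }
    }
}
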
@@ -345,15 +347,15 @@ impl RegionBasedReplay {
table_data: table_data.clone(),
serial_exec,
};
serial_exec_ctxs.insert(table_data.id, serial_exec_ctx);
serial_exec_ctxs.insert(table_data.id, Mutex::new(serial_exec_ctx));
table_datas_by_id.insert(table_data.id.as_u64(), table_data.clone());
}

let table_datas_by_id = Arc::new(table_datas_by_id);
let schema_provider = TableSchemaProviderAdapter {
table_datas: table_datas_by_id.clone(),
};
let serial_exec_ctxs = Arc::new(Mutex::new(serial_exec_ctxs));
let serial_exec_ctxs = serial_exec_ctxs;
// Split and replay logs.
loop {
let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
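
Wrapping each `SerialExecContext` in its own `Mutex` (instead of one `Mutex` around the whole map) lets replay tasks for different tables lock independently rather than serializing on a single map-wide lock. A rough sketch of the difference, with a placeholder `Ctx` type:

use std::collections::HashMap;
use tokio::sync::Mutex;

struct Ctx {
    replayed: usize,
}

// Coarse-grained: every access holds the map lock, so only one table makes
// progress at a time.
type Coarse = Mutex<HashMap<u64, Ctx>>;

// Fine-grained: the map itself is shared immutably; each entry has its own
// lock, so tasks touching different tables never contend.
type Fine = HashMap<u64, Mutex<Ctx>>;

async fn bump(fine: &Fine, table_id: u64) {
    if let Some(slot) = fine.get(&table_id) {
        let mut ctx = slot.lock().await;
        ctx.replayed += 1;
    }
}
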
@@ -381,49 +383,53 @@
async fn replay_single_batch(
context: &ReplayContext,
log_batch: &VecDeque<LogEntry<ReadPayload>>,
serial_exec_ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
serial_exec_ctxs: &HashMap<TableId, Mutex<SerialExecContext<'_>>>,
failed_tables: &mut FailedTables,
) -> Result<()> {
let mut table_batches = Vec::new();
// TODO: No `group_by` method in `VecDeque`, so implement it manually here...
Self::split_log_batch_by_table(log_batch, &mut table_batches);

// TODO: Replay logs of different tables in parallel.
let mut replay_tasks = Vec::with_capacity(table_batches.len());
for table_batch in table_batches {
// Some tables may have failed in previous replay, ignore them.
if failed_tables.contains_key(&table_batch.table_id) {
continue;
}
let log_entries: Vec<_> = table_batch
.ranges
.iter()
.flat_map(|range| log_batch.range(range.clone()))
.collect();

let serial_exec_ctxs = serial_exec_ctxs.clone();
replay_tasks.push(async move {
// Some tables may have been moved to other shards or dropped, ignore such logs.
if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
let result = replay_table_log_entries(
&context.flusher,
context.max_retry_flush_limit,
&mut ctx.serial_exec,
&ctx.table_data,
log_entries.into_iter(),
)
.await;
(table_batch.table_id, Some(result))
} else {
(table_batch.table_id, None)
let ((), results) = async_scoped::TokioScope::scope_and_block(|scope| {
// Limit the maximum number of concurrent tasks.
let semaphore = Arc::new(Semaphore::new(MAX_REPLAY_TASK_NUM));

for table_batch in table_batches {
// Some tables may have failed in previous replay, ignore them.
if failed_tables.contains_key(&table_batch.table_id) {
continue;
}
});
}
let log_entries: Vec<_> = table_batch
.ranges
.iter()
.flat_map(|range| log_batch.range(range.clone()))
.collect();
let semaphore = semaphore.clone();

scope.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
// Some tables may have been moved to other shards or dropped, ignore such logs.
if let Some(ctx) = serial_exec_ctxs.get(&table_batch.table_id) {
let mut ctx = ctx.lock().await;
let table_data = ctx.table_data.clone();
let result = replay_table_log_entries(
&context.flusher,
context.max_retry_flush_limit,
&mut ctx.serial_exec,
&table_data,
log_entries.into_iter(),
)
.await;
(table_batch.table_id, Some(result))
} else {
(table_batch.table_id, None)
}
});
}
});

// Run at most 20 tasks in parallel
let mut replay_tasks = futures::stream::iter(replay_tasks).buffer_unordered(20);
while let Some((table_id, ret)) = replay_tasks.next().await {
if let Some(Err(e)) = ret {
for result in results.into_iter().flatten() {
if let (table_id, Some(Err(e))) = result {
// If an error occurs, mark this table as failed and store the cause.
failed_tables.insert(table_id, e);
}
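
The move to `async_scoped` here is also what lets the spawned tasks borrow `serial_exec_ctxs` and `context` directly: `tokio::spawn` only accepts `'static` futures, while a `TokioScope` blocks until every spawned task has finished, so borrows of the caller's data stay valid for the whole scope. A small illustration of that borrowing (the map contents are arbitrary):

use std::collections::HashMap;
use tokio::sync::Mutex;

async fn replay_batch(ctxs: &HashMap<u64, Mutex<Vec<u8>>>, table_ids: &[u64]) {
    let ((), _results) = async_scoped::TokioScope::scope_and_block(|scope| {
        for &table_id in table_ids {
            // The future borrows `ctxs` from the enclosing function; no `Arc`
            // or cloning is needed because the scope cannot outlive the borrow.
            scope.spawn(async move {
                if let Some(slot) = ctxs.get(&table_id) {
                    let mut buf = slot.lock().await;
                    buf.push(0);
                }
            });
        }
    });
}
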
2 changes: 2 additions & 0 deletions src/wal/Cargo.toml
@@ -48,6 +48,7 @@ required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb", "wal-lo

[dependencies]
anyhow = { workspace = true }
async-scoped = { version = "0.9.0", features = ["use-tokio"] }
async-trait = { workspace = true }
bytes_ext = { workspace = true }
chrono = { workspace = true }
@@ -64,6 +65,7 @@ memmap2 = { version = "0.9.4", optional = true }
message_queue = { workspace = true, optional = true }
prometheus = { workspace = true }
prost = { workspace = true }
rayon = "1.10.0"
runtime = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
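
`rayon` also becomes a dependency of the WAL crate here, though none of the hunks shown exercise it; presumably it backs CPU-bound parallel work elsewhere in the local-storage implementation. Purely for context, a generic example of the data-parallel iterator style rayon provides (the workload is made up):

use rayon::prelude::*;

fn checksum_all(segments: &[Vec<u8>]) -> Vec<u32> {
    // Process independent segments on rayon's thread pool in parallel.
    segments
        .par_iter()
        .map(|seg| seg.iter().map(|&b| b as u32).sum())
        .collect()
}
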
16 changes: 8 additions & 8 deletions src/wal/src/local_storage_impl/record_encoding.rs
@@ -63,7 +63,7 @@ define_result!(Error);
/// +---------+--------+--------+------------+--------------+--------------+-------+
/// ```
#[derive(Debug)]
pub struct Record<'a> {
pub struct Record {
/// The version number of the record.
pub version: u8,

@@ -83,19 +83,19 @@ pub struct Record<'a> {
pub value_length: u32,

/// Common log value.
pub value: &'a [u8],
pub value: Vec<u8>,
}

impl<'a> Record<'a> {
pub fn new(table_id: u64, sequence_num: u64, value: &'a [u8]) -> Result<Self> {
impl Record {
pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Result<Self> {
let mut record = Record {
version: NEWEST_RECORD_ENCODING_VERSION,
crc: 0,
length: (8 + 8 + 4 + value.len()) as u32,
table_id,
sequence_num,
value_length: value.len() as u32,
value,
value: value.to_vec(),
};

// Calculate CRC
@@ -128,7 +128,7 @@ impl RecordEncoding {
}
}

impl Encoder<Record<'_>> for RecordEncoding {
impl Encoder<Record> for RecordEncoding {
type Error = Error;

fn encode<B: BufMut>(&self, buf: &mut B, record: &Record) -> Result<()> {
@@ -147,7 +147,7 @@ impl Encoder<Record<'_>> for RecordEncoding {
buf.try_put_u64(record.table_id).context(Encoding)?;
buf.try_put_u64(record.sequence_num).context(Encoding)?;
buf.try_put_u32(record.value_length).context(Encoding)?;
buf.try_put(record.value).context(Encoding)?;
buf.try_put(record.value.as_slice()).context(Encoding)?;
Ok(())
}

@@ -222,7 +222,7 @@ impl RecordEncoding {
let value_length = buf.try_get_u32().context(Decoding)?;

// Read value
let value = &buf[0..value_length as usize];
let value = buf[0..value_length as usize].to_vec();
buf.advance(value_length as usize);

Ok(Record {
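
Switching `value` from a borrowed `&'a [u8]` to an owned `Vec<u8>` unties a decoded `Record` from the lifetime of the buffer it was read from, at the cost of one copy per record. A simplified stand-in type (not the real `Record`) illustrates what that enables:

#[derive(Debug)]
struct OwnedRecord {
    sequence_num: u64,
    value: Vec<u8>,
}

fn decode_one(buf: &[u8]) -> OwnedRecord {
    // Copy the payload out of the transient read buffer.
    OwnedRecord {
        sequence_num: 0,
        value: buf.to_vec(),
    }
}

fn main() {
    let record = {
        let scratch = vec![1u8, 2, 3];
        // `scratch` is dropped at the end of this block; `record` stays valid
        // because it owns its bytes. A borrowed `Record<'a>` could not escape here.
        decode_one(&scratch)
    };
    // Owning the payload also makes the record freely movable across threads.
    std::thread::spawn(move || println!("{record:?}"))
        .join()
        .unwrap();
}
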

