Skip to content

Commit

Permalink
Batch split for prewrite and commit requests (tikv#390)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Koshchiy <[email protected]>
  • Loading branch information
Andrey Koshchiy authored and bxq2011hust committed Mar 20, 2024
1 parent 8b62848 commit a9b0027
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 6 deletions.
1 change: 1 addition & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ region-split-check-diff = "1B"
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
split-region-check-tick-interval = "1s"
raft-entry-max-size = "1MB"

[rocksdb]
max-open-files = 10000
Expand Down
2 changes: 1 addition & 1 deletion src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use self::{
ResponseWithShard, RetryableMultiRegion,
},
plan_builder::{PlanBuilder, SingleKey},
shard::Shardable,
shard::{Batchable, HasNextBatch, NextBatch, Shardable},
};

pub mod plan;
Expand Down
95 changes: 95 additions & 0 deletions src/request/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,44 @@ pub trait Shardable {
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
}

/// Groups a list of items into consecutive batches bounded by accumulated size.
///
/// Implementors only provide [`Batchable::item_size`]; the grouping itself is
/// supplied by the default [`Batchable::batches`] method.
pub trait Batchable {
    /// The element type that is grouped into batches.
    type Item;

    /// Splits `items` into batches, preserving the original item order.
    ///
    /// Items are appended to the current batch until adding the next item
    /// would bring the accumulated size to `batch_size` or more; at that point
    /// the current batch is closed and the item starts a new one. Consequently:
    ///
    /// * every batch holding more than one item has a total size strictly
    ///   below `batch_size`;
    /// * an item whose own size is `>= batch_size` is placed alone in a batch;
    /// * an empty `items` yields an empty result (no empty batches are emitted).
    fn batches(items: Vec<Self::Item>, batch_size: u64) -> Vec<Vec<Self::Item>> {
        let mut batches: Vec<Vec<Self::Item>> = Vec::new();
        let mut batch: Vec<Self::Item> = Vec::new();
        let mut size: u64 = 0;

        for item in items {
            let item_size = Self::item_size(&item);
            // `item_size` is implementor-defined, so the running total is
            // accumulated with saturating arithmetic: a plain `+` would panic
            // in overflow-checked builds and wrap around in release builds,
            // letting an oversized item slip into the current batch.
            if size.saturating_add(item_size) >= batch_size && !batch.is_empty() {
                batches.push(batch);
                batch = Vec::new();
                size = 0;
            }
            size = size.saturating_add(item_size);
            batch.push(item);
        }
        if !batch.is_empty() {
            batches.push(batch);
        }
        batches
    }

    /// Returns the size that `item` contributes towards the batch size limit.
    fn item_size(item: &Self::Item) -> u64;
}

// Used to iterate within a region for scan requests that have a batch size limit.
// `HasNextBatch` is used to get the next batch according to the previous response.
//
// NOTE(review): the returned pair is presumably the key range of the next batch,
// with `None` meaning there is nothing left to fetch — confirm against the
// implementors, which are not visible here.
pub trait HasNextBatch {
fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)>;
}

// `NextBatch` is used to change the start key of a request based on the result
// of `has_next_batch`.
//
// NOTE(review): `_range` has the same shape as the pair returned by
// `HasNextBatch::has_next_batch`; how each element is applied to the request is
// up to the implementor — confirm against the implementations.
pub trait NextBatch {
fn next_batch(&mut self, _range: (Vec<u8>, Vec<u8>));
}

impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
type Shard = Req::Shard;

Expand Down Expand Up @@ -167,3 +205,60 @@ macro_rules! shardable_range {
}
};
}

#[cfg(test)]
mod test {
    use rand::{thread_rng, Rng};

    use super::Batchable;

    /// Minimal `Batchable` implementor: items are byte vectors sized by length.
    struct BatchableTest;

    impl Batchable for BatchableTest {
        type Item = Vec<u8>;

        fn item_size(item: &Self::Item) -> u64 {
            item.len() as u64
        }
    }

    /// Builds `count` random byte vectors, each `len` bytes long.
    fn random_items(count: usize, len: usize) -> Vec<Vec<u8>> {
        let mut rng = thread_rng();
        (0..count)
            .map(|_| (0..len).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
            .collect()
    }

    /// Three 2-byte items with a limit of 5: the first two share a batch and
    /// the third starts a new one.
    #[test]
    fn test_batches() {
        let items = random_items(3, 2);

        let batches = BatchableTest::batches(items.clone(), 5);

        let expected = vec![items[..2].to_vec(), items[2..].to_vec()];
        assert_eq!(batches, expected);
    }

    /// Items larger than the limit must each end up alone in their own batch.
    #[test]
    fn test_batches_big_item() {
        let items = random_items(3, 3);

        let batches = BatchableTest::batches(items.clone(), 2);

        assert_eq!(batches.len(), 3);
        for (batch, item) in batches.iter().zip(&items) {
            assert_eq!(batch.len(), 1);
            assert_eq!(&batch[0], item);
        }
    }
}
69 changes: 65 additions & 4 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::{
collect_first,
pd::PdClient,
request::{
Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process,
ResponseWithShard, Shardable, SingleKey,
Batchable, Collect, CollectSingle, CollectWithShard, DefaultProcessor,
KvRequest, Merge, Process, ResponseWithShard, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore},
timestamp::TimestampExt,
Expand All @@ -14,14 +14,19 @@ use crate::{
Key, KvPair, Result, Value,
};
use either::Either;
use futures::stream::BoxStream;
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use std::{collections::HashMap, iter, sync::Arc};
use tikv_client_common::Error::PessimisticLockError;
use tikv_client_proto::{
kvrpcpb::{self, TxnHeartBeatResponse},
pdpb::Timestamp,
};

use super::transaction::TXN_COMMIT_BATCH_SIZE;

// implement HasLocks for a response type that has a `pairs` field,
// where locks can be extracted from both the `pairs` and `error` fields
macro_rules! pair_locks {
Expand Down Expand Up @@ -251,7 +256,18 @@ impl Shardable for kvrpcpb::PrewriteRequest {
let mut mutations = self.mutations.clone();
mutations.sort_by(|a, b| a.key.cmp(&b.key));
log::debug!("PrewriteRequest shards");

store_stream_for_keys(mutations.into_iter(), pd_client.clone())
.flat_map(|result| match result {
Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
mutations,
TXN_COMMIT_BATCH_SIZE,
))
.map(move |batch| Ok((batch, store.clone())))
.boxed(),
Err(e) => stream::iter(Err(e)).boxed(),
})
.boxed()
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
Expand All @@ -272,6 +288,16 @@ impl Shardable for kvrpcpb::PrewriteRequest {
}
}

/// Prewrite requests are batched by mutation.
impl Batchable for kvrpcpb::PrewriteRequest {
    type Item = kvrpcpb::Mutation;

    /// Size of a mutation: the length of its key plus the length of its value.
    fn item_size(item: &Self::Item) -> u64 {
        let key_len = item.get_key().len() as u64;
        let value_len = item.get_value().len() as u64;
        key_len + value_len
    }
}

pub fn new_commit_request(
keys: Vec<Vec<u8>>,
start_version: u64,
Expand All @@ -289,7 +315,42 @@ impl KvRequest for kvrpcpb::CommitRequest {
type Response = kvrpcpb::CommitResponse;
}

shardable_keys!(kvrpcpb::CommitRequest);
/// Shards a commit request by region store, then splits each store's keys
/// into batches limited by `TXN_COMMIT_BATCH_SIZE`.
impl Shardable for kvrpcpb::CommitRequest {
    type Shard = Vec<Vec<u8>>;

    /// Returns a stream of `(key batch, store)` pairs for this request.
    ///
    /// The request's keys are cloned and sorted, grouped via
    /// `store_stream_for_keys`, and each group is further split with
    /// `Batchable::batches`. An error produced while grouping is forwarded as
    /// an `Err` item of the returned stream.
    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
        let mut keys = self.keys.clone();
        keys.sort();

        store_stream_for_keys(keys.into_iter(), pd_client.clone())
            .flat_map(|result| match result {
                Ok((keys, store)) => {
                    stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
                        .map(move |batch| Ok((batch, store.clone())))
                        .boxed()
                }
                // Emit the error as a stream item. `stream::iter(Err(e))` would
                // iterate over the `Result` itself, which yields nothing for
                // `Err`, so the failure would be silently dropped.
                Err(e) => stream::iter(std::iter::once(Err(e))).boxed(),
            })
            .boxed()
    }

    /// Points the request at `store`'s region leader and replaces its keys
    /// with the keys of `shard`.
    ///
    /// Fails if the region context cannot be obtained from `store`.
    fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
        self.set_context(store.region_with_leader.context()?);
        self.set_keys(shard.into_iter().map(Into::into).collect());
        Ok(())
    }
}

/// Commit requests are batched by key.
impl Batchable for kvrpcpb::CommitRequest {
    type Item = Vec<u8>;

    /// Size of a key: its length in bytes.
    fn item_size(item: &Self::Item) -> u64 {
        let key_len = item.len();
        key_len as u64
    }
}

pub fn new_batch_rollback_request(
keys: Vec<Vec<u8>>,
Expand Down
2 changes: 1 addition & 1 deletion src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ const DEFAULT_LOCK_TTL: u64 = 3000;
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2);
/// TiKV recommends each RPC packet should be less than around 1MB. We keep KV size of
/// each request below 16KB.
const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
pub const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
const TTL_FACTOR: f64 = 6000.0;

/// Optimistic or pessimistic transaction.
Expand Down
38 changes: 38 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,44 @@ async fn txn_pessimistic() -> Result<()> {
Ok(())
}

/// Commits a single optimistic transaction holding 1000 random values of
/// 15000 bytes each, then reads every key back from a fresh snapshot and
/// checks that the stored value matches what was written.
#[tokio::test]
#[serial]
async fn txn_split_batch() -> Result<()> {
    init().await?;

    let client = TransactionClient::new(pd_addrs(), None).await?;
    let mut txn = client.begin_optimistic().await?;
    let mut rng = thread_rng();

    // testing with raft-entry-max-size = "1MB"
    let keys_count: usize = 1000;
    let val_len = 15000;

    let mut values = Vec::with_capacity(keys_count);
    for _ in 0..keys_count {
        let value: Vec<u8> = (0..val_len).map(|_| rng.gen::<u8>()).collect();
        values.push(value);
    }

    // Keys are the big-endian byte encoding of the value's index.
    let key_for = |index: usize| Key::from(index.to_be_bytes().to_vec());

    for (index, value) in values.iter().enumerate() {
        txn.put(key_for(index), value.clone()).await?;
    }

    txn.commit().await?;

    let read_ts = client.current_timestamp().await?;
    let mut snapshot = client.snapshot(read_ts, TransactionOptions::new_optimistic());

    for (index, expected) in values.iter().enumerate() {
        let stored = snapshot.get(key_for(index)).await?.unwrap();
        assert_eq!(stored, expected.clone());
    }

    Ok(())
}

/// bank transfer mainly tests raw put and get
#[tokio::test]
#[serial]
Expand Down

0 comments on commit a9b0027

Please sign in to comment.