coprocessor: support builtin aggregation functions bit_and, bit_or, bit_xor #2510

Merged
merged 28 commits on Dec 29, 2017
Changes from 25 commits
Commits (28)
08b7d90
coprocessor: support more builtin aggregation functions.
spongedu Nov 25, 2017
c10c999
Refine tests
spongedu Nov 26, 2017
b84e546
Merge branch 'master' into agg_bit_ops
spongedu Nov 27, 2017
a83cdfa
recover test case that is deleted by mistake
spongedu Nov 27, 2017
77aa854
Merge branch 'agg_bit_ops' of https://github.com/spongedu/tikv into a…
spongedu Nov 27, 2017
6ee2b42
Merge remote-tracking branch 'upstream/master' into agg_bit_ops
spongedu Nov 28, 2017
663457d
Code refine
spongedu Nov 28, 2017
67a2107
Code Format Refine
spongedu Nov 28, 2017
233e511
Update tipb version
spongedu Nov 29, 2017
36155ec
Change tipb dependency to make compile happy
spongedu Nov 29, 2017
533fe1f
Merge branch 'master' into agg_bit_ops
AndreMouche Dec 5, 2017
886ebd8
Merge branch 'master' into agg_bit_ops
AndreMouche Dec 6, 2017
4f8ab35
Merge branch 'master' into agg_bit_ops
spongedu Dec 6, 2017
78e9950
Merge remote-tracking branch 'upstream/master' into agg_bit_ops
spongedu Dec 14, 2017
c42656f
Fix Cargo.lock
spongedu Dec 14, 2017
0f61e01
Fix Cargo.lock
spongedu Dec 14, 2017
7cb7783
Fix Cargo.toml
spongedu Dec 14, 2017
6640fc2
Fix config
spongedu Dec 14, 2017
7418715
Merge remote-tracking branch 'upstream/master' into agg_bit_ops
spongedu Dec 17, 2017
fc62b58
Merge branch 'master' into agg_bit_ops
spongedu Dec 18, 2017
dd276ed
Merge branch 'master' into agg_bit_ops
spongedu Dec 18, 2017
7e4a71f
Merge branch 'master' into agg_bit_ops
breezewish Dec 20, 2017
142daf8
Merge branch 'master' into agg_bit_ops
AndreMouche Dec 22, 2017
c5b4c4d
Remove redundant empty line
spongedu Dec 23, 2017
9071c3d
Merge remote-tracking branch 'upstream/master' into agg_bit_ops
spongedu Dec 23, 2017
f4c7921
Merge remote-tracking branch 'upstream/master' into agg_bit_ops
spongedu Dec 26, 2017
83d5e11
Merge branch 'master' into agg_bit_ops
AndreMouche Dec 29, 2017
6c4f402
Merge branch 'master' into agg_bit_ops
winoros Dec 29, 2017
80 changes: 80 additions & 0 deletions src/coprocessor/select/aggregate.rs
@@ -21,6 +21,11 @@ use super::xeval::{evaluator, EvalContext};

pub fn build_aggr_func(tp: ExprType) -> Result<Box<AggrFunc>> {
match tp {
ExprType::Agg_BitAnd => Ok(box AggBitAnd {
c: 0xffffffffffffffff,
}),
ExprType::Agg_BitOr => Ok(box AggBitOr { c: 0 }),
ExprType::Agg_BitXor => Ok(box AggBitXor { c: 0 }),
ExprType::Count => Ok(box Count { c: 0 }),
ExprType::First => Ok(box First { e: None }),
ExprType::Sum => Ok(box Sum { res: None }),
@@ -42,6 +47,81 @@ pub trait AggrFunc {
fn calc(&mut self, collector: &mut Vec<Datum>) -> Result<()>;
}

struct AggBitAnd {
c: u64,
}

impl AggrFunc for AggBitAnd {
fn update(&mut self, _: &EvalContext, args: Vec<Datum>) -> Result<()> {
if args.len() != 1 {
return Err(box_err!(
"bit_and only support one column, but got {}",
args.len()
));
}
if args[0] == Datum::Null {
return Ok(());
}
self.c &= args[0].u64();
Ok(())
}

fn calc(&mut self, collector: &mut Vec<Datum>) -> Result<()> {
collector.push(Datum::U64(self.c));
Ok(())
}
}

struct AggBitOr {
c: u64,
}

impl AggrFunc for AggBitOr {
fn update(&mut self, _: &EvalContext, args: Vec<Datum>) -> Result<()> {
if args.len() != 1 {
return Err(box_err!(
"bit_or only support one column, but got {}",
args.len()
));
}
if args[0] == Datum::Null {
return Ok(());
}
self.c |= args[0].u64();
Ok(())
}

fn calc(&mut self, collector: &mut Vec<Datum>) -> Result<()> {
collector.push(Datum::U64(self.c));
Ok(())
}
}

struct AggBitXor {
c: u64,
}

impl AggrFunc for AggBitXor {
fn update(&mut self, _: &EvalContext, args: Vec<Datum>) -> Result<()> {
if args.len() != 1 {
return Err(box_err!(
"bit_xor only support one column, but got {}",
args.len()
));
}
if args[0] == Datum::Null {
return Ok(());
}
self.c ^= args[0].u64();
Ok(())
}

fn calc(&mut self, collector: &mut Vec<Datum>) -> Result<()> {
collector.push(Datum::U64(self.c));
Ok(())
}
}
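
For reference, a standalone sketch (not part of this diff) of the folding semantics the three implementations above encode: bit_and seeds its accumulator with all bits set (u64::MAX), bit_or and bit_xor seed with 0, NULL values are skipped, and a group with no non-NULL value simply yields the seed, matching MySQL's BIT_AND/BIT_OR/BIT_XOR. The free functions below are illustrative only, not TiKV APIs.

// Illustrative sketch, not TiKV code: fold an Option<u64> column the same way
// AggBitAnd/AggBitOr/AggBitXor do, skipping NULLs and starting from the seeds.
fn bit_and(vals: &[Option<u64>]) -> u64 {
    vals.iter().flatten().fold(u64::MAX, |acc, &v| acc & v)
}
fn bit_or(vals: &[Option<u64>]) -> u64 {
    vals.iter().flatten().fold(0, |acc, &v| acc | v)
}
fn bit_xor(vals: &[Option<u64>]) -> u64 {
    vals.iter().flatten().fold(0, |acc, &v| acc ^ v)
}

fn main() {
    assert_eq!(bit_and(&[Some(2), Some(1)]), 0);       // 2 & 1
    assert_eq!(bit_or(&[Some(2), Some(1)]), 3);        // 2 | 1
    assert_eq!(bit_xor(&[Some(4), Some(5), None]), 1); // NULL skipped, 4 ^ 5
    assert_eq!(bit_and(&[None]), u64::MAX);            // all-NULL group returns the seed
}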

struct Count {
c: u64,
}
116 changes: 116 additions & 0 deletions tests/coprocessor/test_select.rs
@@ -468,6 +468,18 @@ impl<'a> Select<'a> {
self.aggr_col(col, ExprType::Avg)
}

fn bit_and(self, col: Column) -> Select<'a> {
self.aggr_col(col, ExprType::Agg_BitAnd)
}

fn bit_or(self, col: Column) -> Select<'a> {
self.aggr_col(col, ExprType::Agg_BitOr)
}

fn bit_xor(self, col: Column) -> Select<'a> {
self.aggr_col(col, ExprType::Agg_BitXor)
}

fn max(self, col: Column) -> Select<'a> {
self.aggr_col(col, ExprType::Max)
}
@@ -863,6 +875,18 @@ impl DAGSelect {
self.aggr_col(col, ExprType::Min)
}

fn bit_and(self, col: Column) -> DAGSelect {
self.aggr_col(col, ExprType::Agg_BitAnd)
}

fn bit_or(self, col: Column) -> DAGSelect {
self.aggr_col(col, ExprType::Agg_BitOr)
}

fn bit_xor(self, col: Column) -> DAGSelect {
self.aggr_col(col, ExprType::Agg_BitXor)
}

fn group_by(mut self, cols: &[Column]) -> DAGSelect {
for col in cols {
let offset = offset_for_column(&self.cols, col.id);
@@ -1530,6 +1554,98 @@ fn test_aggr_extre() {
end_point.stop().unwrap();
}

#[test]
fn test_aggr_bit_ops() {
let data = vec![
(1, Some("name:0"), 2),
(2, Some("name:3"), 3),
(4, Some("name:0"), 1),
(5, Some("name:5"), 4),
(6, Some("name:5"), 5),
(7, None, 4),
];

let product = ProductTable::new();
let (mut store, mut end_point) = init_with_data(&product, &data);

store.begin();
for &(id, name) in &[(8, b"name:5"), (9, b"name:6")] {
store
.insert_into(&product.table)
.set(product.id, Datum::I64(id))
.set(product.name, Datum::Bytes(name.to_vec()))
.set(product.count, Datum::Null)
.execute();
}
store.commit();

let exp = vec![
(
Datum::Bytes(b"name:0".to_vec()),
Datum::U64(0),
Datum::U64(3),
Datum::U64(3),
),
(
Datum::Bytes(b"name:3".to_vec()),
Datum::U64(3),
Datum::U64(3),
Datum::U64(3),
),
(
Datum::Bytes(b"name:5".to_vec()),
Datum::U64(4),
Datum::U64(5),
Datum::U64(1),
),
(Datum::Null, Datum::U64(4), Datum::U64(4), Datum::U64(4)),
(
Datum::Bytes(b"name:6".to_vec()),
Datum::U64(18446744073709551615),
Datum::U64(0),
Datum::U64(0),
),
];
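// How the expectations above follow from the data, grouping by name and
// skipping NULL counts:
//   "name:0"  counts {2, 1}:        2 & 1 = 0,  2 | 1 = 3,  2 ^ 1 = 3
//   "name:3"  counts {3}:           3, 3, 3
//   "name:5"  counts {4, 5, NULL}:  4 & 5 = 4,  4 | 5 = 5,  4 ^ 5 = 1
//   NULL name counts {4}:           4, 4, 4
//   "name:6"  counts {NULL}:        no non-NULL value, so the seeds come back:
//                                   18446744073709551615 (all bits set), 0, 0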
// for selection
let req = Select::from(&product.table)
.bit_and(product.count)
.bit_or(product.count)
.bit_xor(product.count)
.group_by(&[product.name])
.build();
let mut resp = handle_select(&end_point, req);
assert_eq!(row_cnt(resp.get_chunks()), exp.len());
let spliter = ChunkSpliter::new(resp.take_chunks().into_vec());
for (row, (name, bitand, bitor, bitxor)) in spliter.zip(exp.clone()) {
let gk = datum::encode_value(&[name]).unwrap();
let expected_datum = vec![Datum::Bytes(gk), bitand, bitor, bitxor];
let expected_encoded = datum::encode_value(&expected_datum).unwrap();
assert_eq!(row.data, &*expected_encoded);
}

// for dag
let req = DAGSelect::from(&product.table)
.bit_and(product.count)
.bit_or(product.count)
.bit_xor(product.count)
.group_by(&[product.name])
.build();
let mut resp = handle_select(&end_point, req);
let mut row_count = 0;
let exp_len = exp.len();
let spliter = DAGChunkSpliter::new(resp.take_chunks().into_vec(), 4);
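// Unlike the Select response above, where each row is the encoded group key
// followed by the aggregates, the DAG response lays out the three aggregates
// first and the group-by column last, hence the 4-column spliter here.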
for (row, (name, bitand, bitor, bitxor)) in spliter.zip(exp) {
let expected_datum = vec![bitand, bitor, bitxor, name];
let expected_encoded = datum::encode_value(&expected_datum).unwrap();
let result_encoded = datum::encode_value(&row).unwrap();
assert_eq!(result_encoded, &*expected_encoded);
row_count += 1;
}
assert_eq!(row_count, exp_len);

end_point.stop().unwrap();
}
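
Assuming the usual TiKV integration-test layout, a name-filtered run such as cargo test test_aggr_bit_ops should exercise both the Select and DAG paths above.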

#[test]
fn test_order_by_column() {
let data = vec![