Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coprocessor: collect output counts for each executor #2607

Merged
merged 5 commits into from
Dec 22, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/coprocessor/dag/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct DAGContext {
exec: Box<Executor>,
output_offsets: Vec<u32>,
batch_row_limit: usize,
num_execs: usize,
}

impl DAGContext {
Expand Down Expand Up @@ -63,6 +64,7 @@ impl DAGContext {
exec: dag_executor.exec,
output_offsets: req.take_output_offsets(),
batch_row_limit: batch_row_limit,
num_execs: req.take_executors().len(),
})
}

Expand Down Expand Up @@ -91,6 +93,9 @@ impl DAGContext {
let mut resp = Response::new();
let mut sel_resp = SelectResponse::new();
sel_resp.set_chunks(RepeatedField::from_vec(chunks));
let mut counts = Vec::with_capacity(self.num_execs);
Copy link
Contributor

@coocood coocood Dec 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can simply use a constant number like 4 for capacity.
It doesn't worth a field in context.

self.exec.collect_output_counts(&mut counts);
sel_resp.set_output_counts(counts);
let data = box_try!(sel_resp.write_to_bytes());
resp.set_data(data);
return Ok(resp);
Expand Down
12 changes: 12 additions & 0 deletions src/coprocessor/dag/executor/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub struct AggregationExecutor {
cols: Arc<Vec<ColumnInfo>>,
related_cols_offset: Vec<usize>, // offset of related columns
src: Box<Executor>,
count: i64,
}

impl AggregationExecutor {
Expand Down Expand Up @@ -108,6 +109,7 @@ impl AggregationExecutor {
cols: columns,
related_cols_offset: visitor.column_offsets(),
src: src,
count: 0,
})
}

Expand Down Expand Up @@ -159,6 +161,7 @@ impl AggregationExecutor {

impl Executor for AggregationExecutor {
fn next(&mut self) -> Result<Option<Row>> {
self.count += 1;
if !self.executed {
self.aggregate()?;
self.executed = true;
Expand Down Expand Up @@ -190,6 +193,11 @@ impl Executor for AggregationExecutor {
}
}

fn collect_output_counts(&mut self, counts: &mut Vec<i64>) {
self.src.collect_output_counts(counts);
counts.push(self.count - 1);
}

fn collect_statistics_into(&mut self, statistics: &mut Statistics) {
self.src.collect_statistics_into(statistics);
}
Expand Down Expand Up @@ -364,5 +372,9 @@ mod test {
assert_eq!(ds[3], Datum::from(expect_cols.3));
assert_eq!(ds[4], Datum::from(expect_cols.4));
}
let expected_counts = vec![raw_data.len() as i64, expect_row_cnt as i64];
let mut counts = Vec::with_capacity(2);
aggr_ect.collect_output_counts(&mut counts);
assert_eq!(expected_counts, counts);
}
}
12 changes: 12 additions & 0 deletions src/coprocessor/dag/executor/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct IndexScanExecutor {
key_ranges: IntoIter<KeyRange>,
scanner: Option<Scanner>,
unique: bool,
count: i64,
}

impl IndexScanExecutor {
Expand Down Expand Up @@ -67,6 +68,7 @@ impl IndexScanExecutor {
key_ranges: key_ranges.into_iter(),
scanner: None,
unique: unique,
count: 0,
}
}

Expand All @@ -86,6 +88,7 @@ impl IndexScanExecutor {
key_ranges: key_ranges.into_iter(),
scanner: None,
unique: false,
count: 0,
}
}

Expand Down Expand Up @@ -145,6 +148,7 @@ impl IndexScanExecutor {

impl Executor for IndexScanExecutor {
fn next(&mut self) -> Result<Option<Row>> {
self.count += 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd better increase count only when meets some value since we will support stream in the future.

loop {
if let Some(row) = self.get_row_from_range_scanner()? {
return Ok(Some(row));
Expand All @@ -170,6 +174,10 @@ impl Executor for IndexScanExecutor {
}
}

fn collect_output_counts(&mut self, counts: &mut Vec<i64>) {
counts.push(self.count - 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd better set self.count to zero after collect

}

fn collect_statistics_into(&mut self, statistics: &mut Statistics) {
statistics.add(&self.statistics);
self.statistics = Statistics::default();
Expand Down Expand Up @@ -360,6 +368,10 @@ mod test {
}
}
assert!(scanner.next().unwrap().is_none());
let expected_counts = vec![KEY_NUMBER as i64];
let mut counts = Vec::with_capacity(1);
scanner.collect_output_counts(&mut counts);
assert_eq!(expected_counts, counts);
}

#[test]
Expand Down
4 changes: 4 additions & 0 deletions src/coprocessor/dag/executor/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl<'a> Executor for LimitExecutor<'a> {
}
}

fn collect_output_counts(&mut self, _: &mut Vec<i64>) {
// We do not know whether `limit` has consumed all of it's source, so just ignore it.
}

fn collect_statistics_into(&mut self, statistics: &mut Statistics) {
self.src.collect_statistics_into(statistics);
}
Expand Down
1 change: 1 addition & 0 deletions src/coprocessor/dag/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl Row {

pub trait Executor {
fn next(&mut self) -> Result<Option<Row>>;
fn collect_output_counts(&mut self, counts: &mut Vec<i64>);
fn collect_statistics_into(&mut self, stats: &mut Statistics);
}

Expand Down
12 changes: 12 additions & 0 deletions src/coprocessor/dag/executor/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct SelectionExecutor {
related_cols_offset: Vec<usize>, // offset of related columns
ctx: Arc<EvalContext>,
src: Box<Executor>,
count: i64,
}

impl SelectionExecutor {
Expand All @@ -49,13 +50,15 @@ impl SelectionExecutor {
related_cols_offset: visitor.column_offsets(),
ctx: ctx,
src: src,
count: 0,
})
}
}

#[allow(never_loop)]
impl Executor for SelectionExecutor {
fn next(&mut self) -> Result<Option<Row>> {
self.count += 1;
'next: while let Some(row) = self.src.next()? {
let cols = inflate_with_col_for_dag(
&self.ctx,
Expand All @@ -75,6 +78,11 @@ impl Executor for SelectionExecutor {
Ok(None)
}

fn collect_output_counts(&mut self, counts: &mut Vec<i64>) {
self.src.collect_output_counts(counts);
counts.push(self.count - 1);
}

fn collect_statistics_into(&mut self, statistics: &mut Statistics) {
self.src.collect_statistics_into(statistics);
}
Expand Down Expand Up @@ -276,5 +284,9 @@ mod tests {
assert_eq!(selection_rows.len(), expect_row_handles.len());
let result_row = selection_rows.iter().map(|r| r.handle).collect::<Vec<_>>();
assert_eq!(result_row, expect_row_handles);
let expected_counts = vec![raw_data.len() as i64, expect_row_handles.len() as i64];
let mut counts = Vec::with_capacity(2);
selection_executor.collect_output_counts(&mut counts);
assert_eq!(expected_counts, counts);
}
}
11 changes: 11 additions & 0 deletions src/coprocessor/dag/executor/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct TableScanExecutor {
col_ids: HashSet<i64>,
key_ranges: IntoIter<KeyRange>,
scanner: Option<Scanner>,
count: i64,
}

impl TableScanExecutor {
Expand Down Expand Up @@ -62,6 +63,7 @@ impl TableScanExecutor {
col_ids: col_ids,
key_ranges: key_ranges.into_iter(),
scanner: None,
count: 0,
}
}

Expand Down Expand Up @@ -97,6 +99,7 @@ impl TableScanExecutor {

impl Executor for TableScanExecutor {
fn next(&mut self) -> Result<Option<Row>> {
self.count += 1;
loop {
if let Some(row) = self.get_row_from_range_scanner()? {
return Ok(Some(row));
Expand All @@ -123,6 +126,10 @@ impl Executor for TableScanExecutor {
}
}

fn collect_output_counts(&mut self, counts: &mut Vec<i64>) {
counts.push(self.count - 1);
}

fn collect_statistics_into(&mut self, statistics: &mut Statistics) {
statistics.add(&self.statistics);
self.statistics = Statistics::default();
Expand Down Expand Up @@ -210,6 +217,10 @@ mod test {
assert_eq!(expect_row[&cid], v.to_vec());
}
assert!(table_scanner.next().unwrap().is_none());
let expected_counts = vec![1];
let mut counts = Vec::with_capacity(1);
table_scanner.collect_output_counts(&mut counts);
assert_eq!(expected_counts, counts);
}

#[test]
Expand Down
12 changes: 12 additions & 0 deletions src/coprocessor/dag/executor/topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct TopNExecutor {
iter: Option<IntoIter<SortRow>>,
ctx: Arc<EvalContext>,
src: Box<Executor>,
count: i64,
}

impl TopNExecutor {
Expand All @@ -87,6 +88,7 @@ impl TopNExecutor {
iter: None,
ctx: ctx,
src: src,
count: 0,
})
}

Expand Down Expand Up @@ -114,6 +116,7 @@ impl TopNExecutor {

impl Executor for TopNExecutor {
fn next(&mut self) -> Result<Option<Row>> {
self.count += 1;
if self.iter.is_none() {
self.fetch_all()?;
self.iter = Some(self.heap.take().unwrap().into_sorted_vec()?.into_iter());
Expand All @@ -128,6 +131,11 @@ impl Executor for TopNExecutor {
}
}

fn collect_output_counts(&mut self, counts: &mut Vec<i64>) {
self.src.collect_output_counts(counts);
counts.push(self.count - 1);
}

fn collect_statistics_into(&mut self, statistics: &mut Statistics) {
self.src.collect_statistics_into(statistics);
}
Expand Down Expand Up @@ -436,5 +444,9 @@ pub mod test {
for (row, handle) in topn_rows.iter().zip(expect_row_handles) {
assert_eq!(row.handle, handle);
}
let expected_counts = vec![6, limit as i64];
let mut counts = Vec::with_capacity(2);
topn_ect.collect_output_counts(&mut counts);
assert_eq!(expected_counts, counts);
}
}
19 changes: 19 additions & 0 deletions tests/coprocessor/test_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2730,3 +2730,22 @@ fn test_key_is_locked_for_index() {
assert!(resp.has_locked(), "{:?}", resp);
end_point.stop().unwrap().join().unwrap();
}

#[test]
fn test_output_counts() {
let data = vec![
(1, Some("name:0"), 2),
(2, Some("name:4"), 3),
(4, Some("name:3"), 1),
(5, Some("name:1"), 4),
];

let product = ProductTable::new();
let (_, mut end_point) = init_with_data(&product, &data);

let req = DAGSelect::from(&product.table).build();
let resp = handle_select(&end_point, req);
assert_eq!(resp.get_output_counts(), [data.len() as i64]);

end_point.stop().unwrap().join().unwrap();
}