Skip to content

Commit

Permalink
Use actual PK for median
Browse files Browse the repository at this point in the history
  • Loading branch information
cdouglas committed Jul 11, 2024
1 parent a3861d9 commit cdef88b
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 145 deletions.
Binary file modified incremental_transactions/tpcc/graphs/byname_max_sql.rs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified incremental_transactions/tpcc/graphs/byname_sql.rs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified incremental_transactions/tpcc/graphs/byname_sql_incremental.rs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions incremental_transactions/tpcc/sql/byname.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE VIEW cust_agg AS
SELECT ARRAY_AGG(c_id ORDER BY c_first) AS cust_array
FROM (SELECT c.c_id, c.c_first
SELECT ARRAY_AGG((c_id + c_w_id + c_d_id) ORDER BY c_first) AS cust_array
FROM (SELECT c.c_id, c.c_w_id, c.c_d_id, c.c_first
FROM customer AS c,
transaction_parameters AS t
WHERE c.c_last = t.c_last
Expand All @@ -16,4 +16,4 @@ SELECT c.c_first, c.c_middle, c.c_id,
FROM customer as c,
cust_agg as a,
transaction_parameters as t
WHERE c.c_id = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1];
WHERE (c.c_id + c.c_w_id + c.c_d_id) = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1];
14 changes: 7 additions & 7 deletions incremental_transactions/tpcc/src/byname_max_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ pub fn circuit(cconf: CircuitConfig) -> Result<(DBSPHandle, (ZSetHandle<Tup8<Opt
// DBSPIntegrateOperator 5331(941)
let stream5331: Stream<_, IndexedWSet<(), Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>> = stream5329.integrate();
// rel#86:LogicalSort.(input=LogicalProject#84,sort0=$0,dir0=DESC,fetch=1)
// DBSPStreamAggregateOperator 8499(980)
let stream8499: Stream<_, IndexedWSet<(), Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>>> = stream5331.stream_aggregate(Fold::<_, _, UnimplementedSemigroup<_>, _, _>::new(Vec::new(), move |t_9: &mut Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>, t_10: &Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>, t_0: Weight, | {
// DBSPStreamAggregateOperator 8501(980)
let stream8501: Stream<_, IndexedWSet<(), Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>>> = stream5331.stream_aggregate(Fold::<_, _, UnimplementedSemigroup<_>, _, _>::new(Vec::new(), move |t_9: &mut Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>, t_10: &Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>, t_0: Weight, | {
weighted_push(t_9, t_10, t_0)
}));
// rel#86:LogicalSort.(input=LogicalProject#84,sort0=$0,dir0=DESC,fetch=1)
// DBSPMapOperator 8501(991)
let stream8501: Stream<_, WSet<Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>>> = stream8499.map(move |(k, v): (&(), &Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>)| -> Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>> {
// DBSPMapOperator 8503(991)
let stream8503: Stream<_, WSet<Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>>> = stream8501.map(move |(k, v): (&(), &Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>>)| -> Vec<Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>> {
let ec = Extract::new(move |r: &Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>| r.0.clone()).rev();
let comp = move |a: &Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>, b: &Tup14<Option<String>, Option<String>, Option<i32>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<String>, Option<Decimal>, Option<Decimal>, Option<Decimal>, Option<Timestamp>>| { ec.compare(a, b) };let mut v = v.clone();
v.sort_by(comp);
Expand All @@ -230,10 +230,10 @@ pub fn circuit(cconf: CircuitConfig) -> Result<(DBSPHandle, (ZSetHandle<Tup8<Opt
// FETCH NEXT 1 ROWS ONLY
// ORDER BY `C_FIRST` DESC
// FETCH NEXT 1 ROWS ONLY
// DBSPSinkOperator 8506(1033)
let handle8506 = stream8501.output();
// DBSPSinkOperator 8508(1033)
let handle8508 = stream8503.output();

Ok((handle49, handle67, handle120, handle143, handle166, handle279, handle337, handle8506, ))
Ok((handle49, handle67, handle120, handle143, handle166, handle279, handle337, handle8508, ))
})?;
Ok((circuit, streams))
}
Expand Down
Loading

0 comments on commit cdef88b

Please sign in to comment.