Skip to content

Commit

Permalink
feat: add TableExec (#346)
Browse files Browse the repository at this point in the history
Please be sure to look over the pull request guidelines here:
https://github.com/spaceandtimelabs/sxt-proof-of-sql/blob/main/CONTRIBUTING.md#submit-pr.

# Please go through the following checklist
- [x] The PR title and commit messages adhere to guidelines here:
https://github.com/spaceandtimelabs/sxt-proof-of-sql/blob/main/CONTRIBUTING.md.
In particular `!` is used if and only if at least one breaking change
has been introduced.
- [x] I have run the ci check script with `source
scripts/run_ci_checks.sh`.

# Rationale for this change
`TableExec` is intended to be the most common source `ProofPlan` once all
`ProofPlan`s become composable. All it does is read from a table. I plan
to remove the table-reading functionality from all other `ProofPlan`s
soon.
<!--
Why are you proposing this change? If this is already explained clearly
in the linked issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.

 Example:
 Add `NestedLoopJoinExec`.
 Closes #345.

Since we added `HashJoinExec` in #323 it has been possible to do
provable inner joins. However, performance is not satisfactory in some
cases. Hence we need to fix the problem by implementing
`NestedLoopJoinExec` and speeding up the code
 for `HashJoinExec`.
-->

# What changes are included in this PR?
- add `TableExec`
<!--
There is no need to duplicate the description in the ticket here but it
is sometimes worth providing a summary of the individual changes in this
PR.

Example:
- Add `NestedLoopJoinExec`.
- Speed up `HashJoinExec`.
- Route joins to `NestedLoopJoinExec` if the outer input is sufficiently
small.
-->

# Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?

Example:
Yes.
-->
N/A
  • Loading branch information
iajoiner authored Nov 13, 2024
2 parents 4e70b1a + 55a640b commit be228d2
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 5 deletions.
2 changes: 0 additions & 2 deletions crates/proof-of-sql/src/base/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ macro_rules! indexmap_macro {
}

/// Create an [`IndexSet`][self::IndexSet] from a list of values
#[cfg(test)]
macro_rules! indexset_macro {
($($value:expr,)+) => { $crate::base::map::indexset!($($value),+) };
($($value:expr),*) => {
Expand All @@ -39,5 +38,4 @@ macro_rules! indexset_macro {
}

pub(crate) use indexmap_macro as indexmap;
#[cfg(test)]
pub(crate) use indexset_macro as indexset;
4 changes: 3 additions & 1 deletion crates/proof-of-sql/src/sql/proof_plans/dyn_proof_plan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{EmptyExec, FilterExec, GroupByExec, ProjectionExec};
use super::{EmptyExec, FilterExec, GroupByExec, ProjectionExec, TableExec};
use crate::{
base::{
database::{Column, ColumnField, ColumnRef, DataAccessor, OwnedTable, TableRef},
Expand All @@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
pub enum DynProofPlan {
/// Source [`ProofPlan`] for (sub)queries without table source such as `SELECT "No table here" as msg;`
Empty(EmptyExec),
/// Source [`ProofPlan`] for (sub)queries with table source such as `SELECT col from tab;`
Table(TableExec),
/// Provable expressions for queries of the form
/// ```ignore
/// SELECT <result_expr1>, ..., <result_exprN> FROM <table>
Expand Down
5 changes: 5 additions & 0 deletions crates/proof-of-sql/src/sql/proof_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
mod empty_exec;
pub use empty_exec::EmptyExec;

mod table_exec;
pub use table_exec::TableExec;
#[cfg(all(test, feature = "blitzar"))]
mod table_exec_test;

mod projection_exec;
pub(crate) use projection_exec::ProjectionExec;
#[cfg(all(test, feature = "blitzar"))]
Expand Down
111 changes: 111 additions & 0 deletions crates/proof-of-sql/src/sql/proof_plans/table_exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::{
base::{
database::{Column, ColumnField, ColumnRef, DataAccessor, OwnedTable, TableRef},
map::{indexset, IndexMap, IndexSet},
proof::ProofError,
scalar::Scalar,
},
sql::proof::{
CountBuilder, FinalRoundBuilder, FirstRoundBuilder, ProofPlan, ProverEvaluate,
VerificationBuilder,
},
};
use alloc::vec::Vec;
use bumpalo::Bump;
use core::iter::repeat_with;
use serde::{Deserialize, Serialize};

/// Source [`ProofPlan`] for (sub)queries with a table source, such as `SELECT col from tab;`.
///
/// Inspired by `DataFusion` data source `ExecutionPlan`s such as `ArrowExec` and `CsvExec`.
/// (Those names are plain code spans rather than intra-doc links because the
/// `DataFusion` types are not in scope in this module.)
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct TableExec {
/// Reference to the table whose columns this plan reads.
pub table_ref: TableRef,
/// Fields (name and data type) of the columns read from the table; also
/// returned as the result fields of this plan, in this order.
pub schema: Vec<ColumnField>,
}

impl TableExec {
    /// Builds a [`TableExec`] that reads the columns described by `schema` from `table_ref`.
    #[must_use]
    pub fn new(table_ref: TableRef, schema: Vec<ColumnField>) -> Self {
        Self { table_ref, schema }
    }

    /// Fetches every column listed in the schema from `accessor`, in schema order.
    fn get_table<'a, S: Scalar>(&self, accessor: &'a dyn DataAccessor<S>) -> Vec<Column<'a, S>> {
        let mut columns = Vec::with_capacity(self.schema.len());
        for field in &self.schema {
            let column_ref = ColumnRef::new(self.table_ref, field.name(), field.data_type());
            columns.push(accessor.get_column(column_ref));
        }
        columns
    }
}

impl ProofPlan for TableExec {
    /// Registers one intermediate MLE per schema column with the count builder.
    fn count(&self, builder: &mut CountBuilder) -> Result<(), ProofError> {
        builder.count_intermediate_mles(self.schema.len());
        Ok(())
    }

    /// Consumes one intermediate MLE from `builder` per schema column and
    /// returns the consumed values in schema order.
    ///
    /// The accessor and the optional result table are not consulted.
    fn verifier_evaluate<S: Scalar>(
        &self,
        builder: &mut VerificationBuilder<S>,
        _accessor: &IndexMap<ColumnRef, S>,
        _result: Option<&OwnedTable<S>>,
    ) -> Result<Vec<S>, ProofError> {
        // No lint suppression is needed here: `builder` is used and the
        // remaining parameters are explicitly marked unused with `_`.
        Ok(repeat_with(|| builder.consume_intermediate_mle())
            .take(self.schema.len())
            .collect())
    }

    /// Returns the result fields of this plan, which are exactly the schema.
    fn get_column_result_fields(&self) -> Vec<ColumnField> {
        self.schema.clone()
    }

    /// Returns a [`ColumnRef`] for every schema field, all pointing at `table_ref`.
    fn get_column_references(&self) -> IndexSet<ColumnRef> {
        self.schema
            .iter()
            .map(|field| ColumnRef::new(self.table_ref, field.name(), field.data_type()))
            .collect()
    }

    /// Returns the single table this plan reads from.
    fn get_table_references(&self) -> IndexSet<TableRef> {
        indexset! {self.table_ref}
    }
}

impl ProverEvaluate for TableExec {
    /// Returns the schema's columns as read from `accessor`.
    ///
    /// The input length and the allocator are not used.
    #[tracing::instrument(name = "TableExec::result_evaluate", level = "debug", skip_all)]
    fn result_evaluate<'a, S: Scalar>(
        &self,
        _input_length: usize,
        _alloc: &'a Bump,
        accessor: &'a dyn DataAccessor<S>,
    ) -> Vec<Column<'a, S>> {
        self.get_table(accessor)
    }

    /// Intentionally a no-op: this plan does nothing with the first-round builder.
    fn first_round_evaluate(&self, _builder: &mut FirstRoundBuilder) {}

    /// Reads the schema's columns from `accessor`, hands each one to `builder`
    /// as an intermediate MLE (in schema order), and returns the columns.
    #[tracing::instrument(name = "TableExec::final_round_evaluate", level = "debug", skip_all)]
    fn final_round_evaluate<'a, S: Scalar>(
        &self,
        builder: &mut FinalRoundBuilder<'a, S>,
        alloc: &'a Bump,
        accessor: &'a dyn DataAccessor<S>,
    ) -> Vec<Column<'a, S>> {
        // Every parameter is used, so no `unused_variables` suppression is needed.
        let table = self.get_table(accessor);
        for column in &table {
            builder.produce_intermediate_mle(column.as_scalar(alloc));
        }
        table
    }
}
88 changes: 88 additions & 0 deletions crates/proof-of-sql/src/sql/proof_plans/table_exec_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use super::test_utility::*;
use crate::{
base::database::{
owned_table_utility::*, table_utility::*, ColumnField, ColumnType, TableRef,
TableTestAccessor,
},
sql::proof::{exercise_verification, VerifiableQueryResult},
};
use blitzar::proof::InnerProductProof;
use bumpalo::Bump;

/// A `TableExec` over a table with one `BigInt` column and zero rows proves
/// and verifies, yielding an empty result with the same column.
#[test]
fn we_can_create_and_prove_an_empty_table_exec() {
    let alloc = Bump::new();
    let table_ref = TableRef::new("namespace.table_name".parse().unwrap());

    let schema = vec![ColumnField::new("a".parse().unwrap(), ColumnType::BigInt)];
    let plan = table_exec(table_ref, schema);

    let source_table = table([borrowed_bigint("a", [0_i64; 0], &alloc)]);
    let accessor = TableTestAccessor::<InnerProductProof>::new_from_table(
        table_ref,
        source_table,
        0_usize,
        (),
    );

    let proof = VerifiableQueryResult::new(&plan, &accessor, &());
    exercise_verification(&proof, &plan, &accessor, table_ref);

    let actual = proof.verify(&plan, &accessor, &()).unwrap().table;
    assert_eq!(actual, owned_table([bigint("a", [0_i64; 0])]));
}

/// A `TableExec` over a populated three-column table proves and verifies,
/// yielding every row of every column unchanged.
#[test]
fn we_can_create_and_prove_a_table_exec() {
    // Fixture data shared by the source table and the expected result.
    let language_names = ["English", "Español", "Português", "Français"];
    let translations = [
        "space and time",
        "espacio y tiempo",
        "espaço e tempo",
        "espace et temps",
    ];

    let alloc = Bump::new();
    let table_ref = TableRef::new("namespace.table_name".parse().unwrap());

    let schema = vec![
        ColumnField::new("language_rank".parse().unwrap(), ColumnType::BigInt),
        ColumnField::new("language_name".parse().unwrap(), ColumnType::VarChar),
        ColumnField::new("space_and_time".parse().unwrap(), ColumnType::VarChar),
    ];
    let plan = table_exec(table_ref, schema);

    let source_table = table([
        borrowed_bigint("language_rank", [0_i64, 1, 2, 3], &alloc),
        borrowed_varchar("language_name", language_names, &alloc),
        borrowed_varchar("space_and_time", translations, &alloc),
    ]);
    let accessor = TableTestAccessor::<InnerProductProof>::new_from_table(
        table_ref,
        source_table,
        0_usize,
        (),
    );

    let proof = VerifiableQueryResult::new(&plan, &accessor, &());
    exercise_verification(&proof, &plan, &accessor, table_ref);

    let actual = proof.verify(&plan, &accessor, &()).unwrap().table;
    let expected = owned_table([
        bigint("language_rank", [0, 1, 2, 3]),
        varchar("language_name", language_names),
        varchar("space_and_time", translations),
    ]);
    assert_eq!(actual, expected);
}
11 changes: 9 additions & 2 deletions crates/proof-of-sql/src/sql/proof_plans/test_utility.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use super::{DynProofPlan, FilterExec, GroupByExec, ProjectionExec};
use crate::sql::proof_exprs::{AliasedDynProofExpr, ColumnExpr, DynProofExpr, TableExpr};
use super::{DynProofPlan, FilterExec, GroupByExec, ProjectionExec, TableExec};
use crate::{
base::database::{ColumnField, TableRef},
sql::proof_exprs::{AliasedDynProofExpr, ColumnExpr, DynProofExpr, TableExpr},
};

/// Test helper: wraps a new [`TableExec`] for `table_ref` and `schema` in
/// [`DynProofPlan::Table`].
pub fn table_exec(table_ref: TableRef, schema: Vec<ColumnField>) -> DynProofPlan {
    let exec = TableExec::new(table_ref, schema);
    DynProofPlan::Table(exec)
}

pub fn projection(results: Vec<AliasedDynProofExpr>, table: TableExpr) -> DynProofPlan {
DynProofPlan::Projection(ProjectionExec::new(results, table))
Expand Down

0 comments on commit be228d2

Please sign in to comment.