Skip to content

Commit

Permalink
fix: clarify lazy API in bindings (#3131)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish authored Aug 2, 2024
1 parent 232350f commit 95835b5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 82 deletions.
29 changes: 9 additions & 20 deletions bindings/python/tests/test_execution_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,20 @@
import pytest


def test_eager_ddl():
def test_sql_validates_plan():
con = glaredb.connect()

with pytest.raises(Exception):
con.sql("select count(*) from tblsqlhelper;").to_arrow().to_pydict()["COUNT(*)"][0] == 0

one = con.sql("create table tblsqlhelper (a int, b int);")
assert con.sql("select * from tblsqlhelper;").to_arrow().num_rows == 0

two = con.sql("insert into tblsqlhelper values (4, 2);")
assert con.sql("select * from tblsqlhelper;").to_arrow().num_rows == 1

with pytest.raises(Exception, match="Duplicate name"):
one.execute()

assert con.sql("select count(*) from tblsqlhelper;").to_arrow().to_pydict()["COUNT(*)"][0] == 1
with pytest.raises(Exception):
con.sql("select count(*) from tblsqlhelper;")

two = con.sql("insert into tblsqlhelper values (1, 2);")
assert con.sql("select * from tblsqlhelper;").to_arrow().num_rows == 2
assert con.sql("select count(*) from tblsqlhelper;").to_arrow().to_pydict()["COUNT(*)"][0] == 2
def test_sql_excludes_ddl():
con = glaredb.connect()

two.execute()
assert con.sql("select * from tblsqlhelper;").to_arrow().num_rows == 3
with pytest.raises(Exception):
con.sql("select count(*) from tblsqlhelper;").to_arrow().to_pydict()["COUNT(*)"][0]

with pytest.raises(Exception):
con.sql("create table tblsqlhelper (a int, b int);")

def test_execute_is_eager():
con = glaredb.connect()
Expand All @@ -36,4 +26,3 @@ def test_execute_is_eager():
con.execute("create table tblexechelper (a int, b int);")

assert con.sql("select count(*) from tblexechelper;").to_arrow().to_pydict()["COUNT(*)"][0] == 0

131 changes: 69 additions & 62 deletions crates/glaredb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ impl Connection {

/// Creates a query that is parsed when the Operation is invoked;
/// however, the query is only executed when the results are
/// iterated _unless_ the operation is a write operation or a DDL
/// operation, which are executed when the operation is invoked.
/// iterated. DDL and DML queries are not permitted.
pub fn sql(&self, query: impl Into<String>) -> Operation {
Operation {
op: OperationType::Sql,
Expand All @@ -263,10 +262,9 @@ impl Connection {
}
}

/// PRQL queries have the same semantics as SQL queries; however,
/// because PRQL does not include syntax for DML or DDL
/// operations, these queries only run when the result stream is
/// invoked.
/// PRQL queries have the same semantics as SQL queries, and do
/// not contain support for DDL or DML operations. These queries
/// only run when the result stream is invoked.
pub fn prql(&self, query: impl Into<String>) -> Operation {
Operation {
op: OperationType::Prql,
Expand Down Expand Up @@ -423,9 +421,9 @@ impl RecordStream {

#[derive(Debug, Clone)]
enum OperationType {
/// SQL operations create an operation that runs DDL/DML operations
/// directly, and executes other queries lazily when the results
/// are iterated.
/// SQL operations create an operation that runs queries lazily
/// when the results are iterated. DDL/DML operations are not
/// permitted.
Sql,
/// PRQL, which does not support DDL/DML in our implementation,
/// creates a lazy query object that only runs when the results
Expand Down Expand Up @@ -459,11 +457,8 @@ impl Operation {
self.schema.clone()
}

/// Evaluate constructs a plan for the query, and in the case of
/// all `OperationType::Execute` operations and
/// `OperationType::Sql` operations that write data, the operation
/// run immediately. All other operations run when `.resolve()` is
/// called.
/// Evaluate constructs a plan for the query that runs when
/// `.resolve()` is called.
pub async fn evaluate(&mut self) -> Result<Self, DatabaseError> {
match self.op {
OperationType::Sql => {
Expand All @@ -474,27 +469,27 @@ impl Operation {
self.schema
.replace(Arc::new(plan.output_schema().unwrap_or_else(Schema::empty)));

match plan.to_owned().try_into_datafusion_plan()? {
LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Extension(_) => {
RecordStream::from(Self::process_result(
ses.execute_logical_plan(
plan,
&OperationInfo::new().with_query_text(self.query.clone()),
)
.await?
.1,
))
.check()
.await?;
}
_ => {
self.plan.replace(plan);
match &plan {
sqlexec::LogicalPlan::Datafusion(dfplan) => match dfplan {
LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
return Err(DatabaseError::new(
".sql() is lazily evaluated use .execute() for this operation",
))
},
_ => {},
},
sqlexec::LogicalPlan::Transaction(_) => {
return Err(DatabaseError::new(".sql(), which is evaluated lazily cannot support multi-statement transactions"))
}
sqlexec::LogicalPlan::Noop => {}
};

self.plan.replace(plan);

Ok(self.clone())
}
OperationType::Prql => {
Expand Down Expand Up @@ -549,13 +544,33 @@ impl Operation {
pub async fn resolve(&mut self) -> Result<SendableRecordBatchStream, DatabaseError> {
match self.op {
OperationType::Sql => {
let mut ses = self.conn.session.lock().await;

let plan = if self.plan.is_some() {
self.plan.take().unwrap()
} else {
self.schema = None;
ses.create_logical_plan(&self.query).await?
{
let mut ses = self.conn.session.lock().await;
ses.create_logical_plan(&self.query).await?
}
};

match &plan {
sqlexec::LogicalPlan::Datafusion(dfplan) => match dfplan {
LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
return Err(DatabaseError::new(
".sql() is lazily evaluated use .execute() for this operation",
))
},
_ => {},
},
sqlexec::LogicalPlan::Transaction(_) => {
return Err(DatabaseError::new(".sql(), which is evaluated lazily cannot support multi-statement transactions"))
}
sqlexec::LogicalPlan::Noop => {}
};

let schema = if self.schema.is_some() {
Expand All @@ -568,40 +583,32 @@ impl Operation {
self.schema.replace(schema.clone());

let op = OperationInfo::new().with_query_text(self.query.clone());
match plan.to_owned().try_into_datafusion_plan()? {
LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Extension(_) => Ok(Self::process_result(
ses.execute_logical_plan(plan, &op).await?.1,

let ses_clone = self.conn.session.clone();

Ok(Self::process_result(ExecutionResult::Query {
stream: Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
futures::stream::once(async move {
let mut ses = ses_clone.lock().await;
match ses.execute_logical_plan(plan, &op).await {
Ok((_, res)) => Self::process_result(res),
Err(e) => Self::handle_error(e),
}
})
.flatten(),
)),
_ => {
let ses_clone = self.conn.session.clone();

Ok(Self::process_result(ExecutionResult::Query {
stream: Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
futures::stream::once(async move {
let mut ses = ses_clone.lock().await;
match ses.execute_logical_plan(plan, &op).await {
Ok((_, res)) => Self::process_result(res),
Err(e) => Self::handle_error(e),
}
})
.flatten(),
)),
}))
}
}
}))
}
OperationType::Prql => {
let mut ses = self.conn.session.lock().await;

let plan = if self.plan.is_some() {
self.plan.take().unwrap()
} else {
self.schema = None;
ses.prql_to_lp(&self.query).await?
{
let mut ses = self.conn.session.lock().await;
ses.prql_to_lp(&self.query).await?
}
};

let schema = if self.schema.is_some() {
Expand Down

0 comments on commit 95835b5

Please sign in to comment.