Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse connection to Extract,Consume and execute substrait query plans #113

Merged
merged 17 commits into from
Nov 6, 2024
Merged
12 changes: 6 additions & 6 deletions .github/workflows/distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ concurrency:
jobs:
duckdb-stable-build:
name: Build extension binaries
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.1.0
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
with:
duckdb_version: v1.1.0
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
duckdb_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait

duckdb-stable-deploy:
name: Deploy extension binaries
needs: duckdb-stable-build
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.1.0
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main
secrets: inherit
with:
duckdb_version: v1.1.0
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
duckdb_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait
deploy_latest: true
2 changes: 1 addition & 1 deletion .github/workflows/main_distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
with:
duckdb_version: main
ci_tools_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait

2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1863 files
87 changes: 72 additions & 15 deletions src/from_substrait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@

#include "duckdb/common/types/value.hpp"
#include "duckdb/parser/expression/list.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"

#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/common/exception.hpp"
Expand All @@ -25,7 +16,24 @@
#include "google/protobuf/util/json_util.h"
#include "substrait/plan.pb.h"

#include "duckdb/main/table_description.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/helper.hpp"

#include "duckdb/main/relation.hpp"
#include "duckdb/main/relation/table_relation.hpp"
#include "duckdb/main/relation/table_function_relation.hpp"
#include "duckdb/main/relation/value_relation.hpp"
#include "duckdb/main/relation/view_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"

namespace duckdb {
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
Expand All @@ -40,7 +48,7 @@ const case_insensitive_set_t SubstraitToDuckDB::valid_extract_subfields = {
"quarter", "microsecond", "milliseconds", "second", "minute", "hour"};

string SubstraitToDuckDB::RemapFunctionName(const string &function_name) {
// Lets first drop any extension id
// Let's first drop any extension id
string name;
for (auto &c : function_name) {
if (c == ':') {
Expand All @@ -67,7 +75,9 @@ string SubstraitToDuckDB::RemoveExtension(const string &function_name) {
return name;
}

SubstraitToDuckDB::SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json) : con(con_p) {
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json,
bool acquire_lock_p)
: context(context_p), acquire_lock(acquire_lock_p) {
if (!json) {
if (!plan.ParseFromString(serialized)) {
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
Expand Down Expand Up @@ -510,16 +520,46 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformAggregateOp(const substrait::Re
return make_shared_ptr<AggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
std::move(groups));
}
//! Builds a TableDescription for the table `table_name` in schema `schema_name`.
//! The catalog lookup is done with INVALID_CATALOG as the catalog name and
//! OnEntryNotFound::RETURN_NULL, so a missing table yields an empty (null)
//! pointer instead of an exception from the lookup itself.
unique_ptr<TableDescription> TableInfo(ClientContext &context, const string &schema_name, const string &table_name) {
	unique_ptr<TableDescription> description;
	// Resolve the catalog entry; a null entry means the table was not found
	auto entry = Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name, table_name,
	                                                  OnEntryNotFound::RETURN_NULL);
	if (entry) {
		// Only materialize the description once we know the table exists
		description = make_uniq<TableDescription>(INVALID_CATALOG, schema_name, table_name);
		// Copy every logical column definition of the table into the description
		for (auto &col : entry->GetColumns().Logical()) {
			description->columns.emplace_back(col.Copy());
		}
	}
	return description;
}

shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &sop) {
auto &sget = sop.read();
shared_ptr<Relation> scan;
auto context_wrapper = make_shared_ptr<RelationContextWrapper>(context);
if (sget.has_named_table()) {
auto table_name = sget.named_table().names(0);
// If we can't find a table with that name, let's try a view.
try {
scan = con.Table(sget.named_table().names(0));
auto table_info = TableInfo(*context, DEFAULT_SCHEMA, table_name);
if (!table_info) {
throw CatalogException("Table '%s' does not exist!", table_name);
}
if (acquire_lock) {
scan = make_shared_ptr<TableRelation>(context, std::move(table_info));

} else {
scan = make_shared_ptr<TableRelation>(context_wrapper, std::move(table_info));
}
} catch (...) {
scan = con.View(sget.named_table().names(0));
if (acquire_lock) {
scan = make_shared_ptr<ViewRelation>(context, DEFAULT_SCHEMA, table_name);

} else {
scan = make_shared_ptr<ViewRelation>(context_wrapper, DEFAULT_SCHEMA, table_name);
}
}
} else if (sget.has_local_files()) {
vector<Value> parquet_files;
Expand All @@ -540,7 +580,18 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
string name = "parquet_" + StringUtil::GenerateRandomName();
named_parameter_map_t named_parameters({{"binary_as_string", Value::BOOLEAN(false)}});
scan = con.TableFunction("parquet_scan", {Value::LIST(parquet_files)}, named_parameters)->Alias(name);
vector<Value> parameters {Value::LIST(parquet_files)};
shared_ptr<TableFunctionRelation> scan_rel;
if (acquire_lock) {
scan_rel = make_shared_ptr<TableFunctionRelation>(context, "parquet_scan", parameters,
std::move(named_parameters));
} else {
scan_rel = make_shared_ptr<TableFunctionRelation>(context_wrapper, "parquet_scan", parameters,
std::move(named_parameters));
}

auto rel = static_cast<Relation *>(scan_rel.get());
pdet marked this conversation as resolved.
Show resolved Hide resolved
scan = rel->Alias(name);
} else if (sget.has_virtual_table()) {
// We need to handle a virtual table as a LogicalExpressionGet
auto literal_values = sget.virtual_table().values();
Expand All @@ -553,7 +604,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
expression_rows.emplace_back(expression_row);
}
scan = con.Values(expression_rows);
vector<string> column_names;
if (acquire_lock) {
scan = make_shared_ptr<ValueRelation>(context, expression_rows, column_names);

} else {
scan = make_shared_ptr<ValueRelation>(context_wrapper, expression_rows, column_names);
}
} else {
throw NotImplementedException("Unsupported type of read operator for substrait");
}
Expand Down
17 changes: 14 additions & 3 deletions src/include/from_substrait.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// from_substrait.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include <string>
Expand All @@ -10,7 +18,8 @@ namespace duckdb {

class SubstraitToDuckDB {
public:
SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json = false);
SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json = false,
bool acquire_lock = false);
//! Transforms Substrait Plan to DuckDB Relation
shared_ptr<Relation> TransformPlan();

Expand Down Expand Up @@ -48,8 +57,8 @@ class SubstraitToDuckDB {

//! Transform Substrait Sort Order to DuckDB Order
OrderByNode TransformOrder(const substrait::SortField &sordf);
//! DuckDB Connection
Connection &con;
//! DuckDB Client Context
shared_ptr<ClientContext> context;
//! Substrait Plan
substrait::Plan plan;
//! Variable used to register functions
Expand All @@ -59,5 +68,7 @@ class SubstraitToDuckDB {
static const unordered_map<std::string, std::string> function_names_remap;
static const case_insensitive_set_t valid_extract_subfields;
vector<ParsedExpression *> struct_expressions;
//! If we should acquire a client context lock when creating the relations
const bool acquire_lock;
};
} // namespace duckdb
8 changes: 8 additions & 0 deletions src/include/to_substrait.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// to_substrait.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "custom_extensions/custom_extensions.hpp"
Expand Down
Loading
Loading