Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import database #2964

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion scripts/antlr4/Cypher.g4.copy
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ oC_Statement
| kU_CommentOn
| kU_Transaction
| kU_Extension
| kU_ExportDatabase;
| kU_ExportDatabase
| kU_ImportDatabase;

kU_CopyFrom
: COPY SP oC_SchemaName ( ( SP? '(' SP? kU_ColumnNames SP? ')' SP? ) | SP ) FROM SP (kU_FilePaths | oC_Variable) ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
Expand All @@ -47,6 +48,10 @@ kU_CopyTO
kU_ExportDatabase
: EXPORT SP DATABASE SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_ImportDatabase
: IMPORT SP DATABASE SP StringLiteral;


kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -89,6 +94,8 @@ COLUMN : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'L' | 'l' ) ( 'U' | 'u' ) ( 'M' | 'm' ) (

EXPORT: ( 'E' | 'e') ( 'X' | 'x') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

IMPORT: ( 'I' | 'i') ( 'M' | 'm') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

DATABASE: ( 'D' | 'd') ( 'A' | 'a') ( 'T' | 't') ( 'A' | 'a') ( 'B' | 'b') ( 'A' | 'a') ( 'S' | 's')( 'E' | 'e');

kU_DDL
Expand Down Expand Up @@ -754,6 +761,7 @@ kU_NonReservedKeywords
| BEGIN
| END
| IN
| IMPORT
| EXPORT
| DATABASE
;
Expand Down
10 changes: 9 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ oC_Statement
| kU_CommentOn
| kU_Transaction
| kU_Extension
| kU_ExportDatabase;
| kU_ExportDatabase
| kU_ImportDatabase;

kU_CopyFrom
: COPY SP oC_SchemaName ( ( SP? '(' SP? kU_ColumnNames SP? ')' SP? ) | SP ) FROM SP (kU_FilePaths | oC_Variable) ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
Expand All @@ -47,6 +48,10 @@ kU_CopyTO
kU_ExportDatabase
: EXPORT SP DATABASE SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_ImportDatabase
: IMPORT SP DATABASE SP StringLiteral;


kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -89,6 +94,8 @@ COLUMN : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'L' | 'l' ) ( 'U' | 'u' ) ( 'M' | 'm' ) (

EXPORT: ( 'E' | 'e') ( 'X' | 'x') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

IMPORT: ( 'I' | 'i') ( 'M' | 'm') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

DATABASE: ( 'D' | 'd') ( 'A' | 'a') ( 'T' | 't') ( 'A' | 'a') ( 'B' | 'b') ( 'A' | 'a') ( 'S' | 's')( 'E' | 'e');

kU_DDL
Expand Down Expand Up @@ -754,6 +761,7 @@ kU_NonReservedKeywords
| BEGIN
| END
| IN
| IMPORT
| EXPORT
| DATABASE
;
Expand Down
3 changes: 2 additions & 1 deletion src/binder/bind/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ add_library(
bind_transaction.cpp
bind_updating_clause.cpp
bind_extension.cpp
bind_export_database.cpp)
bind_export_database.cpp
bind_import_database.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_binder_bind>
Expand Down
42 changes: 42 additions & 0 deletions src/binder/bind/bind_import_database.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "binder/binder.h"
#include "binder/copy/bound_import_database.h"
#include "common/exception/binder.h"
#include "parser/port_db.h"

using namespace kuzu::common;
using namespace kuzu::parser;

namespace kuzu {
namespace binder {

std::string getFilePath(
common::VirtualFileSystem* vfs, const std::string boundFilePath, const std::string fileName) {
auto filePath = vfs->joinPath(boundFilePath, fileName);
if (!vfs->fileOrPathExists(filePath)) {
throw BinderException(stringFormat("File {} does not exist.", filePath));
}
auto fileInfo = vfs->openFile(filePath, O_RDONLY
#ifdef _WIN32
| _O_BINARY
#endif
);
auto fsize = fileInfo->getFileSize();
auto buffer = std::make_unique<char[]>(fsize);
fileInfo->readFile(buffer.get(), fsize);
return std::string(buffer.get(), fsize);
}

std::unique_ptr<BoundStatement> Binder::bindImportDatabaseClause(const Statement& statement) {
auto& importDatabaseStatement = ku_dynamic_cast<const Statement&, const ImportDB&>(statement);
auto boundFilePath = importDatabaseStatement.getFilePath();
if (!vfs->fileOrPathExists(boundFilePath)) {
throw BinderException(stringFormat("Directory {} does not exist.", boundFilePath));
}
std::string finalQueryStatements;
finalQueryStatements += getFilePath(vfs, boundFilePath, ImportDBConstants::SCHEMA_NAME);
finalQueryStatements += getFilePath(vfs, boundFilePath, ImportDBConstants::COPY_NAME);
finalQueryStatements += getFilePath(vfs, boundFilePath, ImportDBConstants::MACRO_NAME);
return std::make_unique<BoundImportDatabase>(boundFilePath, finalQueryStatements);
}
} // namespace binder
} // namespace kuzu
3 changes: 3 additions & 0 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ std::unique_ptr<BoundStatement> Binder::bind(const Statement& statement) {
case StatementType::EXPORT_DATABASE: {
boundStatement = bindExportDatabaseClause(statement);
} break;
case StatementType::IMPORT_DATABASE: {
boundStatement = bindImportDatabaseClause(statement);
} break;
default: {
KU_UNREACHABLE;
}
Expand Down
3 changes: 3 additions & 0 deletions src/binder/bound_statement_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ void BoundStatementVisitor::visit(const BoundStatement& statement) {
case StatementType::EXPORT_DATABASE: {
visitExportDatabase(statement);
} break;
case StatementType::IMPORT_DATABASE: {
visitImportDatabase(statement);
} break;
default:
KU_UNREACHABLE;
}
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class Binder {
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

std::unique_ptr<BoundStatement> bindExportDatabaseClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindImportDatabaseClause(const parser::Statement& statement);

/*** bind file scan ***/
std::unordered_map<std::string, common::Value> bindParsingOptions(
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/bound_statement_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class BoundStatementVisitor {
virtual void visitCopyFrom(const BoundStatement&) {}
virtual void visitCopyTo(const BoundStatement&) {}
virtual void visitExportDatabase(const BoundStatement&) {}
virtual void visitImportDatabase(const BoundStatement&) {}
virtual void visitStandaloneCall(const BoundStatement&) {}
virtual void visitCommentOn(const BoundStatement&) {}
virtual void visitExplain(const BoundStatement&);
Expand Down
23 changes: 23 additions & 0 deletions src/include/binder/copy/bound_import_database.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once
#include "binder/bound_statement.h"

namespace kuzu {
namespace binder {

class BoundImportDatabase : public BoundStatement {
public:
BoundImportDatabase(std::string filePath, std::string query)
: BoundStatement{common::StatementType::IMPORT_DATABASE,
BoundStatementResult::createEmptyResult()},
filePath{std::move(filePath)}, query{query} {}

inline std::string getFilePath() const { return filePath; }
inline std::string getQuery() const { return query; }

private:
std::string filePath;
std::string query;
};

} // namespace binder
} // namespace kuzu
6 changes: 6 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,11 @@ struct CopyToCSVConstants {
static constexpr const uint64_t DEFAULT_CSV_FLUSH_SIZE = 4096 * 8;
};

struct ImportDBConstants {
static constexpr char SCHEMA_NAME[] = "schema.cypher";
static constexpr char COPY_NAME[] = "copy.cypher";
static constexpr char MACRO_NAME[] = "macro.cypher";
};

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/enums/statement_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class StatementType : uint8_t {
TRANSACTION = 30,
EXTENSION = 31,
EXPORT_DATABASE = 32,
IMPORT_DATABASE = 33,
};

struct StatementTypeUtils {
Expand Down
2 changes: 2 additions & 0 deletions src/include/main/client_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class ClientContext {

std::unique_ptr<QueryResult> query(std::string_view queryStatement);

void runQuery(std::string query);

private:
inline void resetActiveQuery() { activeQuery.reset(); }

Expand Down
1 change: 1 addition & 0 deletions src/include/parser/parsed_statement_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class StatementVisitor {
virtual void visitTransaction(const Statement& /*statement*/) {}
virtual void visitExtension(const Statement& /*statement*/) {}
virtual void visitExportDatabase(const Statement& /*statement*/) {}
virtual void visitImportDatabase(const Statement& /*statement*/) {}
// LCOV_EXCL_STOP
};

Expand Down
11 changes: 11 additions & 0 deletions src/include/parser/port_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,16 @@ class ExportDB : public Statement {
std::string filePath;
};

class ImportDB : public Statement {
public:
explicit ImportDB(std::string filePath)
: Statement{common::StatementType::IMPORT_DATABASE}, filePath{std::move(filePath)} {}

inline std::string getFilePath() const { return filePath; }

private:
std::string filePath;
};

} // namespace parser
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Transformer {
parsing_option_t transformParsingOptions(CypherParser::KU_ParsingOptionsContext& ctx);

std::unique_ptr<Statement> transformExportDatabase(CypherParser::KU_ExportDatabaseContext& ctx);
std::unique_ptr<Statement> transformImportDatabase(CypherParser::KU_ImportDatabaseContext& ctx);

// Transform query statement.
std::unique_ptr<Statement> transformQuery(CypherParser::OC_QueryContext& ctx);
Expand Down
3 changes: 2 additions & 1 deletion src/include/planner/operator/logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ enum class LogicalOperatorType : uint8_t {
UNION_ALL,
UNWIND,
EXTENSION,
EXPORT_DATABASE
EXPORT_DATABASE,
IMPORT_DATABASE,
};

class LogicalOperatorUtils {
Expand Down
29 changes: 29 additions & 0 deletions src/include/planner/operator/persistent/logical_import_db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "planner/operator/logical_operator.h"

namespace kuzu {
namespace planner {

class LogicalImportDatabase : public LogicalOperator {
public:
explicit LogicalImportDatabase(std::string query)
: LogicalOperator{LogicalOperatorType::IMPORT_DATABASE}, query{query} {}

inline std::string getExpressionsForPrinting() const override { return std::string{}; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::string getQuery() const { return query; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalImportDatabase>(query);
}

private:
std::string query;
};

} // namespace planner
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Planner {

// Plan export database
std::unique_ptr<LogicalPlan> planExportDatabase(const binder::BoundStatement& statement);
std::unique_ptr<LogicalPlan> planImportDatabase(const binder::BoundStatement& statement);

// Plan query.
std::vector<std::unique_ptr<LogicalPlan>> planQuery(
Expand Down
27 changes: 27 additions & 0 deletions src/include/processor/operator/persistent/import_db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "processor/operator/physical_operator.h"

namespace kuzu {
namespace processor {

class ImportDB : public PhysicalOperator {
public:
ImportDB(std::string query, uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::IMPORT_DATABASE, id, paramsString}, query{query} {}

bool canParallel() const override { return false; }

bool isSource() const override { return true; }

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ImportDB>(query, id, paramsString);
}

private:
std::string query;
};
} // namespace processor
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ enum class PhysicalOperatorType : uint8_t {
FLATTEN,
HASH_JOIN_BUILD,
HASH_JOIN_PROBE,
IMPORT_DATABASE,
INDEX_LOOKUP,
INDEX_SCAN,
INSERT,
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class PlanMapper {
std::unique_ptr<PhysicalOperator> mapTransaction(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExtension(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExportDatabase(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapImportDatabase(planner::LogicalOperator* logicalOperator);

std::unique_ptr<PhysicalOperator> createCopyRel(
std::shared_ptr<PartitionerSharedState> partitionerSharedState,
Expand Down
35 changes: 31 additions & 4 deletions src/main/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,10 @@
try {
// parsing
if (parsedStatement->getStatementType() != StatementType::TRANSACTION) {
auto txContext = this->transactionContext.get();
if (txContext->isAutoTransaction()) {
txContext->beginAutoTransaction(preparedStatement->readOnly);
if (transactionContext->isAutoTransaction()) {
transactionContext->beginAutoTransaction(preparedStatement->readOnly);
} else {
txContext->validateManualTransaction(
transactionContext->validateManualTransaction(
preparedStatement->allowActiveTransaction(), preparedStatement->readOnly);
}
if (!this->getTx()->isReadOnly()) {
Expand Down Expand Up @@ -425,5 +424,33 @@
}
}

void ClientContext::runQuery(std::string query) {
// TODO(Jimain): this is special for "Import database". Should refactor after we support
// multiple query statements in one Tx.
// Currently, we split multiple query statements into single query and execute them one by one,
// each with an auto transaction. The correct way is to execute them in one transaction. But we
// do not support DDL and copy in one Tx.
if (transactionContext->hasActiveTransaction()) {
hououou marked this conversation as resolved.
Show resolved Hide resolved
transactionContext->commit();
}
auto parsedStatements = std::vector<std::unique_ptr<Statement>>();
try {
parsedStatements = parseQuery(query);
} catch (std::exception& exception) { throw ConnectionException(exception.what()); }

Check warning on line 439 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L439

Added line #L439 was not covered by tests
if (parsedStatements.empty()) {
throw ConnectionException("Connection Exception: Query is empty.");

Check warning on line 441 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L441

Added line #L441 was not covered by tests
}
try {
for (auto& statement : parsedStatements) {
auto preparedStatement = prepareNoLock(statement.get());
auto currentQueryResult =
executeAndAutoCommitIfNecessaryNoLock(preparedStatement.get());
if (!currentQueryResult->isSuccess()) {
throw ConnectionException(currentQueryResult->errMsg);

Check warning on line 449 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L449

Added line #L449 was not covered by tests
}
}
} catch (std::exception& exception) { throw ConnectionException(exception.what()); }

Check warning on line 452 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L452

Added line #L452 was not covered by tests
return;
}
} // namespace main
} // namespace kuzu
Loading
Loading