Skip to content

Commit

Permalink
DeltaMerge Engine POC
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy authored and JaySon-Huang committed Sep 5, 2019
1 parent 3c8b3a0 commit b963dd0
Show file tree
Hide file tree
Showing 32 changed files with 3,201 additions and 9 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ add_headers_and_sources(dbms src/Interpreters)
add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/DeltaMerge)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/Transaction)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/InterpreterDeleteQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ BlockIO InterpreterDeleteQuery::execute()
checkAccess(query);

StoragePtr table = context.getTable(query.database, query.table);
if (!table->supportsModification())
throw Exception("Table engine " + table->getName() + " does not support Delete.");

auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);

Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/InterpreterFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTTruncateQuery.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/ASTManageQuery.h>

#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
Expand All @@ -39,6 +40,7 @@
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterTruncateQuery.h>
#include <Interpreters/InterpreterManageQuery.h>

#include <Parsers/ASTSystemQuery.h>
#include <Common/typeid_cast.h>
Expand Down Expand Up @@ -168,6 +170,11 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
throwIfReadOnly(context);
return std::make_unique<InterpreterTruncateQuery>(query, context);
}
else if (typeid_cast<ASTManageQuery *>(query.get()))
{
throwIfReadOnly(context);
return std::make_unique<InterpreterManageQuery>(query, context);
}
else
throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
Expand Down
42 changes: 42 additions & 0 deletions dbms/src/Interpreters/InterpreterManageQuery.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterManageQuery.h>
#include <Parsers/ASTManageQuery.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>

#include <Storages/StorageDeltaMerge.h>

namespace DB
{
BlockIO InterpreterManageQuery::execute()
{
const ASTManageQuery & ast = typeid_cast<const ASTManageQuery &>(*query_ptr);

StoragePtr table = context.getTable(ast.database, ast.table);
if (table->getName() != "DeltaMerge")
{
throw Exception("Manage operation can only be applied to DeltaMerge engine tables");
}
auto & dm_table = static_cast<StorageDeltaMerge &>(*table);
switch (ast.operation)
{
case ManageOperation::Enum::Flush:
{
dm_table.flushDelta();
return {};
}
case ManageOperation::Enum::Status:
{
BlockIO res;
res.in = dm_table.status();
return res;
}
case ManageOperation::Enum::Check:
{
dm_table.check();
return {};
}
}
return {};
}
}
29 changes: 29 additions & 0 deletions dbms/src/Interpreters/InterpreterManageQuery.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <Interpreters/IInterpreter.h>


namespace DB
{

class Context;
class IAST;
using ASTPtr = std::shared_ptr<IAST>;

class InterpreterManageQuery : public IInterpreter
{
public:
InterpreterManageQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}

BlockIO execute() override;

private:
ASTPtr query_ptr;
Context & context;
};


}
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ struct Settings
M(SettingUInt64, shared_query_clients, 0, "How many clients will share the same query_id. If > 0, enable shared query mode.")\
M(SettingString, query_id, "", "The query_id, only for testing.")\
M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.")\
M(SettingUInt64, delta_merge_size, 10000000, "The delta rows limit in memory. After that delta rows will be flushed.")\
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ASTInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ASTInsertQuery : public IAST
const char * end = nullptr;

bool is_import;
bool is_upsert;

/** Get the text that identifies this element. */
String getID() const override { return "InsertQuery_" + database + "_" + table; };
Expand Down
56 changes: 56 additions & 0 deletions dbms/src/Parsers/ASTManageQuery.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <Parsers/IAST.h>


namespace DB
{
namespace ManageOperation
{
enum Enum
{
Flush,
Status,
Check,
};

inline const char * toString(UInt64 op)
{
static const char * data[] = {"Flush", "Status", "Check"};
return op < 3 ? data[op] : "Unknown operation";
}
}

/** Manage query
*/
class ASTManageQuery : public IAST
{
public:
String database;
String table;

ManageOperation::Enum operation;

/** Get the text that identifies this element. */
String getID() const override
{
return "ManageQuery_" + database + "_" + table + "_" + ManageOperation::toString(operation);
};

ASTPtr clone() const override
{
auto res = std::make_shared<ASTManageQuery>(*this);
res->children.clear();
return res;
}

protected:
void formatImpl(const FormatSettings & settings, FormatState & /*state*/, FormatStateStacked /*frame*/) const override
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "MANAGE TABLE " << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) << " "
<< (settings.hilite ? hilite_keyword : "") << ManageOperation::toString(operation)
<< (settings.hilite ? hilite_none : "");
}
};
}
7 changes: 4 additions & 3 deletions dbms/src/Parsers/ParserInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
/// Insertion data
const char * data = nullptr;

bool is_insert = s_insert_into.ignore(pos, expected) ||
s_upsert_into.ignore(pos, expected);
bool is_insert = s_insert_into.ignore(pos, expected);
bool is_upsert = s_upsert_into.ignore(pos, expected);
bool is_import = s_import_into.ignore(pos, expected);
if (!is_insert && !is_import)
if (!is_insert && !is_upsert && !is_import)
return false;

s_table.ignore(pos, expected);
Expand Down Expand Up @@ -168,6 +168,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->data = data != end ? data : nullptr;
query->end = end;
query->is_import = is_import;
query->is_upsert = is_upsert;

if (columns)
query->children.push_back(columns);
Expand Down
61 changes: 61 additions & 0 deletions dbms/src/Parsers/ParserManageQuery.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <Parsers/CommonParsers.h>
#include <Parsers/ParserManageQuery.h>
#include <Parsers/ParserPartition.h>

#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTManageQuery.h>

#include <Common/typeid_cast.h>


namespace DB
{
bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_manage_table("MANAGE TABLE");
ParserKeyword s_flush("FLUSH");
ParserKeyword s_status("STATUS");
ParserKeyword s_check("CHECK");

ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
ASTPtr database;
ASTPtr table;
ManageOperation::Enum operation;

if (!s_manage_table.ignore(pos, expected))
return false;

if (!name_p.parse(pos, table, expected))
return false;

if (s_dot.ignore(pos, expected))
{
database = table;
if (!name_p.parse(pos, table, expected))
return false;
}

if (s_flush.ignore(pos, expected))
operation = ManageOperation::Enum::Flush;
else if (s_status.ignore(pos, expected))
operation = ManageOperation::Enum::Status;
else if (s_check.ignore(pos, expected))
operation = ManageOperation::Enum::Check;
else
return false;

auto query = std::make_shared<ASTManageQuery>();
node = query;

if (database)
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;

query->operation = operation;

return true;
}
}
17 changes: 17 additions & 0 deletions dbms/src/Parsers/ParserManageQuery.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <Parsers/IParserBase.h>
#include <Parsers/ExpressionElementParsers.h>


namespace DB
{

class ParserManageQuery : public IParserBase
{
protected:
const char * getName() const { return "MANAGE query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};

}
5 changes: 4 additions & 1 deletion dbms/src/Parsers/ParserQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserSystemQuery.h>
#include <Parsers/ParserTruncateQuery.h>
#include <Parsers/ParserManageQuery.h>

namespace DB
{
Expand All @@ -28,6 +29,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserOptimizeQuery optimize_p;
ParserSystemQuery system_p;
ParserTruncateQuery truncate_p;
ParserManageQuery manage_p;

bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
Expand All @@ -37,7 +39,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| dbginvoke_p.parse(pos, node, expected)
|| optimize_p.parse(pos, node, expected)
|| system_p.parse(pos, node, expected)
|| truncate_p.parse(pos, node, expected);
|| truncate_p.parse(pos, node, expected)
|| manage_p.parse(pos, node, expected);

return res;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ if (ENABLE_TESTS)
add_subdirectory (tests EXCLUDE_FROM_ALL)
add_subdirectory (Transaction/tests EXCLUDE_FROM_ALL)
add_subdirectory (Page/tests EXCLUDE_FROM_ALL)
add_subdirectory (DeltaMerge/tests EXCLUDE_FROM_ALL)
endif ()
Loading

0 comments on commit b963dd0

Please sign in to comment.