From bdbbdf4b427a913a7e5f63a2c59a9b3b97572878 Mon Sep 17 00:00:00 2001 From: flow Date: Wed, 30 Jan 2019 18:14:06 +0800 Subject: [PATCH] DeltaMerge Engine POC --- dbms/CMakeLists.txt | 1 + .../Interpreters/InterpreterDeleteQuery.cpp | 2 + dbms/src/Interpreters/InterpreterFactory.cpp | 7 + .../Interpreters/InterpreterManageQuery.cpp | 42 + .../src/Interpreters/InterpreterManageQuery.h | 29 + dbms/src/Interpreters/Settings.h | 1 + dbms/src/Parsers/ASTInsertQuery.h | 1 + dbms/src/Parsers/ASTManageQuery.h | 56 + dbms/src/Parsers/ParserInsertQuery.cpp | 7 +- dbms/src/Parsers/ParserManageQuery.cpp | 61 + dbms/src/Parsers/ParserManageQuery.h | 17 + dbms/src/Parsers/ParserQuery.cpp | 5 +- dbms/src/Storages/CMakeLists.txt | 1 + dbms/src/Storages/DeltaMerge/.clang-format | 89 ++ .../DeltaMerge/DeltaMergeBlockInputStream.h | 248 ++++ .../DeltaMerge/DeltaMergeBlockOutputStream.h | 395 +++++ .../Storages/DeltaMerge/DeltaMergeDefines.cpp | 6 + .../Storages/DeltaMerge/DeltaMergeDefines.h | 37 + dbms/src/Storages/DeltaMerge/DeltaTree.cpp | 8 + dbms/src/Storages/DeltaMerge/DeltaTree.h | 1273 +++++++++++++++++ dbms/src/Storages/DeltaMerge/Tablet.h | 13 + dbms/src/Storages/DeltaMerge/Tuple.h | 36 + dbms/src/Storages/DeltaMerge/ValueSpace.h | 271 ++++ .../Storages/DeltaMerge/tests/CMakeLists.txt | 4 + .../Storages/DeltaMerge/tests/delta_tree.cpp | 192 +++ dbms/src/Storages/IStorage.h | 3 + dbms/src/Storages/StorageDeltaMerge.cpp | 273 ++++ dbms/src/Storages/StorageDeltaMerge.h | 100 ++ dbms/src/Storages/StorageMergeTree.h | 1 + dbms/src/Storages/StorageTinyLog.cpp | 24 +- dbms/src/Storages/StorageTinyLog.h | 6 + dbms/src/Storages/registerStorages.cpp | 1 + 32 files changed, 3201 insertions(+), 9 deletions(-) create mode 100644 dbms/src/Interpreters/InterpreterManageQuery.cpp create mode 100644 dbms/src/Interpreters/InterpreterManageQuery.h create mode 100644 dbms/src/Parsers/ASTManageQuery.h create mode 100644 dbms/src/Parsers/ParserManageQuery.cpp create mode 100644 dbms/src/Parsers/ParserManageQuery.h create mode 100644 dbms/src/Storages/DeltaMerge/.clang-format create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeDefines.cpp create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h create mode 100644 dbms/src/Storages/DeltaMerge/DeltaTree.cpp create mode 100644 dbms/src/Storages/DeltaMerge/DeltaTree.h create mode 100644 dbms/src/Storages/DeltaMerge/Tablet.h create mode 100644 dbms/src/Storages/DeltaMerge/Tuple.h create mode 100644 dbms/src/Storages/DeltaMerge/ValueSpace.h create mode 100644 dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt create mode 100644 dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp create mode 100644 dbms/src/Storages/StorageDeltaMerge.cpp create mode 100644 dbms/src/Storages/StorageDeltaMerge.h diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 77e356bfb2b..e63ca8fc128 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -53,6 +53,7 @@ add_headers_and_sources(dbms src/Interpreters) add_headers_and_sources(dbms src/Interpreters/ClusterProxy) add_headers_and_sources(dbms src/Columns) add_headers_and_sources(dbms src/Storages) +add_headers_and_sources(dbms src/Storages/DeltaMerge) add_headers_and_sources(dbms src/Storages/Distributed) add_headers_and_sources(dbms src/Storages/MergeTree) add_headers_and_sources(dbms src/Storages/Transaction) diff --git a/dbms/src/Interpreters/InterpreterDeleteQuery.cpp b/dbms/src/Interpreters/InterpreterDeleteQuery.cpp index f039bf6be0f..79da2392ad4 100644 --- a/dbms/src/Interpreters/InterpreterDeleteQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDeleteQuery.cpp @@ -50,6 +50,8 @@ BlockIO InterpreterDeleteQuery::execute() checkAccess(query); StoragePtr table = context.getTable(query.database, query.table); + if (!table->supportsModification()) + throw Exception("Table engine " + table->getName() + " does not support Delete."); auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__); diff --git a/dbms/src/Interpreters/InterpreterFactory.cpp b/dbms/src/Interpreters/InterpreterFactory.cpp index f8582a01d02..11458576b8c 100644 --- a/dbms/src/Interpreters/InterpreterFactory.cpp +++ b/dbms/src/Interpreters/InterpreterFactory.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #include #include @@ -168,6 +170,11 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, Context & throwIfReadOnly(context); return std::make_unique(query, context); } + else if (typeid_cast(query.get())) + { + throwIfReadOnly(context); + return std::make_unique(query, context); + } else throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY); } diff --git a/dbms/src/Interpreters/InterpreterManageQuery.cpp b/dbms/src/Interpreters/InterpreterManageQuery.cpp new file mode 100644 index 00000000000..4da1883de5a --- /dev/null +++ b/dbms/src/Interpreters/InterpreterManageQuery.cpp @@ -0,0 +1,42 @@ +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +BlockIO InterpreterManageQuery::execute() +{ + const ASTManageQuery & ast = typeid_cast(*query_ptr); + + StoragePtr table = context.getTable(ast.database, ast.table); + if (table->getName() != "DeltaMerge") + { + throw Exception("Manage operation can only be applied to DeltaMerge engine tables"); + } + auto & dm_table = static_cast(*table); + switch (ast.operation) + { + case ManageOperation::Enum::Flush: + { + dm_table.flushDelta(); + return {}; + } + case ManageOperation::Enum::Status: + { + BlockIO res; + res.in = dm_table.status(); + return res; + } + case ManageOperation::Enum::Check: + { + dm_table.check(); + return {}; + } + } + return {}; +} +} diff --git a/dbms/src/Interpreters/InterpreterManageQuery.h b/dbms/src/Interpreters/InterpreterManageQuery.h new file mode 100644 index 00000000000..beb391c0b00 --- /dev/null +++ b/dbms/src/Interpreters/InterpreterManageQuery.h @@ -0,0 +1,29 @@ +#pragma once + +#include + + +namespace DB +{ + +class Context; +class IAST; +using ASTPtr = std::shared_ptr; + +class InterpreterManageQuery : public IInterpreter +{ +public: + InterpreterManageQuery(const ASTPtr & query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) + { + } + + BlockIO execute() override; + +private: + ASTPtr query_ptr; + Context & context; +}; + + +} diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 8c80ed0f990..95a827675c6 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -231,6 +231,7 @@ struct Settings M(SettingUInt64, shared_query_clients, 0, "How many clients will share the same query_id. If > 0, enable shared query mode.")\ M(SettingString, query_id, "", "The query_id, only for testing.")\ M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.")\ + M(SettingUInt64, delta_merge_size, 10000000, "The delta rows limit in memory. After that delta rows will be flushed.")\ \ M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \ M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \ diff --git a/dbms/src/Parsers/ASTInsertQuery.h b/dbms/src/Parsers/ASTInsertQuery.h index 240f374d582..628d8adda43 100644 --- a/dbms/src/Parsers/ASTInsertQuery.h +++ b/dbms/src/Parsers/ASTInsertQuery.h @@ -28,6 +28,7 @@ class ASTInsertQuery : public IAST const char * end = nullptr; bool is_import; + bool is_upsert; /** Get the text that identifies this element. */ String getID() const override { return "InsertQuery_" + database + "_" + table; }; diff --git a/dbms/src/Parsers/ASTManageQuery.h b/dbms/src/Parsers/ASTManageQuery.h new file mode 100644 index 00000000000..b84c9d0f679 --- /dev/null +++ b/dbms/src/Parsers/ASTManageQuery.h @@ -0,0 +1,56 @@ +#pragma once + +#include + + +namespace DB +{ +namespace ManageOperation +{ + enum Enum + { + Flush, + Status, + Check, + }; + + inline const char * toString(UInt64 op) + { + static const char * data[] = {"Flush", "Status", "Check"}; + return op < 3 ? data[op] : "Unknown operation"; + } +} + +/** Manage query + */ +class ASTManageQuery : public IAST +{ +public: + String database; + String table; + + ManageOperation::Enum operation; + + /** Get the text that identifies this element. */ + String getID() const override + { + return "ManageQuery_" + database + "_" + table + "_" + ManageOperation::toString(operation); + }; + + ASTPtr clone() const override + { + auto res = std::make_shared(*this); + res->children.clear(); + return res; + } + +protected: + void formatImpl(const FormatSettings & settings, FormatState & /*state*/, FormatStateStacked /*frame*/) const override + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << "MANAGE TABLE " << (settings.hilite ? hilite_none : "") + << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) << " " + << (settings.hilite ? hilite_keyword : "") << ManageOperation::toString(operation) + << (settings.hilite ? hilite_none : ""); + } +}; +} diff --git a/dbms/src/Parsers/ParserInsertQuery.cpp b/dbms/src/Parsers/ParserInsertQuery.cpp index 923651829c3..1b4fd4e8aed 100644 --- a/dbms/src/Parsers/ParserInsertQuery.cpp +++ b/dbms/src/Parsers/ParserInsertQuery.cpp @@ -54,10 +54,10 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) /// Insertion data const char * data = nullptr; - bool is_insert = s_insert_into.ignore(pos, expected) || - s_upsert_into.ignore(pos, expected); + bool is_insert = s_insert_into.ignore(pos, expected); + bool is_upsert = s_upsert_into.ignore(pos, expected); bool is_import = s_import_into.ignore(pos, expected); - if (!is_insert && !is_import) + if (!is_insert && !is_upsert && !is_import) return false; s_table.ignore(pos, expected); @@ -168,6 +168,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->data = data != end ? data : nullptr; query->end = end; query->is_import = is_import; + query->is_upsert = is_upsert; if (columns) query->children.push_back(columns); diff --git a/dbms/src/Parsers/ParserManageQuery.cpp b/dbms/src/Parsers/ParserManageQuery.cpp new file mode 100644 index 00000000000..073cc04d9d7 --- /dev/null +++ b/dbms/src/Parsers/ParserManageQuery.cpp @@ -0,0 +1,61 @@ +#include +#include +#include + +#include +#include +#include + +#include + + +namespace DB +{ +bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + ParserKeyword s_manage_table("MANAGE TABLE"); + ParserKeyword s_flush("FLUSH"); + ParserKeyword s_status("STATUS"); + ParserKeyword s_check("CHECK"); + + ParserToken s_dot(TokenType::Dot); + ParserIdentifier name_p; + ASTPtr database; + ASTPtr table; + ManageOperation::Enum operation; + + if (!s_manage_table.ignore(pos, expected)) + return false; + + if (!name_p.parse(pos, table, expected)) + return false; + + if (s_dot.ignore(pos, expected)) + { + database = table; + if (!name_p.parse(pos, table, expected)) + return false; + } + + if (s_flush.ignore(pos, expected)) + operation = ManageOperation::Enum::Flush; + else if (s_status.ignore(pos, expected)) + operation = ManageOperation::Enum::Status; + else if (s_check.ignore(pos, expected)) + operation = ManageOperation::Enum::Check; + else + return false; + + auto query = std::make_shared(); + node = query; + + if (database) + query->database = typeid_cast(*database).name; + if (table) + query->table = typeid_cast(*table).name; + + query->operation = operation; + + return true; +} +} diff --git a/dbms/src/Parsers/ParserManageQuery.h b/dbms/src/Parsers/ParserManageQuery.h new file mode 100644 index 00000000000..5cd4b800601 --- /dev/null +++ b/dbms/src/Parsers/ParserManageQuery.h @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class ParserManageQuery : public IParserBase +{ +protected: + const char * getName() const { return "MANAGE query"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); +}; + +} diff --git a/dbms/src/Parsers/ParserQuery.cpp b/dbms/src/Parsers/ParserQuery.cpp index f480390f1f9..8f232fd7f86 100644 --- a/dbms/src/Parsers/ParserQuery.cpp +++ b/dbms/src/Parsers/ParserQuery.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace DB { @@ -28,6 +29,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserOptimizeQuery optimize_p; ParserSystemQuery system_p; ParserTruncateQuery truncate_p; + ParserManageQuery manage_p; bool res = query_with_output_p.parse(pos, node, expected) || insert_p.parse(pos, node, expected) @@ -37,7 +39,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) || dbginvoke_p.parse(pos, node, expected) || optimize_p.parse(pos, node, expected) || system_p.parse(pos, node, expected) - || truncate_p.parse(pos, node, expected); + || truncate_p.parse(pos, node, expected) + || manage_p.parse(pos, node, expected); return res; } diff --git a/dbms/src/Storages/CMakeLists.txt b/dbms/src/Storages/CMakeLists.txt index 33353edcc10..37f650d57be 100644 --- a/dbms/src/Storages/CMakeLists.txt +++ b/dbms/src/Storages/CMakeLists.txt @@ -4,4 +4,5 @@ if (ENABLE_TESTS) add_subdirectory (tests EXCLUDE_FROM_ALL) add_subdirectory (Transaction/tests EXCLUDE_FROM_ALL) add_subdirectory (Page/tests EXCLUDE_FROM_ALL) + add_subdirectory (DeltaMerge/tests EXCLUDE_FROM_ALL) endif () diff --git a/dbms/src/Storages/DeltaMerge/.clang-format b/dbms/src/Storages/DeltaMerge/.clang-format new file mode 100644 index 00000000000..ccdb933ce7c --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/.clang-format @@ -0,0 +1,89 @@ +--- +BasedOnStyle: Google +Language: Cpp +AlignAfterOpenBracket: true +BreakBeforeBraces: Custom +BraceWrapping: { + AfterClass: 'true' + AfterControlStatement: 'true' + AfterEnum : 'true' + AfterFunction : 'true' + AfterNamespace : 'true' + AfterStruct : 'true' + AfterUnion : 'true' + BeforeCatch : 'true' + BeforeElse : 'true' + IndentBraces : 'false' +} + +BreakConstructorInitializersBeforeComma: false +Cpp11BracedListStyle: true +ColumnLimit: 140 +ConstructorInitializerAllOnOneLineOrOnePerLine: true +ExperimentalAutoDetectBinPacking: true +UseTab: Never +TabWidth: 4 +IndentWidth: 4 +Standard: Cpp11 +PointerAlignment: Middle +MaxEmptyLinesToKeep: 2 +KeepEmptyLinesAtTheStartOfBlocks: true +AllowShortFunctionsOnASingleLine: Inline +#AllowShortFunctionsOnASingleLine: Empty +AlwaysBreakTemplateDeclarations: true +IndentCaseLabels: false +#SpaceAfterTemplateKeyword: true +#SortIncludes: true +FixNamespaceComments: true + +ReflowComments: false +AlignEscapedNewlinesLeft: true + +# Not changed: +AccessModifierOffset: -4 +AlignConsecutiveAssignments: true +AlignConsecutiveDeclarations: true +AlignOperands: false +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: true +AllowShortCaseLabelsOnASingleLine: false +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakBeforeMultilineStrings: false +BinPackArguments: false +BinPackParameters: false +BreakBeforeBinaryOperators: All +BreakBeforeTernaryOperators: true +CommentPragmas: '^ IWYU pragma:' +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +DerivePointerAlignment: false +DisableFormat: false +ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] +IndentWidth: 4 +IndentWrappedFunctionNames: false +MacroBlockBegin: '' +MacroBlockEnd: '' +NamespaceIndentation: None +ObjCBlockIndentWidth: 4 +ObjCSpaceAfterProperty: true +ObjCSpaceBeforeProtocolList: true +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 60 +SpaceAfterCStyleCast: false +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +... + diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h new file mode 100644 index 00000000000..898edd5bcfd --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h @@ -0,0 +1,248 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace DB +{ + +class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream +{ + static constexpr size_t UNLIMITED = std::numeric_limits::max(); + +private: + DeltaTreePtr delta_tree; + + ValueSpacePtr insert_value_space; + ValueSpacePtr modify_value_space; + + EntryIterator entry_it; + EntryIterator entry_end; + + Block header; + size_t num_columns; + size_t expected_block_size; + Ids vs_column_offsets; + + size_t skip_rows; + + bool finished = false; + Columns stable_block_columns; + size_t stable_block_rows = 0; + size_t stable_block_pos = 0; + + size_t sid = 0; + +public: + DeltaMergeBlockInputStream(const BlockInputStreamPtr & stable_input_stream, + const DeltaTreePtr & delta_tree_, + size_t expected_block_size_) + : delta_tree(delta_tree_), + insert_value_space(delta_tree->insert_value_space), + modify_value_space(delta_tree->modify_value_space), + entry_it(delta_tree->begin()), + entry_end(delta_tree->end()), + expected_block_size(expected_block_size_) + { + children.push_back(stable_input_stream); + header = stable_input_stream->getHeader(); + num_columns = header.columns(); + + const auto & names_and_types = insert_value_space->namesAndTypes(); + for (const auto & c : header) + { + for (size_t i = 0; i < names_and_types.size(); ++i) + { + if (c.name == names_and_types[i].name) + vs_column_offsets.push_back(i); + } + } + + skip_rows = entry_it != entry_end ? entry_it.getSid() : UNLIMITED; + } + + String getName() const override { return "DeltaMerge"; } + Block getHeader() const override { return children.back()->getHeader(); } + +protected: + Block readImpl() override + { + if (finished) + return {}; + MutableColumns columns; + initOutputColumns(columns); + if (columns.empty()) + return {}; + + while (columns[0]->size() < expected_block_size) + { + if (!next(columns)) + { + finished = true; + break; + } + } + if (!columns.at(0)->size()) + return {}; + + return header.cloneWithColumns(std::move(columns)); + } + +private: + inline void initOutputColumns(MutableColumns & columns) + { + columns.resize(num_columns); + + for (size_t i = 0; i < num_columns; ++i) + { + columns[i] = header.safeGetByPosition(i).column->cloneEmpty(); + columns[i]->reserve(expected_block_size); + } + } + + inline bool fillStableBlockIfNeed() + { + if (!stable_block_columns.empty() && stable_block_pos < stable_block_rows) + return true; + + stable_block_columns.clear(); + stable_block_rows = 0; + stable_block_pos = 0; + auto block = children.back()->read(); + if (!block) + return false; + + stable_block_rows = block.rows(); + for (size_t column_id = 0; column_id < num_columns; ++column_id) + stable_block_columns.push_back(block.getByPosition(column_id).column); + return true; + } + + inline void ignoreStableTuples(size_t n) + { + while (n) + { + if (!fillStableBlockIfNeed()) + throw Exception("Not more rows to ignore!"); + auto skip = std::min(stable_block_columns.at(0)->size() - stable_block_pos, n); + stable_block_pos += skip; + n -= skip; + } + } + + bool next(MutableColumns & output_columns) + { + while (skip_rows || (entry_it != entry_end && entry_it.getType() == DT_DEL)) + { + if (skip_rows) + { + if (!fillStableBlockIfNeed()) + return false; + auto in_output_rows = output_columns.at(0)->size(); + auto in_input_rows = stable_block_columns.at(0)->size(); + if (!in_output_rows && !stable_block_pos && in_input_rows <= skip_rows) + { + // Simply return stable_input_stream block. + for (size_t column_id = 0; column_id < output_columns.size(); ++column_id) + output_columns[column_id] = (*std::move(stable_block_columns[column_id])).mutate(); + + stable_block_pos += in_input_rows; + skip_rows -= in_input_rows; + sid += in_input_rows; + } + else + { + auto copy_rows = std::min(expected_block_size - in_output_rows, in_input_rows - stable_block_pos); + copy_rows = std::min(copy_rows, skip_rows); + + for (size_t column_id = 0; column_id < num_columns; ++column_id) + output_columns[column_id]->insertRangeFrom(*stable_block_columns[column_id], stable_block_pos, copy_rows); + + stable_block_pos += copy_rows; + skip_rows -= copy_rows; + sid += copy_rows; + } + return true; + } + else + { + if (!fillStableBlockIfNeed()) + throw Exception("No more rows to delete!"); + if (unlikely(sid != entry_it.getSid())) + throw Exception("Algorithm broken!"); + + ignoreStableTuples(entry_it.getValue()); + sid += entry_it.getValue(); + + ++entry_it; + skip_rows = (entry_it != entry_end ? entry_it.getSid() : UNLIMITED) - sid; + } + } + + if (unlikely(sid != entry_it.getSid())) + throw Exception("Algorithm broken!"); + + if (entry_it.getType() == DT_INS) + { + auto value_id = entry_it.getValue(); + for (size_t column_id = 0; column_id < num_columns; ++column_id) + { + const auto & split_column = insert_value_space->columnAt(vs_column_offsets[column_id]); + output_columns[column_id]->insertFrom(split_column.chunk(value_id), split_column.offsetInChunk(value_id)); + } + } + else + { + // Modify + if (!fillStableBlockIfNeed()) + return false; + std::vector has_modifies(num_columns, INVALID_ID); + + if (entry_it.getType() == DT_MULTI_MOD) + { + DTModifiesPtr modifies = reinterpret_cast(entry_it.getValue()); + for (const auto & m : *modifies) + has_modifies[m.column_id] = m.value; + } + else + { + has_modifies[entry_it.getType()] = entry_it.getValue(); + } + + for (size_t column_id = 0; column_id < num_columns; ++column_id) + { + auto offset = vs_column_offsets[column_id]; + if (has_modifies[offset] != INVALID_ID) + { + output_columns[column_id]->insertFrom(*stable_block_columns[column_id], stable_block_pos); + } + else + { + auto & split_column = insert_value_space->columnAt(offset); + auto value_id = has_modifies[offset]; + output_columns[column_id]->insertFrom(split_column.chunk(value_id), split_column.offsetInChunk(value_id)); + } + } + + ++stable_block_pos; + ++sid; + } + + ++entry_it; + skip_rows = (entry_it != entry_end ? entry_it.getSid() : UNLIMITED) - sid; + + return true; + } +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h new file mode 100644 index 00000000000..220d8ca13ad --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h @@ -0,0 +1,395 @@ +#pragma once + +#include + +#include + +#include +#include + +#include + +#include + +#include + +namespace DB +{ + +enum class Action : UInt8 +{ + Insert = 1, + Upsert = 2, + Delete = 3, + Update = 4 +}; + +inline int compareTuple(const ValueSpacePtr & left, + size_t l_pos, + const ValueSpacePtr & right, + size_t r_pos, + const SortDescription & sort_desc, + const Ids & vs_column_offsets) +{ + auto num_sort_columns = sort_desc.size(); + for (size_t i = 0; i < num_sort_columns; ++i) + { + int direction = sort_desc[i].direction; + int nulls_direction = sort_desc[i].nulls_direction; + + auto col_offset = vs_column_offsets[i]; + + auto & left_col = left->columnAt(col_offset).chunk(l_pos); + auto left_col_offset = left->columnAt(col_offset).offsetInChunk(l_pos); + + auto & right_col = right->columnAt(col_offset).chunk(r_pos); + auto right_col_offset = right->columnAt(col_offset).offsetInChunk(r_pos); + + int res = direction * left_col.compareAt(left_col_offset, right_col_offset, right_col, nulls_direction); + if (res != 0) + return res; + } + return 0; +} + +inline int compareTuple(const Columns & left, size_t l_pos, const Columns & right, size_t r_pos, const SortDescription & sort_desc) +{ + auto num_sort_columns = sort_desc.size(); + for (size_t i = 0; i < num_sort_columns; ++i) + { + int direction = sort_desc[i].direction; + int nulls_direction = sort_desc[i].nulls_direction; + int res = direction * left[i]->compareAt(l_pos, r_pos, *(right[i]), nulls_direction); + if (res != 0) + return res; + } + return 0; +} + +struct RidGenerator +{ + BlockInputStreamPtr stable_stream; + const SortDescription & sort_desc; + const size_t num_sort_columns; + + Columns modify_block_columns; + size_t modify_block_rows; + size_t modify_block_pos = 0; + // Whether this row's pk duplicates with the next one, if so, they share the same rid. + std::vector modify_block_dup_next; + // Whether current row's pk duplicates with the previous one. Used by Upsert. + bool dup_prev = false; + + Columns stable_block_columns; + size_t stable_block_rows = 0; + size_t stable_block_pos = 0; + bool stable_finished = false; + + UInt64 rid = 0; + + RidGenerator(const BlockInputStreamPtr & stable_stream_, const SortDescription & sort_desc_, Action action, const Block & modify_block) + : stable_stream(stable_stream_), + sort_desc(sort_desc_), + num_sort_columns(sort_desc.size()), + modify_block_rows(modify_block.rows()), + modify_block_dup_next(modify_block_rows, false) + { + stable_stream->readPrefix(); + + for (size_t i = 0; i < num_sort_columns; ++i) + modify_block_columns.push_back(modify_block.getByName(sort_desc[i].column_name).column); + + // Check continually tuples with identical primary key. + for (size_t row = 0; row < modify_block_rows - 1; ++row) + { + auto res = compareTuple(modify_block_columns, row, modify_block_columns, row + 1, sort_desc); + if (unlikely(res > 0)) + throw Exception("Algorithm broken: res should not > 0"); + if (res == 0 && (action == Action::Insert || action == Action::Delete)) + throw Exception("Duplicate primary key exist in modify block"); + modify_block_dup_next[row] = (res == 0); + } + } + + ~RidGenerator() { stable_stream->readSuffix(); } + + inline int compareModifyToStable() const + { + return compareTuple(modify_block_columns, modify_block_pos, stable_block_columns, stable_block_pos, sort_desc); + } + + inline bool fillStableBlockIfNeed() + { + if (stable_finished) + return false; + if (!stable_block_columns.empty() && stable_block_pos < stable_block_rows) + return true; + stable_block_columns.clear(); + stable_block_rows = 0; + stable_block_pos = 0; + auto block = stable_stream->read(); + if (!block) + { + stable_finished = true; + return false; + } + stable_block_rows = block.rows(); + for (size_t column_id = 0; column_id < num_sort_columns; ++column_id) + stable_block_columns.push_back(block.getByName(sort_desc[column_id].column_name).column); + +#ifndef NDEBUG + for (size_t row = 0; row < stable_block_rows - 1; ++row) + { + auto res = compareTuple(stable_block_columns, row, stable_block_columns, row + 1, sort_desc); + if (unlikely(res >= 0)) + throw Exception("Algorithm broken: res should not >= 0"); + } +#endif + return true; + } + + UInt64 nextForInsert() + { + while (fillStableBlockIfNeed()) + { + auto res = compareModifyToStable(); + if (res == 0) + throw Exception("Duplicate primary key already exists in table"); + else if (res < 0) + { + ++modify_block_pos; + return rid++; + } + else + { + ++stable_block_pos; + ++rid; + } + } + + ++modify_block_pos; + return rid++; + } + + std::pair nextForUpsert() + { + while (fillStableBlockIfNeed()) + { + auto res = compareModifyToStable(); + if (res > 0) + { + ++stable_block_pos; + ++rid; + continue; + } + + auto cur_dup_prev = dup_prev; + dup_prev = modify_block_dup_next[modify_block_pos++]; + if (res == 0) + { + if (dup_prev) + return {rid, true}; + else + { + ++stable_block_pos; + return {rid++, true}; + } + } + else if (res < 0) + { + if (dup_prev) + return {rid, cur_dup_prev}; + else + return {rid++, cur_dup_prev}; + } + } + + auto cur_dup_prev = dup_prev; + dup_prev = modify_block_dup_next[modify_block_pos]; + ++modify_block_pos; + if (dup_prev) + return {rid, cur_dup_prev}; + else + return {rid++, cur_dup_prev}; + } + + UInt64 nextForDelete() + { + while (fillStableBlockIfNeed()) + { + auto res = compareModifyToStable(); + if (res == 0) + { + ++modify_block_pos; + ++stable_block_pos; + return rid; + } + else if (res > 0) + { + ++stable_block_pos; + ++rid; + } + else // res < 0 + { + throw Exception("Rows not found"); + } + } + throw Exception("Rows not found"); + } + + UInt64 nextForUpdate() + { + while (fillStableBlockIfNeed()) + { + auto res = compareModifyToStable(); + if (res == 0) + { + if (modify_block_dup_next[modify_block_pos++]) + return rid; + else + return rid++; + } + else if (res > 0) + { + ++stable_block_pos; + ++rid; + } + else // res < 0 + { + throw Exception("Rows not found"); + } + } + throw Exception("Rows not found"); + } +}; + +class DeltaMergeBlockOutputStream final : public IBlockOutputStream +{ +public: + using InputStreamCreator = std::function; + using Flusher = std::function; + + DeltaMergeBlockOutputStream( // + const InputStreamCreator & stable_input_stream_creator_, + DeltaTreePtr & delta_tree_, + const Action action_, + const SortDescription & primary_sort_descr_, + const Flusher flusher_, + const size_t flush_limit_) + : stable_input_stream_creator(stable_input_stream_creator_), + delta_tree(delta_tree_), + action(action_), + primary_sort_descr(primary_sort_descr_), + flusher(flusher_), + flush_limit(flush_limit_), + log(&Logger::get("DeltaMergeBlockOutputStream")) + { + } + + Block getHeader() const override + { + Block res; + for (auto & nt : delta_tree->insert_value_space->namesAndTypes()) + { + res.insert({nt.type, nt.name}); + } + return res; + } + + void writeSuffix() override + { + delta_tree->insert_value_space->gc(); + delta_tree->modify_value_space->gc(); + } + + Block sortBlock(const Block & block) + { + if (isAlreadySorted(block, primary_sort_descr)) + return block; + + Block res; + IColumn::Permutation perm; + stableGetPermutation(block, primary_sort_descr, perm); + for (const auto & col : block) + res.insert(ColumnWithTypeAndName(col.column->permute(perm, 0), col.type, col.name)); + return res; + } + + void write(const Block & block_) override + { + if (!block_.rows()) + return; + + Block block = sortBlock(block_); + auto block_rows = block.rows(); + + RidGenerator rid_gen(stable_input_stream_creator(), primary_sort_descr, action, block); + + switch (action) + { + case Action::Insert: + { + auto value_ids = delta_tree->insert_value_space->addFromInsert(block); + Ids rids(block_rows); + for (size_t i = 0; i < block_rows; ++i) + rids[i] = rid_gen.nextForInsert(); + for (size_t i = 0; i < block_rows; ++i) + delta_tree->addInsert(rids[i], value_ids[i]); + break; + } + case Action::Upsert: + { + auto value_ids = delta_tree->insert_value_space->addFromInsert(block); + using Rids = std::vector>; + Rids rids(block_rows); + for (size_t i = 0; i < block_rows; ++i) + rids[i] = rid_gen.nextForUpsert(); + for (size_t i = 0; i < block_rows; ++i) + { + const auto & rid_dup = rids[i]; + if (rid_dup.second) + delta_tree->addDelete(rid_dup.first); + delta_tree->addInsert(rid_dup.first, value_ids[i]); + } + break; + } + case Action::Delete: + { + Ids rids(block_rows); + for (size_t i = 0; i < block_rows; ++i) + rids[i] = rid_gen.nextForDelete(); + for (size_t i = 0; i < block_rows; ++i) + delta_tree->addDelete(rids[i]); + break; + } + case Action::Update: + { + auto tuples = delta_tree->insert_value_space->addFromModify(block); + Ids rids(block_rows); + for (size_t i = 0; i < block_rows; ++i) + rids[i] = rid_gen.nextForUpdate(); + for (size_t i = 0; i < block_rows; ++i) + delta_tree->addModify(rids[i], tuples[i]); + break; + } + default: + throw Exception("Illegal action"); + } + + if (delta_tree->entries() >= flush_limit) + flusher(); + } + +private: + InputStreamCreator stable_input_stream_creator; + + DeltaTreePtr & delta_tree; // it could be changed by flusher. + Action action; + SortDescription primary_sort_descr; + + Flusher flusher; + size_t flush_limit; + + Logger * log; +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.cpp new file mode 100644 index 00000000000..19205610f09 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.cpp @@ -0,0 +1,6 @@ +#include + +namespace DB +{ +const size_t INVALID_ID = std::numeric_limits::max(); +} \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h new file mode 100644 index 00000000000..38b16408770 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +#include +#include + +#include + +namespace DB +{ +/// S: Scale factor of delta tree node. +/// M: Capacity factor of leaf node. +/// F: Capacity factor of Intern node. +/// For example, the max capacity of leaf node would be: M * S. +static constexpr size_t DT_S = 3; +static constexpr size_t DT_M = 55; +static constexpr size_t DT_F = 13; + +extern const size_t INVALID_ID; + +template > +class DeltaTree; + +template +class DTEntryIterator; + +class MemoryValueSpace; + +using EntryIterator = DTEntryIterator; +using ValueSpacePtr = std::shared_ptr; +using DefaultDeltaTree = DeltaTree; +using DeltaTreePtr = std::shared_ptr; +using Ids = std::vector; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaTree.cpp b/dbms/src/Storages/DeltaMerge/DeltaTree.cpp new file mode 100644 index 00000000000..99cd376154e --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaTree.cpp @@ -0,0 +1,8 @@ +#include + +#include + +namespace DB +{ + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaTree.h b/dbms/src/Storages/DeltaMerge/DeltaTree.h new file mode 100644 index 00000000000..c84a0ac6146 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaTree.h @@ -0,0 +1,1273 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +#include + +namespace DB +{ + +struct DTMutation; +struct DTModify; +template +struct DTLeaf; +template +struct DTIntern; + +extern const size_t INVALID_ID; + +using DTModifies = std::vector; +using DTModifiesPtr = DTModifies *; +static_assert(sizeof(UInt64) >= sizeof(DTModifiesPtr)); + +#define as(T, p) (reinterpret_cast(p)) +#define asNode(p) (reinterpret_cast(p)) +#define isLeaf(p) (((*reinterpret_cast(p)) & 0x01) != 0) +#define nodeName(p) (isLeaf(p) ? "leaf" : "intern") + +/// DTMutation type available values. +static constexpr UInt16 DT_INS = 65535; +static constexpr UInt16 DT_DEL = 65534; +static constexpr UInt16 DT_MULTI_MOD = 65533; +static constexpr UInt16 DT_MAX_COLUMN_ID = 65500; + +inline std::string DTTypeString(UInt16 type) +{ + String type_s; + switch (type) + { + case DT_INS: + type_s = "INS"; + break; + case DT_DEL: + type_s = "DEL"; + break; + case DT_MULTI_MOD: + type_s = "MMD"; + break; + default: + type_s = ::DB::toString(type); + break; + } + return type_s; +} + +struct DTMutation +{ + DTMutation() = default; + DTMutation(UInt16 type_, UInt64 value_) : type(type_), value(value_) {} + + /// DT_INS : Insert + /// DT_DEL : Delete + /// DT_MULTI_MOD : modify chain + /// otherwise, mutation is in MOD mode, "type" is modify columnId. + UInt16 type = 0; + /// for DT_INS and MOD, "value" is the value index in value space; + /// for DT_MULTI_MOD, "value" represents the chain pointer; + /// for DT_DEL, "value" is the consecutive deleting number, e.g. 5 means 5 tuples deleted from current position. + UInt64 value = 0; + + inline bool isModify() const { return type != DT_INS && type != DT_DEL; } +}; + +struct DTModify +{ + DTModify() = default; + + DTModify(size_t column_id_, UInt64 value_) : column_id(column_id_), value(value_) {} + + size_t column_id = 0; + UInt64 value = 0; +}; + + +/// Note that we allocate one more slot for entries in DTIntern and DTLeaf, to simplify entry insert operation. + +template +struct DTLeaf +{ + using NodePtr = void *; + using Leaf = DTLeaf; + using Intern = DTIntern; + using LeafPtr = Leaf *; + using InternPtr = Intern *; + + DTLeaf() = default; + + const size_t mark = 1; // <-- This mark MUST be declared at first place! + + UInt64 sids[M * S + 1]; + DTMutation mutations[M * S + 1]; + size_t count = 0; // mutations number count + + LeafPtr prev = nullptr; + LeafPtr next = nullptr; + InternPtr parent = nullptr; + + inline UInt64 sid(size_t pos) const { return sids[pos]; } + inline UInt64 rid(size_t pos, Int64 delta) const { return sids[pos] + delta; } + inline UInt16 type(size_t pos) const { return mutations[pos].type; } + inline UInt16 value(size_t pos) const { return mutations[pos].value; } + inline bool isModify(size_t pos) const { return mutations[pos].isModify(); } + + static inline bool overflow(size_t count) { return count > M * S; } + static inline bool underflow(size_t count) { return count < M; } + inline bool legal() { return !overflow(count) && !underflow(count); } + inline std::string state() { return overflow(count) ? "overflow" : (underflow(count) ? "underflow" : "legal"); } + + /// shift entries from pos with n steps. + inline void shiftEntries(size_t pos, int n) + { + if (n == 0) + { + return; + } + if (n > 0) + { + std::move_backward(std::begin(sids) + pos, std::begin(sids) + count, std::begin(sids) + count + n); + std::move_backward(std::begin(mutations) + pos, std::begin(mutations) + count, std::begin(mutations) + count + n); + } + else + { + std::move(std::begin(sids) + pos, std::begin(sids) + count, std::begin(sids) + pos + n); + std::move(std::begin(mutations) + pos, std::begin(mutations) + count, std::begin(mutations) + pos + n); + } + } + + /// calculate the delta this leaf node generated. + inline Int64 getDelta() const + { + Int64 delta = 0; + for (size_t i = 0; i < count; ++i) + { + const auto & m = mutations[i]; + if (m.type == DT_INS) + delta += 1; + else if (m.type == DT_DEL) + delta -= m.value; + } + return delta; + } + + /// Search the first pos with equal or greater id. + /// Returns the mutations count if the id is not found in this leaf. + template + inline std::pair search(const UInt64 id, Int64 delta) const + { + size_t i = 0; + for (; i < count; ++i) + { + if constexpr (isRid) + { + if (id <= rid(i, delta)) + return {i, delta}; + } + else + { + if (id <= sid(i)) + return {i, delta}; + } + if (type(i) == DT_INS) + delta += 1; + else if (type(i) == DT_DEL) + delta -= value(i); + } + return {i, delta}; + } + + template + inline bool exists(const UInt64 id, Int64 delta) const + { + return search(id, delta).first < count; + } + + inline std::pair searchRid(const UInt64 id, Int64 delta) const { return search(id, delta); } + inline std::pair searchSid(const UInt64 id, Int64 delta) const { return search(id, delta); } + + inline bool existsRid(const UInt64 id, Int64 delta) const { return exists(id, delta); } + inline bool existsSid(const UInt64 id) const { return exists(id, 0); } + + /// Split into two nodes. + /// This mehod only handle the pre/next/parent link, but won't handle the updates in parent node. + /// Returns the new splited sid. + inline UInt64 split(LeafPtr right_n) + { + size_t split = M * S / 2; + + right_n->prev = this; + right_n->next = this->next; + right_n->parent = this->parent; + + if (this->next) + this->next->prev = right_n; + this->next = right_n; + + std::move(std::begin(this->sids) + split, std::begin(this->sids) + this->count, std::begin(right_n->sids)); + std::move(std::begin(this->mutations) + split, std::begin(this->mutations) + this->count, std::begin(right_n->mutations)); + + right_n->count = this->count - split; + this->count = split; + + return right_n->sids[0]; + } + + /// Merge this node and the sibling. + /// Note that sibling should be deleted outside. + inline void merge(LeafPtr sibling, bool left, size_t node_pos) + { + adopt(sibling, left, sibling->count, node_pos); + if (left) + { + this->prev = sibling->prev; + if (this->prev) + this->prev->next = this; + } + else + { + this->next = sibling->next; + if (this->next) + this->next->prev = this; + } + } + + /// Adopt one entry from sibling, whether sibling is from left or right are handled in different way. + /// Returns new separator sid. + inline UInt64 adopt(LeafPtr sibling, bool left, size_t adopt_count, size_t /*node_pos*/) + { + if (left) + { + this->shiftEntries(0, adopt_count); + + auto sibling_cut = sibling->count - adopt_count; + std::move(std::begin(sibling->sids) + sibling_cut, std::begin(sibling->sids) + sibling->count, std::begin(this->sids)); + std::move( + std::begin(sibling->mutations) + sibling_cut, std::begin(sibling->mutations) + sibling->count, std::begin(this->mutations)); + + sibling->count -= adopt_count; + this->count += adopt_count; + + return this->sids[0]; + } + else + { + std::move(std::begin(sibling->sids), std::begin(sibling->sids) + adopt_count, std::begin(this->sids) + this->count); + std::move( + std::begin(sibling->mutations), std::begin(sibling->mutations) + adopt_count, std::begin(this->mutations) + this->count); + + sibling->shiftEntries(adopt_count, -adopt_count); + + sibling->count -= adopt_count; + this->count += adopt_count; + + return sibling->sids[0]; + } + } +}; + +template +struct DTIntern +{ + using NodePtr = void *; + using Leaf = DTLeaf; + using Intern = DTIntern; + using LeafPtr = Leaf *; + using InternPtr = Intern *; + + DTIntern() = default; + + const size_t mark = 0; // <-- This mark MUST be declared at first place! + + UInt64 sids[F * S + 1]; + Int64 deltas[F * S + 1]; + NodePtr children[F * S + 1]; + size_t count = 0; // children number count, and sids' is "count - 1" + + InternPtr parent = nullptr; + + inline UInt64 sid(size_t pos) const { return sids[pos]; } + inline UInt64 rid(size_t pos, Int64 delta) const { return sids[pos] + delta; } + + static inline bool overflow(size_t count) { return count > F * S; } + static inline bool underflow(size_t count) { return count < F; } + inline bool legal() { return !overflow(count) && !underflow(count); } + inline std::string state() { return overflow(count) ? "overflow" : (underflow(count) ? "underflow" : "legal"); } + + /// shift entries from pos with n steps. + inline void shiftEntries(size_t child_pos, int n) + { + if (n == 0) + { + return; + } + else if (n > 0) + { + if (child_pos != count) + { + if (count + n > F * S + 1) + throw Exception("Overflow"); + } + + std::move_backward(std::begin(sids) + child_pos, std::begin(sids) + count, std::begin(sids) + count + n); + std::move_backward(std::begin(deltas) + child_pos, std::begin(deltas) + count, std::begin(deltas) + count + n); + std::move_backward(std::begin(children) + child_pos, std::begin(children) + count, std::begin(children) + count + n); + if (((int)child_pos) - 1 >= 0) + sids[child_pos - 1 + n] = sids[child_pos - 1]; + + return; + } + else + { + if (child_pos != count) + { + if ((Int64)child_pos < -n) + throw Exception("Underflow"); + } + + if (((int)child_pos) - 1 + n >= 0) + sids[child_pos - 1 + n] = sids[child_pos - 1]; + std::move(std::begin(sids) + child_pos, std::begin(sids) + count, std::begin(sids) + child_pos + n); + std::move(std::begin(deltas) + child_pos, std::begin(deltas) + count, std::begin(deltas) + child_pos + n); + std::move(std::begin(children) + child_pos, std::begin(children) + count, std::begin(children) + child_pos + n); + return; + } + } + + inline size_t searchChild(NodePtr child) + { + size_t i = 0; + for (; i < count && children[i] != child; ++i) {} + return i; + } + + inline Int64 getDelta() + { + Int64 delta = 0; + for (size_t i = 0; i < count; ++i) + { + delta += deltas[i]; + } + return delta; + } + + inline void refreshChildParent() + { + if (isLeaf(children[0])) + for (size_t i = 0; i < count; ++i) + as(Leaf, children[i])->parent = this; + else + for (size_t i = 0; i < count; ++i) + as(Intern, children[i])->parent = this; + } + + /// Split into two nodes. + /// Returns the newly created node, and new separator sid. + inline UInt64 split(InternPtr right_n) + { + size_t split = F * S / 2; + + right_n->parent = this->parent; + + std::move(std::begin(this->sids) + split, std::begin(this->sids) + this->count, std::begin(right_n->sids)); + std::move(std::begin(this->deltas) + split, std::begin(this->deltas) + this->count, std::begin(right_n->deltas)); + std::move(std::begin(this->children) + split, std::begin(this->children) + this->count, std::begin(right_n->children)); + + right_n->count = this->count - split; + this->count = split; + + this->refreshChildParent(); + right_n->refreshChildParent(); + + return this->sids[this->count - 1]; + } + + /// Merge this node and the sibling, node_pos is the position of currently node in parent. + /// Note that sibling should be deleted outside. + inline void merge(InternPtr sibling, bool left, size_t node_pos) { adopt(sibling, left, sibling->count, node_pos); } + + /// Adopt one entry from sibling, whether sibling is from left or right are handled in different way. + /// node_pos is the position of currently node in parent. + /// Returns new separator sid. + inline UInt64 adopt(InternPtr sibling, bool left, size_t adopt_count, size_t node_pos) + { + if (left) + { + this->shiftEntries(0, adopt_count); + + auto sibling_cut = sibling->count - adopt_count; + // if adopt_count equals to sibling->count, new_sep_sid is meaningless. + auto new_sep_sid = !sibling_cut ? INVALID_ID : sibling->sids[sibling_cut - 1]; + + std::move(std::begin(sibling->sids) + sibling_cut, std::begin(sibling->sids) + sibling->count - 1, std::begin(this->sids)); + std::move(std::begin(sibling->deltas) + sibling_cut, std::begin(sibling->deltas) + sibling->count, std::begin(this->deltas)); + std::move( + std::begin(sibling->children) + sibling_cut, std::begin(sibling->children) + sibling->count, std::begin(this->children)); + + this->sids[adopt_count - 1] = parent->sids[node_pos - 1]; + + sibling->count -= adopt_count; + this->count += adopt_count; + + this->refreshChildParent(); + + return new_sep_sid; + } + else + { + auto new_sep_sid = adopt_count == sibling->count ? INVALID_ID : sibling->sids[adopt_count - 1]; + + std::move(std::begin(sibling->sids), std::begin(sibling->sids) + adopt_count, std::begin(this->sids) + this->count); + std::move(std::begin(sibling->deltas), std::begin(sibling->deltas) + adopt_count, std::begin(this->deltas) + this->count); + std::move(std::begin(sibling->children), std::begin(sibling->children) + adopt_count, std::begin(this->children) + this->count); + + sibling->shiftEntries(adopt_count, -adopt_count); + + this->sids[this->count - 1] = parent->sids[node_pos]; + + sibling->count -= adopt_count; + this->count += adopt_count; + + this->refreshChildParent(); + + return new_sep_sid; + } + } +}; + + +template +class DTEntryIterator +{ + using LeafPtr = DTLeaf *; + + LeafPtr leaf; + size_t pos; + Int64 delta; + +public: + DTEntryIterator(LeafPtr leaf_, size_t pos_, Int64 delta_) : leaf(leaf_), pos(pos_), delta(delta_) {} + + bool operator==(const DTEntryIterator & rhs) const { return leaf == rhs.leaf && pos == rhs.pos; } + bool operator!=(const DTEntryIterator & rhs) const { return !(*this == rhs); } + + DTEntryIterator & operator++() + { + if (leaf->type(pos) == DT_INS) + delta += 1; + else if (leaf->type(pos) == DT_DEL) + delta -= leaf->value(pos); + + if (++pos >= leaf->count && leaf->next) + { + leaf = leaf->next; + pos = 0; + } + + return *this; + } + + DTEntryIterator & operator--() + { + if (pos > 0) + { + --pos; + } + else + { + leaf = leaf->prev; + pos = leaf->count - 1; + } + + if (leaf->type(pos) == DT_INS) + delta -= 1; + else if (leaf->type(pos) == DT_DEL) + delta += leaf->value(pos); + + return *this; + } + + DTMutation getMutation() const { return leaf->mutations[pos]; } + LeafPtr getLeaf() const { return leaf; } + size_t getPos() const { return pos; } + Int64 getDelta() const { return delta; } + UInt16 getType() const { return leaf->mutations[pos].type; } + UInt64 getValue() const { return leaf->mutations[pos].value; } + UInt64 getSid() const { return leaf->sids[pos]; } + UInt64 getRid() const { return leaf->sids[pos] + delta; } +}; + +template +class DeltaTree : private Allocator +{ +public: + using NodePtr = void *; + using Leaf = DTLeaf; + using Intern = DTIntern; + using LeafPtr = Leaf *; + using InternPtr = Intern *; + using EntryIterator = DTEntryIterator; + using ValueSpacePtr = std::shared_ptr; + + static_assert(M >= 2); + static_assert(F >= 2); + static_assert(S >= 2); + /// We rely on the standard layout to determine whether a node is Leaf or Intern. + static_assert(std::is_standard_layout_v); + static_assert(std::is_standard_layout_v); + +private: + NodePtr root; + LeafPtr left_leaf, right_leaf; + size_t height = 1; + + size_t insert_count = 0; + size_t delete_count = 0; + size_t modify_count = 0; + + size_t num_entries = 0; + + Logger * log; + +public: + ValueSpacePtr insert_value_space; + ValueSpacePtr modify_value_space; + +private: + inline bool isRootOnly() const { return height == 1; } + + void check(NodePtr node) const; + + using LeafAndDeltaPtr = std::pair; + /// Find right most leaf this id (rid/sid) could exists/insert. + template + LeafAndDeltaPtr findRightLeaf(const UInt64 id) const; + + /// Find left most leaf this id (rid/sid) could exists. + template + LeafAndDeltaPtr findLeftLeaf(const UInt64 id) const; + + using InterAndSid = std::pair; + template + InterAndSid submitMinSid(T * node, UInt64 subtree_min_sid); + + template + InternPtr afterNodeUpdated(T * node); + + inline void afterLeafUpdated(LeafPtr leaf) + { + if (leaf->count == 0 && isRootOnly()) + return; + + InternPtr next; + UInt64 subtree_min_sid = INVALID_ID; + std::tie(next, subtree_min_sid) = submitMinSid(leaf, subtree_min_sid); + while (next) + { + std::tie(next, subtree_min_sid) = submitMinSid(next, subtree_min_sid); + } + + next = afterNodeUpdated(leaf); + while (next) + { + next = afterNodeUpdated(next); + } + + //#ifndef NDEBUG + // checkAll(); + //#endif + } + + template + void freeNode(T * node) + { + Allocator::free(reinterpret_cast(node), sizeof(T)); + } + + template + T * createNode() + { + T * n = reinterpret_cast(Allocator::alloc(sizeof(T))); + new (n) T(); + return n; + } + + template + void freeTree(T * node) + { + constexpr bool is_leaf = std::is_same::value; + if constexpr (!is_leaf) + { + InternPtr intern = static_cast(node); + if (intern->count) + { + if (isLeaf(intern->children[0])) + for (size_t i = 0; i < intern->count; ++i) + freeTree(as(Leaf, intern->children[i])); + else + for (size_t i = 0; i < intern->count; ++i) + freeTree(as(Intern, intern->children[i])); + } + } + freeNode(node); + } + +public: + DeltaTree(const ValueSpacePtr & insert_value_space_, const ValueSpacePtr & modify_value_space_) + : log(&Logger::get("DeltaTree")), // + insert_value_space(insert_value_space_), + modify_value_space(modify_value_space_) + { + root = createNode(); + left_leaf = right_leaf = as(Leaf, root); + + LOG_TRACE(log, "MM create"); + } + + + ~DeltaTree() + { + if (isLeaf(root)) + freeTree((LeafPtr)root); + else + freeTree((InternPtr)root); + + LOG_TRACE(log, "MM free"); + } + + void checkAll() const + { + LeafPtr p = left_leaf; + size_t count = 0; + for (; p != right_leaf; p = p->next) + { + count += p->count; + } + count += right_leaf->count; + if (count != num_entries) + throw Exception("WTF"); + + check(root); + } + + size_t getHeight() const { return height; } + EntryIterator begin() const { return EntryIterator(left_leaf, 0, 0); } + EntryIterator end() const + { + Int64 delta = isLeaf(root) ? as(Leaf, root)->getDelta() : as(Intern, root)->getDelta(); + return EntryIterator(right_leaf, right_leaf->count, delta); + } + + size_t entries() const { return num_entries; } + + void addModify(const UInt64 rid, const UInt16 column_id, const UInt64 new_value_id); + void addModify(const UInt64 rid, const RefTuple & tuple); + void addDelete(const UInt64 rid); + void addInsert(const UInt64 rid, const UInt64 tuple_id); +}; + +#define DT_TEMPLATE template +#define DT_CLASS DeltaTree + +DT_TEMPLATE +void DT_CLASS::check(NodePtr node) const +{ + if (isLeaf(node)) + { + LeafPtr p = as(Leaf, node); + if (p->mark > 1 || ((node != root) && (Leaf::overflow(p->count) || Leaf::underflow(p->count)))) + throw Exception("WTF"); + InternPtr parent = p->parent; + if (parent) + { + auto pos = parent->searchChild(p); + if (pos >= parent->count) + throw Exception("WTF"); + if (parent->deltas[pos] != p->getDelta()) + { + throw Exception("WTF"); + } + if (pos > 0 && parent->sids[pos - 1] != p->sids[0]) + { + throw Exception("WTF"); + } + } + } + else + { + InternPtr p = as(Intern, node); + if (p->mark > 1 || ((node != root) && (Intern::overflow(p->count) || Intern::underflow(p->count)))) + throw Exception("WTF"); + + InternPtr parent = p->parent; + if (parent) + { + auto pos = parent->searchChild(p); + if (pos >= parent->count) + throw Exception("WTF"); + if (parent->deltas[pos] != p->getDelta()) + { + throw Exception("WTF"); + } + } + for (size_t i = 0; i < p->count; ++i) + { + check(p->children[i]); + } + } +} + +DT_TEMPLATE +void DT_CLASS::addModify(const UInt64 rid, const UInt16 column_id, const UInt64 new_value_id) +{ + addModify(rid, RefTuple(column_id, new_value_id)); +} + + +DT_TEMPLATE +void DT_CLASS::addModify(const UInt64 rid, const RefTuple & tuple) +{ + if (tuple.values.empty()) + return; + + ++modify_count; + + size_t pos; + LeafPtr leaf; + Int64 delta; + std::tie(leaf, delta) = findRightLeaf(rid); + std::tie(pos, delta) = leaf->searchRid(rid, delta); + + bool exists = pos != leaf->count && leaf->rid(pos, delta) == rid; + if (exists && leaf->type(pos) == DT_DEL) + { + /// Skip DT_DEL entries. + EntryIterator leaf_it(leaf, pos, delta); + EntryIterator leaf_end(this->end()); + while (leaf_it != leaf_end && leaf_it.getRid() == rid && leaf_it.getType() == DT_DEL) + { + ++leaf_it; + } + leaf = leaf_it.getLeaf(); + pos = leaf_it.getPos(); + delta = leaf_it.getDelta(); + + exists = leaf_it != leaf_end && leaf->rid(pos, delta) == rid; + } + + if (unlikely(!isRootOnly() && !leaf->legal())) + throw Exception("Illegal leaf state: " + leaf->state()); + + auto & modify_values = tuple.values; + if (!exists) + { + /// Either rid does not exists in this leaf, or all entries with same rid are DT_DEL, simply append an new entry. + + ++num_entries; + + leaf->shiftEntries(pos, 1); + leaf->sids[pos] = rid - delta; + leaf->mutations[pos] = DTMutation(); + ++(leaf->count); + + auto & mutation = leaf->mutations[pos]; + if (modify_values.size() == 1) + { + mutation.type = modify_values[0].column; + mutation.value = modify_values[0].value; + } + else + { + DTModifiesPtr modifies = new DTModifies(modify_values.size()); + for (size_t i = 0; i < modify_values.size(); ++i) + { + (*modifies)[i] = DTModify(modify_values[i].column, modify_values[i].value); + } + mutation.type = DT_MULTI_MOD; + mutation.value = reinterpret_cast(modifies); + } + } + else + { + /// In-place update for DT_INS. + if (leaf->mutations[pos].type == DT_INS) + { + leaf->mutations[pos].value = insert_value_space->withModify(leaf->value(pos), *modify_value_space, tuple); + // No leaf entry update. + return; + } + + /// Deal with existing modifies. + DTModifiesPtr modifies; + { + auto & m = leaf->mutations[pos]; + if (m.type != DT_MULTI_MOD) + { + if (modify_values.size() == 1 && m.type == modify_values[0].column) + { + modify_value_space->removeFromModify(m.value, m.type); + m.value = modify_values[0].value; + // No leaf entry update. + return; + } + /// Create modify chain and move the value of current entry into. + modifies = new DTModifies(); + modifies->emplace_back(m.type, m.value); + m.type = DT_MULTI_MOD; + m.value = reinterpret_cast(modifies); + } + else + { + modifies = reinterpret_cast(m.value); + } + } + + /// TODO improve algorithm here + for (const auto & value : modify_values) + { + auto it = modifies->begin(); + auto end = modifies->end(); + for (; it != end; ++it) + { + auto & old_modify = *it; + if (value.column == old_modify.column_id) + { + modify_value_space->removeFromModify(old_modify.value, old_modify.column_id); + old_modify.value = value.value; + break; + } + else if (value.column < old_modify.column_id) + { + modifies->insert(it, DTModify(value.column, value.value)); + break; + } + } + if (it == end) + modifies->emplace_back(value.column, value.value); + } + } + afterLeafUpdated(leaf); +} + +DT_TEMPLATE +void DT_CLASS::addDelete(const UInt64 rid) +{ + ++delete_count; + + size_t pos; + LeafPtr leaf; + Int64 delta; + std::tie(leaf, delta) = findLeftLeaf(rid); + std::tie(pos, delta) = leaf->searchRid(rid, delta); + + bool merge = false; + size_t merge_pos; + + bool exists = pos != leaf->count && leaf->rid(pos, delta) == rid; + if (exists && leaf->type(pos) == DT_DEL) + { + /// Skip DT_DEL entries. + EntryIterator leaf_it(leaf, pos, delta); + EntryIterator leaf_end(this->end()); + while (leaf_it != leaf_end && leaf_it.getRid() == rid && leaf_it.getType() == DT_DEL) + { + merge = true; + merge_pos = leaf_it.getPos(); + + ++leaf_it; + } + leaf = leaf_it.getLeaf(); + pos = leaf_it.getPos(); + delta = leaf_it.getDelta(); + + exists = leaf_it != leaf_end && leaf->rid(pos, delta) == rid; + } + + if (unlikely(!isRootOnly() && !leaf->legal())) + throw Exception("Illegal leaf state: " + leaf->state()); + + if (exists) + { + /// Delete existing insert entry. + if (leaf->mutations[pos].type == DT_INS) + { + --num_entries; + + insert_value_space->removeFromInsert(leaf->mutations[pos].value); + leaf->shiftEntries(pos + 1, -1); + --(leaf->count); + + afterLeafUpdated(leaf); + + return; + } + else + { + // Modify + --num_entries; + + auto & m = leaf->mutations[pos]; + if (m.type == DT_MULTI_MOD) + { + DTModifiesPtr modifies = reinterpret_cast(m.value); + for (const auto & md : *modifies) + modify_value_space->removeFromModify(md.value, md.column_id); + delete modifies; + } + else + { + modify_value_space->removeFromModify(m.value, m.type); + } + + leaf->shiftEntries(pos + 1, -1); + --(leaf->count); + } + } + + if (merge) + { + /// Simply increase delete count at the last one of delete chain. + ++(leaf->mutations[merge_pos].value); + } + else + { + ++num_entries; + + leaf->shiftEntries(pos, 1); + leaf->sids[pos] = rid - delta; + leaf->mutations[pos] = DTMutation(DT_DEL, 1); + ++(leaf->count); + } + + afterLeafUpdated(leaf); +} + +DT_TEMPLATE +void DT_CLASS::addInsert(const UInt64 rid, const UInt64 tuple_id) +{ + ++insert_count; + + size_t pos; + LeafPtr leaf; + Int64 delta; + std::tie(leaf, delta) = findRightLeaf(rid); + std::tie(pos, delta) = leaf->searchRid(rid, delta); + + bool exists = pos != leaf->count && leaf->rid(pos, delta) == rid; + if (exists && leaf->type(pos) == DT_DEL) + { + /// Skip DT_DEL entries. + EntryIterator leaf_it(leaf, pos, delta); + EntryIterator leaf_end(this->end()); + while (leaf_it != leaf_end && leaf_it.getRid() == rid && leaf_it.getType() == DT_DEL) + { + ++leaf_it; + } + leaf = leaf_it.getLeaf(); + pos = leaf_it.getPos(); + delta = leaf_it.getDelta(); + } + + if (unlikely(!isRootOnly() && !leaf->legal())) + throw Exception("Illegal leaf state: " + leaf->state()); + + ++num_entries; + + leaf->shiftEntries(pos, 1); + leaf->sids[pos] = rid - delta; + leaf->mutations[pos].type = DT_INS; + leaf->mutations[pos].value = tuple_id; + ++(leaf->count); + + afterLeafUpdated(leaf); +} + +DT_TEMPLATE +template +typename DT_CLASS::LeafAndDeltaPtr DT_CLASS::findRightLeaf(const UInt64 id) const +{ + NodePtr node = root; + Int64 delta = 0; + while (!isLeaf(node)) + { + InternPtr intern = as(Intern, node); + size_t i = 0; + for (; i < intern->count - 1; ++i) + { + delta += intern->deltas[i]; + if constexpr (isRid) + { + if (id < (intern->rid(i, delta))) + { + delta -= intern->deltas[i]; + break; + } + } + else + { + if (id < intern->sid(i)) + { + delta -= intern->deltas[i]; + break; + } + } + } + node = intern->children[i]; + } + return {as(Leaf, node), delta}; +} + +DT_TEMPLATE +template +typename DT_CLASS::LeafAndDeltaPtr DT_CLASS::findLeftLeaf(const UInt64 id) const +{ + NodePtr node = root; + Int64 delta = 0; + bool checkLeaf = false; + while (!isLeaf(node)) + { + InternPtr intern = as(Intern, node); + size_t i = 0; + for (; i < intern->count - 1; ++i) + { + delta += intern->deltas[i]; + bool less; + bool equal; + if constexpr (isRid) + { + less = id < (intern->rid(i, delta)); + equal = id == (intern->rid(i, delta)); + } + else + { + less = id < intern->sid(i); + equal = id == intern->sid(i); + } + if (less || equal) + { + delta -= intern->deltas[i]; + if (equal) + { + checkLeaf = true; + } + break; + } + } + node = intern->children[i]; + } + LeafPtr leaf = as(Leaf, node); + if (checkLeaf) + { + // We can't simply call leaf->exists(id, delta) because of bug (likely) of C++ compliler, both Clang(6.0.0) and GCC. + // They think '<' is an operator and complain like: + // invalid operands of types '' and 'bool' to binary 'operator<' + + bool exists; + if constexpr (isRid) + exists = leaf->existsRid(id, delta); + else + exists = leaf->existsSid(id); + if (!exists) + { + delta += leaf->getDelta(); + leaf = leaf->next; + } + } + return {leaf, delta}; +} + +DT_TEMPLATE +template +typename DT_CLASS::InterAndSid DT_CLASS::submitMinSid(T * node, UInt64 subtree_min_sid) +{ + if (!node) + return {}; + + auto parent = node->parent; + if (!parent) + return {}; + + if constexpr (std::is_same::value) + subtree_min_sid = as(Leaf, node)->sids[0]; + + auto pos = parent->searchChild(asNode(node)); + if (pos != 0) + { + parent->sids[pos - 1] = subtree_min_sid; + return {}; + } + else + { + return {parent, subtree_min_sid}; + } +} + +DT_TEMPLATE +template +typename DT_CLASS::InternPtr DT_CLASS::afterNodeUpdated(T * node) +{ + if (!node) + return {}; + + constexpr bool is_leaf = std::is_same::value; + + if (root == asNode(node) && !isLeaf(root) && node->count == 1) + { + /// Decrease tree height. + root = as(Intern, root)->children[0]; + + --(node->count); + freeNode(node); + + if (isLeaf(root)) + as(Leaf, root)->parent = nullptr; + else + as(Intern, root)->parent = nullptr; + --height; + + LOG_TRACE(log, "height " + toString(height + 1) + " -> " + toString(height)); + + return {}; + } + + auto parent = node->parent; + bool parent_updated = false; + + if (T::overflow(node->count)) // split + { + if (!parent) + { + /// Increase tree height. + parent = createNode(); + root = asNode(parent); + + parent->deltas[0] = node->getDelta(); + parent->children[0] = asNode(node); + ++(parent->count); + parent->refreshChildParent(); + + ++height; + + LOG_TRACE(log, "height " + toString(height - 1) + " -> " + toString(height)); + } + + auto pos = parent->searchChild(asNode(node)); + + T * next_n = createNode(); + + UInt64 sep_sid = node->split(next_n); + + // handle parent update + parent->shiftEntries(pos + 1, 1); + // for current node + parent->deltas[pos] = node->getDelta(); + // for next node + parent->sids[pos] = sep_sid; + parent->deltas[pos + 1] = next_n->getDelta(); + parent->children[pos + 1] = asNode(next_n); + + ++(parent->count); + + if constexpr (is_leaf) + { + if (as(Leaf, node) == right_leaf) + right_leaf = as(Leaf, next_n); + } + + parent_updated = true; + + // LOG_TRACE(log, nodeName(node) << " split"); + } + else if (T::underflow(node->count) && root != asNode(node)) // adopt or merge + { + auto pos = parent->searchChild(asNode(node)); + + // currently we always adopt from the right one if possible + bool is_sibling_left; + size_t sibling_pos; + T * sibling; + + if (unlikely(parent->count <= 1)) + throw Exception("Unexpected parent entry count: " + toString(parent->count)); + + if (pos == parent->count - 1) + { + is_sibling_left = true; + sibling_pos = pos - 1; + sibling = as(T, parent->children[sibling_pos]); + } + else + { + is_sibling_left = false; + sibling_pos = pos + 1; + sibling = as(T, parent->children[sibling_pos]); + } + + auto after_adopt = (node->count + sibling->count) / 2; + if (T::underflow(after_adopt)) + { + // Do merge. + // adoption won't work because the sibling doesn't have enough entries. + + node->merge(sibling, is_sibling_left, pos); + freeNode(sibling); + + pos = std::min(pos, sibling_pos); + parent->deltas[pos] = node->getDelta(); + parent->children[pos] = asNode(node); + parent->shiftEntries(pos + 2, -1); + + if constexpr (is_leaf) + { + if (is_sibling_left && (as(Leaf, sibling) == left_leaf)) + left_leaf = as(Leaf, node); + else if (!is_sibling_left && as(Leaf, sibling) == right_leaf) + right_leaf = as(Leaf, node); + } + --(parent->count); + + // LOG_TRACE(log, nodeName(node) << " merge"); + } + else + { + // Do adoption. + + auto adopt_count = after_adopt - node->count; + auto new_sep_sid = node->adopt(sibling, is_sibling_left, adopt_count, pos); + + parent->sids[std::min(pos, sibling_pos)] = new_sep_sid; + parent->deltas[pos] = node->getDelta(); + parent->deltas[sibling_pos] = sibling->getDelta(); + + // LOG_TRACE(log, nodeName(node) << " adoption"); + } + + parent_updated = true; + } + else if (parent) + { + auto pos = parent->searchChild(asNode(node)); + auto delta = node->getDelta(); + parent_updated = parent->deltas[pos] != delta; + parent->deltas[pos] = delta; + } + + if (parent_updated) + return parent; + else + return {}; +} + +#undef as +#undef asNode +#undef isLeaf +#undef nodeName + +#undef DT_TEMPLATE +#undef DT_CLASS + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Tablet.h b/dbms/src/Storages/DeltaMerge/Tablet.h new file mode 100644 index 00000000000..29e343f9b6b --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Tablet.h @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace DB +{ +class Tablet +{ + + + +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Tuple.h b/dbms/src/Storages/DeltaMerge/Tuple.h new file mode 100644 index 00000000000..eb09da9e262 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Tuple.h @@ -0,0 +1,36 @@ +#pragma once + +#include + +#include +#include + +namespace DB +{ + +struct ColumnAndValue +{ + UInt16 column; + UInt64 value; + + ColumnAndValue(UInt16 column_, UInt64 value_) : column(column_), value(value_) {} +}; + +using ColumnAndValues = std::vector; + +/// A tuple referenced to columns. +struct RefTuple +{ + ColumnAndValues values; + + RefTuple(UInt16 column, UInt64 value) : values{ColumnAndValue(column, value)} {} + + RefTuple(const ColumnAndValues & values_) : values(values_) + { + std::sort(values.begin(), values.end(), [](const ColumnAndValue & a, const ColumnAndValue & b) { return a.column < b.column; }); + } +}; + +using RefTuples = std::vector; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ValueSpace.h b/dbms/src/Storages/DeltaMerge/ValueSpace.h new file mode 100644 index 00000000000..c7d2cacc80e --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ValueSpace.h @@ -0,0 +1,271 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +#include +#include + +namespace DB +{ + +/// Store the tuple values of delta tree. +/// Garbage collection is poorly supported. +class MemoryValueSpace +{ + using ValueSpace = MemoryValueSpace; + +public: + MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc); + + ~MemoryValueSpace() + { + LOG_TRACE(log, "MM free"); + } + + Ids addFromInsert(const Block & block); + RefTuples addFromModify(const Block & block); + + void removeFromInsert(UInt64 id); + void removeFromModify(UInt64 id, size_t column_id); + + UInt64 withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple); + + void gc(); + + struct SplitColumn + { + static constexpr size_t SPLIT_SIZE = 65536; + + using DeleteMark = std::vector; + using DeleteMarks = std::vector; + + String name; + DataTypePtr type; + MutableColumns columns; + DeleteMarks delete_marks; + + SplitColumn(const String & name_, const DataTypePtr type_) : name(name_), type(type_) + { + columns.push_back(type->createColumn()); + delete_marks.emplace_back(); + } + + void append(const IColumn & src) + { + size_t remaining = src.size(); + while (remaining) + { + auto & last = columns.back(); + auto & last_mark = delete_marks.back(); + + auto append = std::min(SPLIT_SIZE - last->size(), remaining); + + last->insertRangeFrom(src, src.size() - remaining, append); + for (size_t i = 0; i < append; ++i) + last_mark.push_back(false); + + remaining -= append; + + if (last->size() >= SPLIT_SIZE) + { + columns.push_back(last->cloneEmpty()); + delete_marks.emplace_back(); + } + } + } + + void append(const IColumn & src, size_t n) + { + auto & last = columns.back(); + auto & last_mark = delete_marks.back(); + + last->insertFrom(src, n); + last_mark.push_back(false); + + if (last->size() >= SPLIT_SIZE) + { + columns.push_back(last->cloneEmpty()); + delete_marks.emplace_back(); + } + } + + void append(const SplitColumn & split_column, size_t id) + { + const auto & column = split_column.columns[id / SPLIT_SIZE]; + append(*column, id % SPLIT_SIZE); + } + + void remove(UInt64 id) + { + auto & mark = delete_marks[id / SPLIT_SIZE]; + mark[id % SPLIT_SIZE] = true; + } + + void gc() + { + /// Don't gc the last one + for (size_t i = 0; i < delete_marks.size() - 1; ++i) + { + auto & mark = delete_marks[i]; + if (!mark.size()) + continue; + bool has_deleted = true; + for (bool v : mark) + has_deleted &= v; + if (has_deleted) + { + DeleteMark empty_mark; + mark.swap(empty_mark); + columns[i] = type->createColumn(); + } + } + } + + IColumn & chunk(UInt64 id) const { return *(columns[id / SPLIT_SIZE]); } + UInt64 offsetInChunk(UInt64 id) const { return id % SPLIT_SIZE; } + + size_t size() const { return (columns.size() - 1) * SPLIT_SIZE + columns.back()->size(); } + }; + + const SplitColumn & columnAt(size_t column_id) const { return split_columns[column_id]; } + const NamesAndTypes & namesAndTypes() const { return names_and_types; } + +private: + NamesAndTypes names_and_types; + std::vector split_columns; + std::set sort_column_names; + size_t num_columns; + + Logger * log; +}; + +MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc) + : log(&Logger::get("MemoryValueSpace(" + name + ")")) +{ + + for (const auto & nt : name_types) + { + names_and_types.emplace_back(nt.name, nt.type); + split_columns.emplace_back(nt.name, nt.type); + } + for (const auto & desc : sort_desc) + sort_column_names.insert(desc.column_name); + + num_columns = split_columns.size(); + + LOG_TRACE(log, "MM create"); +} + +Ids MemoryValueSpace::addFromInsert(const Block & block) +{ + if (unlikely(block.columns() < split_columns.size())) + throw Exception("Not enough columns!"); + + Ids ids; + UInt64 id = split_columns.front().size(); + for (size_t i = 0; i < block.rows(); ++i) + ids.push_back(id++); + for (auto & split_column : split_columns) + split_column.append(*block.getByName(split_column.name).column); + + return ids; +} + +RefTuples MemoryValueSpace::addFromModify(const Block & block) +{ + if (unlikely(block.columns() < split_columns.size())) + throw Exception("Not enough columns!"); + + RefTuples tuples; + auto rows = block.rows(); + + std::vector> idmap; + for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) + { + auto & split_column = split_columns[column_id]; + auto offset = split_column.size(); + if (sort_column_names.find(split_column.name) == sort_column_names.end() && block.has(split_column.name)) + { + split_column.append(*block.getByName(split_column.name).column); + + idmap.emplace_back(); + auto & ids = idmap[column_id]; + for (size_t row_id = 0; row_id < rows; ++row_id) + ids.push_back(offset++); + } + else + { + idmap.emplace_back(rows, INVALID_ID); + } + } + + for (size_t row_id = 0; row_id < rows; ++row_id) + { + ColumnAndValues values; + for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) + { + auto value_id = idmap[column_id][row_id]; + if (value_id != INVALID_ID) + values.emplace_back(column_id, value_id); + } + tuples.emplace_back(values); + } + + return tuples; +} + +void MemoryValueSpace::removeFromInsert(UInt64 id) +{ + for (size_t i = 0; i < num_columns; ++i) + split_columns[i].remove(id); +} + +void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id) +{ + split_columns[column_id].remove(id); +} + +UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple) +{ + // TODO improvement: in-place update for fixed size type, like numbers. + for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) + { + auto & split_column = split_columns[column_id]; + size_t new_value_id = INVALID_ID; + for (const auto & cv : tuple.values) + { + if (cv.column == column_id) + { + new_value_id = cv.value; + break; + } + } + if (new_value_id != INVALID_ID) + { + split_column.append(modify_value_space.split_columns[column_id], new_value_id); + } + else + { + split_column.append(this->split_columns[column_id], old_tuple_id); + } + } + + removeFromInsert(old_tuple_id); + + return split_columns.front().size() - 1; +} + +void MemoryValueSpace::gc() +{ + for (auto & sc : split_columns) + sc.gc(); +} + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt new file mode 100644 index 00000000000..7e9af1fcb2b --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt @@ -0,0 +1,4 @@ +include_directories (${CMAKE_CURRENT_BINARY_DIR}) + +add_executable (delta_tree delta_tree.cpp) +target_link_libraries (delta_tree dbms) diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp new file mode 100644 index 00000000000..7d38312f27c --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp @@ -0,0 +1,192 @@ +#include +#include + +#include +#include + +#include +#include + +#define psizeof(M) std::cout << "sizeof(" #M "): " << sizeof(M) << std::endl +#define print(M) std::cout << "" #M ": " << M << std::endl + +using namespace DB; + + +class FakeValueSpace; +using MyDeltaTree = DeltaTree; + +class FakeValueSpace +{ + using ValueSpace = FakeValueSpace; + +public: + void removeFromInsert(UInt64 id) + { + // + std::cout << "remove : " << id << std::endl; + } + + void removeFromModify(UInt64 id, size_t column_id) + { + // + std::cout << "remove : " << id << ", column:" << column_id << std::endl; + } + + UInt64 withModify(UInt64 old_tuple_id, const ValueSpace & /*modify_value_space*/, const RefTuple & tuple) + { + std::cout << "withModify, old_tuple_id:" << old_tuple_id << ", modifies:["; + for (const auto & m : tuple.values) + { + std::cout << m.column << ","; + } + std::cout << "]" << std::endl; + return old_tuple_id; + } +}; + +using FakeValueSpacePtr = std::shared_ptr; + +void print_sizes() +{ + psizeof(MyDeltaTree::Leaf); + psizeof(MyDeltaTree::Intern); + + psizeof(DefaultDeltaTree::Leaf); + psizeof(DefaultDeltaTree::Intern); +} + +void printTree(MyDeltaTree & tree) +{ + print(tree.getHeight()); + for (auto it = tree.begin(), end = tree.end(); it != end; ++it) + { + std::cout << "(" << it.getRid() << "|" << it.getSid() << "|" << DTTypeString(it.getMutation().type) << "|" + << DB::toString(it.getMutation().value) << "),"; + } + std::cout << std::endl; +} + +void insertTest(MyDeltaTree & tree) +{ + for (int i = 0; i < 100; ++i) + { + tree.addInsert(i, i); + tree.checkAll(); + } + + std::cout << "a====\n"; + printTree(tree); + + for (int i = 0; i < 100; ++i) + { + tree.addDelete(0); + tree.checkAll(); + } + + std::cout << "b====\n"; + printTree(tree); + + for (int i = 0; i < 100; ++i) + { + tree.addInsert(i, i); + tree.checkAll(); + } + + std::cout << "1111====\n"; + printTree(tree); + + for (int i = 0; i < 100; ++i) + { + tree.addDelete(0); + tree.checkAll(); + } + + std::cout << "c====\n"; + printTree(tree); + + tree.addModify(8, 0, 8); + tree.addModify(8, 1, 8); + tree.addModify(8, 0, 8); + + std::cout << "d====\n"; + printTree(tree); + + tree.addModify(97, 0, 97); + tree.addModify(97, 0, 97); + tree.addModify(97, 8, 97); + tree.addModify(97, 1, 97); + + std::cout << "e====\n"; + printTree(tree); + + for (int i = 5; i <= 8; ++i) + { + tree.addDelete(i); + tree.checkAll(); + } + + std::cout << "f====\n"; + printTree(tree); + + for (int i = 5; i <= 8; ++i) + { + tree.addModify(i, 1, i); + tree.checkAll(); + } + + std::cout << "g====\n"; + printTree(tree); + + tree.addDelete(30); + tree.addDelete(30); + tree.addDelete(30); + tree.addDelete(63); + tree.addDelete(64); + tree.addDelete(64); + tree.addDelete(30); + tree.addDelete(30); + tree.addDelete(0); + tree.addDelete(0); + tree.addDelete(0); + tree.addDelete(0); + tree.addDelete(1); + tree.addDelete(64); + + std::cout << "g111====\n"; + printTree(tree); + + for (int i = 0; i < 5; ++i) + { + tree.addInsert(i, i); + } + + std::cout << "h====\n"; + printTree(tree); +} + +int main(int, char **) +{ + print_sizes(); + FakeValueSpacePtr insert_vs = std::make_shared(); + FakeValueSpacePtr modify_vs = std::make_shared(); + MyDeltaTree delta_tree(insert_vs, modify_vs); + try + { + insertTest(delta_tree); + } + catch (const DB::Exception & ex) + { + std::cout << "Caught exception " << ex.displayText() << "\n"; + } + catch (const std::exception & ex) + { + std::cout << "Caught exception " << ex.what() << "\n"; + } + catch (...) + { + std::cout << "Caught unhandled exception\n"; + } + + return 0; +} diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 579822f88e3..d8b5892ba97 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -103,6 +103,9 @@ class IStorage : public std::enable_shared_from_this, private boost::n /** Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas. */ virtual bool supportsReplication() const { return false; } + /** Returns true if the storage supports UPSERT, DELETE or UPDATE. */ + virtual bool supportsModification() const { return false; } + /** Does not allow you to change the structure or name of the table. * If you change the data in the table, you will need to specify will_modify_data = true. * This will take an extra lock that does not allow starting ALTER MODIFY. diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp new file mode 100644 index 00000000000..1af16707c9f --- /dev/null +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -0,0 +1,273 @@ +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +StorageDeltaMerge::StorageDeltaMerge(const std::string & path_, + const std::string & name_, + const ColumnsDescription & columns_, + const ASTPtr & primary_expr_ast_, + bool attach, + size_t max_compress_block_size_) + : IStorage{columns_} + , path(path_) + , name(name_) + , max_compress_block_size(max_compress_block_size_) + , stable_storage(StorageTinyLog::create(path_, name_, columns_, attach, max_compress_block_size_)) + , log(&Logger::get("StorageDeltaMerge")) +{ + primary_sort_descr.reserve(primary_expr_ast_->children.size()); + for (const ASTPtr & ast : primary_expr_ast_->children) + { + String name = ast->getColumnName(); + primary_sort_descr.emplace_back(name, 1, 1); + } + + initDelta(); +} + +void StorageDeltaMerge::initDelta() +{ + insert_value_space = std::make_shared(name, stable_storage->getColumns().getAllPhysical(), primary_sort_descr); + modify_value_space = std::make_shared(name, stable_storage->getColumns().getAllPhysical(), primary_sort_descr); + delta_tree = std::make_shared(insert_value_space, modify_value_space); + + // Force tcmalloc to return memory back to system. + // https://internal.pingcap.net/jira/browse/FLASH-41 + MallocExtension::instance()->ReleaseFreeMemory(); +} + +BlockInputStreams StorageDeltaMerge::read( // + const Names & column_names, + const SelectQueryInfo & /*query_info*/, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned /*num_streams*/) +{ + return read(column_names, processed_stage, max_block_size, context.getSettingsRef().max_read_buffer_size); +} + +BlockInputStreams StorageDeltaMerge::read( // + const Names & column_names, + QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, + const size_t max_read_buffer_size) +{ + auto stable_input_stream_ptr + = static_cast(*stable_storage).read(column_names, processed_stage, max_block_size, max_read_buffer_size).at(0); + auto delta_merge_stream_ptr = std::make_shared(stable_input_stream_ptr, delta_tree, max_block_size); + return {delta_merge_stream_ptr}; +} + +BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settings & settings) +{ + const ASTInsertQuery * insert_query = typeid_cast(&*query); + const ASTDeleteQuery * delete_query = typeid_cast(&*query); + Action action; + if (insert_query) + action = insert_query->is_upsert ? Action::Upsert : Action::Insert; + else if (delete_query) + action = Action::Delete; + else + throw Exception("Should be insert or delete query."); + + Names primary_keys; + for (const auto & s : primary_sort_descr) + primary_keys.push_back(s.column_name); + auto max_block_size = settings.max_block_size; + + DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [this, primary_keys, max_block_size]() { + auto stage = QueryProcessingStage::Enum::FetchColumns; + return this->read(primary_keys, stage, max_block_size, DBMS_DEFAULT_BUFFER_SIZE).at(0); + }; + DeltaMergeBlockOutputStream::Flusher flusher = [this]() { this->flushDelta(); }; + + return std::make_shared(input_stream_creator, delta_tree, action, primary_sort_descr, flusher, settings.delta_merge_size); +} + +void StorageDeltaMerge::flushDelta() +{ + // We deal with it later. + // auto lock = lockForAlter(__PRETTY_FUNCTION__); + + auto new_stable_storage = StorageTinyLog::create(path, name + "_flush_", getColumns(), false, max_compress_block_size); + auto ouput = new_stable_storage->write({}, {}); + auto header = ouput->getHeader(); + auto stage = QueryProcessingStage::Enum::FetchColumns; + auto input = this->read(header.getNames(), stage, DEFAULT_BLOCK_SIZE, DBMS_DEFAULT_BUFFER_SIZE).at(0); + + copyData(*input, *ouput, []() { return false; }); + + Poco::File(path + escapeForFileName(name)).remove(true); + + stable_storage = new_stable_storage; + stable_storage->rename(path, "", name); + initDelta(); +} + +UInt64 StorageDeltaMerge::stable_size() +{ + auto stage = QueryProcessingStage::Enum::FetchColumns; + /// TODO: get minimum column + auto stable_input_stream_ptr = static_cast(*stable_storage) + .read({getColumns().getNamesOfPhysical().at(0)}, stage, DEFAULT_BLOCK_SIZE, DBMS_DEFAULT_BUFFER_SIZE) + .at(0); + stable_input_stream_ptr->readPrefix(); + size_t stable_tuples = 0; + Block input_block = stable_input_stream_ptr->read(); + while (input_block) + { + stable_tuples += input_block.rows(); + input_block = stable_input_stream_ptr->read(); + } + stable_input_stream_ptr->readSuffix(); + return stable_tuples; +} + +std::tuple StorageDeltaMerge::delta_status() +{ + UInt64 entries = 0; + UInt64 inserts = 0; + UInt64 deletes = 0; + UInt64 modifies = 0; + auto it = delta_tree->begin(); + auto end_it = delta_tree->end(); + for (; it != end_it; ++it) + { + ++entries; + if (it.getType() == DT_INS) + ++inserts; + else if (it.getType() == DT_DEL) + deletes += it.getValue(); + else + ++modifies; + } + return {entries, inserts, deletes, modifies}; +} + +BlockInputStreamPtr StorageDeltaMerge::status() +{ + Block block; + + block.insert({std::make_shared(), "stable_tuples"}); + block.insert({std::make_shared(), "delta_entries"}); + block.insert({std::make_shared(), "delta_inserts"}); + block.insert({std::make_shared(), "delta_deletes"}); + block.insert({std::make_shared(), "delta_updates"}); + + auto columns = block.mutateColumns(); + auto dt_status = delta_status(); + columns[0]->insert(stable_size()); + columns[1]->insert(std::get<0>(dt_status)); + columns[2]->insert(std::get<1>(dt_status)); + columns[3]->insert(std::get<2>(dt_status)); + columns[4]->insert(std::get<3>(dt_status)); + + return std::make_shared(block); +} + +void StorageDeltaMerge::check() +{ + delta_tree->checkAll(); + + auto entry_it = delta_tree->begin(); + auto entry_end = delta_tree->end(); + if (entry_it == entry_end || entry_it.getType() != DT_INS) + return; + + Ids vs_column_offsets; + const auto & names_and_types = insert_value_space->namesAndTypes(); + for (const auto & c : primary_sort_descr) + { + for (size_t i = 0; i < names_and_types.size(); ++i) + { + if (c.column_name == names_and_types[i].name) + vs_column_offsets.push_back(i); + } + } + + auto last_entry = entry_it; + ++entry_it; + + for (; entry_it != entry_end; ++entry_it) + { + if (entry_it.getType() != DT_INS) + continue; + + auto res = compareTuple( + insert_value_space, entry_it.getValue(), insert_value_space, last_entry.getValue(), primary_sort_descr, vs_column_offsets); + if (res <= 0) + throw Exception("Algorithm broken: tuples should be ordered"); + last_entry = entry_it; + } +} + +static ASTPtr extractKeyExpressionList(IAST & node) +{ + const ASTFunction * expr_func = typeid_cast(&node); + + if (expr_func && expr_func->name == "tuple") + { + /// Primary key is specified in tuple. + return expr_func->children.at(0); + } + else + { + /// Primary key consists of one column. + auto res = std::make_shared(); + res->children.push_back(node.ptr()); + return res; + } +} + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void registerStorageDeltaMerge(StorageFactory & factory) +{ + factory.registerStorage("DeltaMerge", [](const StorageFactory::Arguments & args) { + if (args.engine_args.size() > 1) + throw Exception("Engine DeltaMerge expects only one parameter. e.g. engine = DeltaMerge((a, b))"); + if (args.engine_args.size() < 1) + throw Exception( + "Engine DeltaMerge needs primary key. e.g. engine = DeltaMerge((a, b))", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + ASTPtr primary_expr_list = extractKeyExpressionList(*args.engine_args[0]); + return StorageDeltaMerge::create(args.data_path, + args.table_name, + args.columns, + primary_expr_list, + args.attach, + args.context.getSettings().max_compress_block_size); + }); +} + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h new file mode 100644 index 00000000000..b559280d92e --- /dev/null +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -0,0 +1,100 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +class StorageDeltaMerge : public ext::shared_ptr_helper, public IStorage +{ +public: + bool supportsModification() const override + { + return true; + } + + String getName() const override + { + return "DeltaMerge"; + } + String getTableName() const override + { + return name; + } + + bool checkData() const override + { + return stable_storage->checkData(); + }; + + String getDataPath() const override + { + return stable_storage->getDataPath(); + } + + void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override + { + name = new_table_name; + stable_storage->rename(new_path_to_db, new_database_name, new_table_name); + } + + BlockInputStreams read(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned num_streams) override; + + BlockInputStreams read(const Names & column_names, + QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, + const size_t max_read_buffer_size); + + BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; + + UInt64 stable_size(); + + /// entries, inserts, deletes, modifies + std::tuple delta_status(); + + void flushDelta(); + + BlockInputStreamPtr status(); + + void initDelta(); + + void check(); + + StorageDeltaMerge(const std::string & path_, + const std::string & name_, + const ColumnsDescription & columns_, + const ASTPtr & primary_expr_ast_, + bool attach, + size_t max_compress_block_size_); + +private: + String path; + String name; + size_t max_compress_block_size; + + StoragePtr stable_storage; + SortDescription primary_sort_descr; + + DeltaTreePtr delta_tree; + ValueSpacePtr insert_value_space; + ValueSpacePtr modify_value_space; + + Logger * log; +}; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 6cdfe9ea03d..e93792da9a6 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -42,6 +42,7 @@ class StorageMergeTree : public ext::shared_ptr_helper, public bool supportsPrewhere() const override { return data.supportsPrewhere(); } bool supportsFinal() const override { return data.supportsFinal(); } bool supportsIndexForIn() const override { return true; } + bool supportsModification() const override { return data.merging_params.mode == MergeTreeData::MergingParams::Mode::Mutable || data.merging_params.mode == MergeTreeData::MergingParams::Mode::Txn; } bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const override { return data.mayBenefitFromIndexForIn(left_in_operand); } const ColumnsDescription & getColumns() const override { return data.getColumns(); } diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index cb62c911e63..09d2f091c9e 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -171,7 +171,14 @@ Block TinyLogBlockInputStream::readImpl() { /// if there are no files in the folder, it means that the table is empty - if (Poco::DirectoryIterator(storage.full_path()) == Poco::DirectoryIterator()) + auto it = Poco::DirectoryIterator(storage.full_path()); + size_t count = 0; + for (;it != Poco::DirectoryIterator(); ++it) + { + if (it.name() != "sizes.json") + ++ count; + } + if (!count) return res; } @@ -336,7 +343,6 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & /*new_ it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName()); } - BlockInputStreams StorageTinyLog::read( const Names & column_names, const SelectQueryInfo & /*query_info*/, @@ -344,14 +350,22 @@ BlockInputStreams StorageTinyLog::read( QueryProcessingStage::Enum & processed_stage, const size_t max_block_size, const unsigned /*num_streams*/) +{ + return read(column_names, processed_stage, max_block_size, context.getSettingsRef().max_read_buffer_size); +} + +BlockInputStreams StorageTinyLog::read( + const Names & column_names, + QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, + const size_t max_read_buffer_size) { check(column_names); processed_stage = QueryProcessingStage::FetchColumns; - return BlockInputStreams(1, std::make_shared( - max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size)); + return {std::make_shared( + max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, max_read_buffer_size)}; } - BlockOutputStreamPtr StorageTinyLog::write( const ASTPtr & /*query*/, const Settings & /*settings*/) { diff --git a/dbms/src/Storages/StorageTinyLog.h b/dbms/src/Storages/StorageTinyLog.h index acaf7944f30..bd5a2924f40 100644 --- a/dbms/src/Storages/StorageTinyLog.h +++ b/dbms/src/Storages/StorageTinyLog.h @@ -35,6 +35,12 @@ friend class TinyLogBlockOutputStream; size_t max_block_size, unsigned num_streams) override; + BlockInputStreams read( + const Names & column_names, + QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, + const size_t max_read_buffer_size); + BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; diff --git a/dbms/src/Storages/registerStorages.cpp b/dbms/src/Storages/registerStorages.cpp index 6f140d92562..42563a29491 100644 --- a/dbms/src/Storages/registerStorages.cpp +++ b/dbms/src/Storages/registerStorages.cpp @@ -9,6 +9,7 @@ namespace DB void registerStorageLog(StorageFactory & factory); void registerStorageTinyLog(StorageFactory & factory); +void registerStorageDeltaMerge(StorageFactory & factory); void registerStorageStripeLog(StorageFactory & factory); void registerStorageMergeTree(StorageFactory & factory); void registerStorageNull(StorageFactory & factory);