From c268a8e74142e1b6755f7126508d9944e20b174e Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Thu, 4 Apr 2019 16:19:11 +0800 Subject: [PATCH] add delta merge engine test (#18) --- .../DeltaMerge/DeltaMergeBlockInputStream.h | 2 +- .../DeltaMerge/DeltaMergeBlockOutputStream.h | 3 + dbms/src/Storages/DeltaMerge/ValueSpace.cpp | 129 +++ dbms/src/Storages/DeltaMerge/ValueSpace.h | 127 +-- .../Storages/DeltaMerge/tests/CMakeLists.txt | 4 + .../tests/delta_merge_block_stream.cpp | 845 ++++++++++++++++++ .../DeltaMerge/tests/delta_merge_storage.cpp | 119 +++ .../Storages/DeltaMerge/tests/delta_tree.cpp | 277 +++++- 8 files changed, 1374 insertions(+), 132 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/ValueSpace.cpp create mode 100644 dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp create mode 100644 dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h index 898edd5bcfd..a255974963f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h @@ -223,7 +223,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream for (size_t column_id = 0; column_id < num_columns; ++column_id) { auto offset = vs_column_offsets[column_id]; - if (has_modifies[offset] != INVALID_ID) + if (has_modifies[offset] == INVALID_ID) { output_columns[column_id]->insertFrom(*stable_block_columns[column_id], stable_block_pos); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h index 220d8ca13ad..1c18f86e2b7 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h @@ -247,7 +247,10 @@ struct RidGenerator if (modify_block_dup_next[modify_block_pos++]) return rid; else + { + ++stable_block_pos; return rid++; + } } else if (res > 0) { diff --git a/dbms/src/Storages/DeltaMerge/ValueSpace.cpp b/dbms/src/Storages/DeltaMerge/ValueSpace.cpp new file mode 100644 index 00000000000..4806dd80cf4 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ValueSpace.cpp @@ -0,0 +1,129 @@ +#include + +#include + +namespace DB +{ +MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc) + : log(&Logger::get("MemoryValueSpace(" + name + ")")) +{ + + for (const auto & nt : name_types) + { + names_and_types.emplace_back(nt.name, nt.type); + split_columns.emplace_back(nt.name, nt.type); + } + for (const auto & desc : sort_desc) + sort_column_names.insert(desc.column_name); + + num_columns = split_columns.size(); + + LOG_TRACE(log, "MM create"); +} + +Ids MemoryValueSpace::addFromInsert(const Block & block) +{ + if (unlikely(block.columns() < split_columns.size())) + throw Exception("Not enough columns!"); + + Ids ids; + UInt64 id = split_columns.front().size(); + for (size_t i = 0; i < block.rows(); ++i) + ids.push_back(id++); + for (auto & split_column : split_columns) + split_column.append(*block.getByName(split_column.name).column); + + return ids; +} + +RefTuples MemoryValueSpace::addFromModify(const Block & block) +{ + if (unlikely(block.columns() < split_columns.size())) + throw Exception("Not enough columns!"); + + RefTuples tuples; + auto rows = block.rows(); + + std::vector> idmap; + for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) + { + auto & split_column = split_columns[column_id]; + auto offset = split_column.size(); + if (sort_column_names.find(split_column.name) == sort_column_names.end() && block.has(split_column.name)) + { + split_column.append(*block.getByName(split_column.name).column); + + idmap.emplace_back(); + auto & ids = idmap[column_id]; + for (size_t row_id = 0; row_id < rows; ++row_id) + ids.push_back(offset++); + } + else + { + idmap.emplace_back(rows, INVALID_ID); + } + } + + for (size_t row_id = 0; row_id < rows; ++row_id) + { + ColumnAndValues values; + for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) + { + auto value_id = idmap[column_id][row_id]; + if (value_id != INVALID_ID) + values.emplace_back(column_id, value_id); + } + tuples.emplace_back(values); + } + + return tuples; +} + +void MemoryValueSpace::removeFromInsert(UInt64 id) +{ + for (size_t i = 0; i < num_columns; ++i) + split_columns[i].remove(id); +} + +void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id) +{ + split_columns[column_id].remove(id); +} + +UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple) +{ + // TODO improvement: in-place update for fixed size type, like numbers. + for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) + { + auto & split_column = split_columns[column_id]; + size_t new_value_id = INVALID_ID; + for (const auto & cv : tuple.values) + { + if (cv.column == column_id) + { + new_value_id = cv.value; + break; + } + } + if (new_value_id != INVALID_ID) + { + split_column.append(modify_value_space.split_columns[column_id], new_value_id); + } + else + { + split_column.append(this->split_columns[column_id], old_tuple_id); + } + } + + removeFromInsert(old_tuple_id); + + return split_columns.front().size() - 1; +} + +void MemoryValueSpace::gc() +{ + for (auto & sc : split_columns) + sc.gc(); +} + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ValueSpace.h b/dbms/src/Storages/DeltaMerge/ValueSpace.h index c7d2cacc80e..8965e8a405d 100644 --- a/dbms/src/Storages/DeltaMerge/ValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/ValueSpace.h @@ -24,10 +24,7 @@ class MemoryValueSpace public: MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc); - ~MemoryValueSpace() - { - LOG_TRACE(log, "MM free"); - } + ~MemoryValueSpace() { LOG_TRACE(log, "MM free"); } Ids addFromInsert(const Block & block); RefTuples addFromModify(const Block & block); @@ -146,126 +143,4 @@ class MemoryValueSpace Logger * log; }; -MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc) - : log(&Logger::get("MemoryValueSpace(" + name + ")")) -{ - - for (const auto & nt : name_types) - { - names_and_types.emplace_back(nt.name, nt.type); - split_columns.emplace_back(nt.name, nt.type); - } - for (const auto & desc : sort_desc) - sort_column_names.insert(desc.column_name); - - num_columns = split_columns.size(); - - LOG_TRACE(log, "MM create"); -} - -Ids MemoryValueSpace::addFromInsert(const Block & block) -{ - if (unlikely(block.columns() < split_columns.size())) - throw Exception("Not enough columns!"); - - Ids ids; - UInt64 id = split_columns.front().size(); - for (size_t i = 0; i < block.rows(); ++i) - ids.push_back(id++); - for (auto & split_column : split_columns) - split_column.append(*block.getByName(split_column.name).column); - - return ids; -} - -RefTuples MemoryValueSpace::addFromModify(const Block & block) -{ - if (unlikely(block.columns() < split_columns.size())) - throw Exception("Not enough columns!"); - - RefTuples tuples; - auto rows = block.rows(); - - std::vector> idmap; - for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) - { - auto & split_column = split_columns[column_id]; - auto offset = split_column.size(); - if (sort_column_names.find(split_column.name) == sort_column_names.end() && block.has(split_column.name)) - { - split_column.append(*block.getByName(split_column.name).column); - - idmap.emplace_back(); - auto & ids = idmap[column_id]; - for (size_t row_id = 0; row_id < rows; ++row_id) - ids.push_back(offset++); - } - else - { - idmap.emplace_back(rows, INVALID_ID); - } - } - - for (size_t row_id = 0; row_id < rows; ++row_id) - { - ColumnAndValues values; - for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) - { - auto value_id = idmap[column_id][row_id]; - if (value_id != INVALID_ID) - values.emplace_back(column_id, value_id); - } - tuples.emplace_back(values); - } - - return tuples; -} - -void MemoryValueSpace::removeFromInsert(UInt64 id) -{ - for (size_t i = 0; i < num_columns; ++i) - split_columns[i].remove(id); -} - -void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id) -{ - split_columns[column_id].remove(id); -} - -UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple) -{ - // TODO improvement: in-place update for fixed size type, like numbers. - for (size_t column_id = 0; column_id < split_columns.size(); ++column_id) - { - auto & split_column = split_columns[column_id]; - size_t new_value_id = INVALID_ID; - for (const auto & cv : tuple.values) - { - if (cv.column == column_id) - { - new_value_id = cv.value; - break; - } - } - if (new_value_id != INVALID_ID) - { - split_column.append(modify_value_space.split_columns[column_id], new_value_id); - } - else - { - split_column.append(this->split_columns[column_id], old_tuple_id); - } - } - - removeFromInsert(old_tuple_id); - - return split_columns.front().size() - 1; -} - -void MemoryValueSpace::gc() -{ - for (auto & sc : split_columns) - sc.gc(); -} - } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt index 7e9af1fcb2b..64235e768e1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt +++ b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt @@ -1,4 +1,8 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) add_executable (delta_tree delta_tree.cpp) +add_executable (delta_merge_block_stream delta_merge_block_stream.cpp) +add_executable (delta_merge_storage delta_merge_storage.cpp) target_link_libraries (delta_tree dbms) +target_link_libraries (delta_merge_block_stream dbms) +target_link_libraries (delta_merge_storage dbms) diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp new file mode 100644 index 00000000000..e1fd8546817 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp @@ -0,0 +1,845 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace DB; + +std::string treeToString(DeltaTreePtr tree) +{ + std::string result = ""; + std::string temp; + for (auto it = tree->begin(), end = tree->end(); it != end; ++it) + { + temp = ""; + temp += "("; + temp += std::to_string(it.getRid()); + temp += "|"; + temp += std::to_string(it.getSid()); + temp += "|"; + temp += DTTypeString(it.getMutation().type); + temp += "|"; + temp += DB::toString(it.getMutation().value); + temp += "),"; + result += temp; + } + return result; +} + +void deltaTreeInsertTest(Context context) +{ + Block sample; + + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(i * 2); + tempcol->insert(field); + } + + col1.column = std::move(tempcol); + + sample.insert(col1); + + Block sample2; + + ColumnWithTypeAndName col2; + col2.name = "col1"; + col2.type = std::make_shared(); + IColumn::MutablePtr tempcol2 = col2.type->createColumn(); + + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(i * 2 + 1); + tempcol2->insert(field); + } + + col2.column = std::move(tempcol2); + + sample2.insert(col2); + + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + + output->write(sample); + + output->writeSuffix(); + + QueryProcessingStage::Enum stage; + + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage, 8192, 1)[0]; + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, SortDescription{}); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, SortDescription{}); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + Ids id_vec = insert_value_space->addFromInsert(sample2); + for (unsigned int i = 0; i < id_vec.size(); i++) + { + delta_tree->addInsert(2 * i + 1, id_vec[i]); + } + + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + assert(c->getUInt(i) == i); + } + } + } + + dms.readSuffix(); +} + +void deltaTreeUpdateTest(Context context) +{ + Block sample; + + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(i * 2); + tempcol->insert(field); + } + + col1.column = std::move(tempcol); + + sample.insert(col1); + + Block sample2; + + ColumnWithTypeAndName col2; + col2.name = "col1"; + col2.type = std::make_shared(); + IColumn::MutablePtr tempcol2 = col2.type->createColumn(); + + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(i * 2 + 1); + tempcol2->insert(field); + } + + col2.column = std::move(tempcol2); + + sample2.insert(col2); + + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + + output->write(sample); + + output->writeSuffix(); + + QueryProcessingStage::Enum stage; + + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage, 8192, 1)[0]; + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, SortDescription{}); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, SortDescription{}); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + RefTuples rts = insert_value_space->addFromModify(sample2); + for (unsigned int i = 0; i < rts.size(); i++) + { + delta_tree->addModify(i, rts[i]); + } + + //std::cout << treeToString(delta_tree) << std::endl; + + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + //std::cout << c->getUInt(i) << std::endl; + assert(c->getUInt(i) == i * 2 + 1); + } + } + } + + dms.readSuffix(); +} + +void deltaTreeDeleteTest(Context context) +{ + Block sample; + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(i); + tempcol->insert(field); + } + + col1.column = std::move(tempcol); + sample.insert(col1); + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + + output->write(sample); + + output->writeSuffix(); + + QueryProcessingStage::Enum stage; + + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage, 8192, 1)[0]; + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, SortDescription{}); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, SortDescription{}); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + for (unsigned int i = 0; i < 50; i++) + { + delta_tree->addDelete(0); + } + + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + //std::cout << i+50 << std::endl; + assert(c->getUInt(i) == i + 50); + } + } + } + + dms.readSuffix(); +} + +void deltaTreeOutputInsertTest(Context context) +{ + // block write into stable storage + Block sample; + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + ColumnWithTypeAndName col2; + col2.name = "col2"; + col2.type = std::make_shared(); + IColumn::MutablePtr tempcol2 = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + tempcol2->insert(field); + } + + col1.column = std::move(tempcol); + col2.column = std::move(tempcol2); + sample.insert(col1); + sample.insert(col2); + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, {"col2", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + output->write(sample); + output->writeSuffix(); + + DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() { + auto stage = QueryProcessingStage::Enum::FetchColumns; + return storage->read(column_names, {}, context, stage, 8192, 1)[0]; + }; + + SortDescription sd{}; + sd.emplace_back("col1", 1, 1); + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, sd); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, sd); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + BlockOutputStreamPtr delta_output_stream + = std::make_shared(input_stream_creator, delta_tree, Action::Insert, sd, []() {}, 10000); + + // data write through DeltaMergeBlockOutputStream + Block sample_delta; + ColumnWithTypeAndName col1_delta; + col1_delta.name = "col1"; + col1_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol_delta = col1_delta.type->createColumn(); + ColumnWithTypeAndName col2_delta; + col2_delta.name = "col2"; + col2_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol2_delta = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i + 1); + tempcol_delta->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + tempcol2_delta->insert(field); + } + col1_delta.column = std::move(tempcol_delta); + col2_delta.column = std::move(tempcol2_delta); + sample_delta.insert(col1_delta); + sample_delta.insert(col2_delta); + + delta_output_stream->write(sample_delta); + + QueryProcessingStage::Enum stage2; + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage2, 8192, 1)[0]; + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + if (iter->name == "col1") + { + //std::cout << c->getUInt(i) << std::endl; + assert(c->getUInt(i) == i); + } + else if (iter->name == "col2") + { + //std::cout << c->getDataAt(i) << std::endl; + assert(c->getDataAt(i) == "a"); + } + } + } + } + + dms.readSuffix(); +} + +void deltaTreeOutputUpsertTest(Context context) +{ + // block write into stable storage + Block sample; + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + ColumnWithTypeAndName col2; + col2.name = "col2"; + col2.type = std::make_shared(); + IColumn::MutablePtr tempcol2 = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + tempcol2->insert(field); + } + + col1.column = std::move(tempcol); + col2.column = std::move(tempcol2); + sample.insert(col1); + sample.insert(col2); + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, {"col2", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + output->write(sample); + output->writeSuffix(); + + DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() { + auto stage = QueryProcessingStage::Enum::FetchColumns; + return storage->read(column_names, {}, context, stage, 8192, 1)[0]; + }; + + SortDescription sd{}; + sd.emplace_back("col1", 1, 1); + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, sd); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, sd); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + BlockOutputStreamPtr delta_output_stream + = std::make_shared(input_stream_creator, delta_tree, Action::Upsert, sd, []() {}, 10000); + + // data write through DeltaMergeBlockOutputStream + Block sample_delta; + ColumnWithTypeAndName col1_delta; + col1_delta.name = "col1"; + col1_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol_delta = col1_delta.type->createColumn(); + ColumnWithTypeAndName col2_delta; + col2_delta.name = "col2"; + col2_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol2_delta = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol_delta->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("b", 1); + tempcol2_delta->insert(field); + } + col1_delta.column = std::move(tempcol_delta); + col2_delta.column = std::move(tempcol2_delta); + sample_delta.insert(col1_delta); + sample_delta.insert(col2_delta); + + delta_output_stream->write(sample_delta); + + QueryProcessingStage::Enum stage2; + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage2, 8192, 1)[0]; + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + if (iter->name == "col1") + { + //std::cout << c->getUInt(i) << std::endl; + assert(c->getUInt(i) == 2 * i); + } + else if (iter->name == "col2") + { + //std::cout << c->getDataAt(i) << std::endl; + assert(c->getDataAt(i) == "b"); + } + } + } + } + + dms.readSuffix(); +} + +void deltaTreeOutputDeleteTest(Context context) +{ + // block write into stable storage + Block sample; + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + ColumnWithTypeAndName col2; + col2.name = "col2"; + col2.type = std::make_shared(); + IColumn::MutablePtr tempcol2 = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + tempcol2->insert(field); + } + + col1.column = std::move(tempcol); + col2.column = std::move(tempcol2); + sample.insert(col1); + sample.insert(col2); + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, {"col2", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + output->write(sample); + output->writeSuffix(); + + DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() { + auto stage = QueryProcessingStage::Enum::FetchColumns; + return storage->read(column_names, {}, context, stage, 8192, 1)[0]; + }; + + SortDescription sd{}; + sd.emplace_back("col1", 1, 1); + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, sd); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, sd); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + BlockOutputStreamPtr delta_output_stream + = std::make_shared(input_stream_creator, delta_tree, Action::Delete, sd, []() {}, 10000); + + // data write through DeltaMergeBlockOutputStream + Block sample_delta; + ColumnWithTypeAndName col1_delta; + col1_delta.name = "col1"; + col1_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol_delta = col1_delta.type->createColumn(); + ColumnWithTypeAndName col2_delta; + col2_delta.name = "col2"; + col2_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol2_delta = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol_delta->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + tempcol2_delta->insert(field); + } + col1_delta.column = std::move(tempcol_delta); + col2_delta.column = std::move(tempcol2_delta); + sample_delta.insert(col1_delta); + sample_delta.insert(col2_delta); + + delta_output_stream->write(sample_delta); + + QueryProcessingStage::Enum stage2; + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage2, 8192, 1)[0]; + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + if (iter->name == "col1") + { + std::cout << c->getUInt(i) << std::endl; + //assert(c->getUInt(i) == i); + } + else if (iter->name == "col2") + { + std::cout << c->getDataAt(i) << std::endl; + //assert(c->getDataAt(i) == "a"); + } + } + } + } + + dms.readSuffix(); +} + +void deltaTreeOutputUpdateTest(Context context) +{ + // block write into stable storage + Block sample; + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr tempcol = col1.type->createColumn(); + ColumnWithTypeAndName col2; + col2.name = "col2"; + col2.type = std::make_shared(); + IColumn::MutablePtr tempcol2 = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + tempcol2->insert(field); + } + + col1.column = std::move(tempcol); + col2.column = std::move(tempcol2); + sample.insert(col1); + sample.insert(col2); + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, {"col2", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + // create table + StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list}); + storage->startup(); + + BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); + + output->writePrefix(); + output->write(sample); + output->writeSuffix(); + + DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() { + auto stage = QueryProcessingStage::Enum::FetchColumns; + return storage->read(column_names, {}, context, stage, 8192, 1)[0]; + }; + + SortDescription sd{}; + sd.emplace_back("col1", 1, 1); + + ValueSpacePtr insert_value_space = std::make_shared("insert_value_space", names_and_types_list, sd); + ValueSpacePtr modify_value_space = std::make_shared("modify_value_space", names_and_types_list, sd); + DeltaTreePtr delta_tree = std::make_shared(insert_value_space, modify_value_space); + + BlockOutputStreamPtr delta_output_stream + = std::make_shared(input_stream_creator, delta_tree, Action::Update, sd, []() {}, 10000); + + // data write through DeltaMergeBlockOutputStream + Block sample_delta; + ColumnWithTypeAndName col1_delta; + col1_delta.name = "col1"; + col1_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol_delta = col1_delta.type->createColumn(); + ColumnWithTypeAndName col2_delta; + col2_delta.name = "col2"; + col2_delta.type = std::make_shared(); + IColumn::MutablePtr tempcol2_delta = col2.type->createColumn(); + + for (int i = 0; i < 100; i++) + { + Field field = UInt64(2 * i); + tempcol_delta->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("b", 1); + tempcol2_delta->insert(field); + } + col1_delta.column = std::move(tempcol_delta); + col2_delta.column = std::move(tempcol2_delta); + sample_delta.insert(col1_delta); + sample_delta.insert(col2_delta); + + delta_output_stream->write(sample_delta); + + QueryProcessingStage::Enum stage2; + BlockInputStreamPtr in = storage->read(column_names, {}, context, stage2, 8192, 1)[0]; + DeltaMergeBlockInputStream dms(in, delta_tree, 8192); + + dms.readPrefix(); + + while (Block block = dms.read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + if (iter->name == "col1") + { + std::cout << c->getUInt(i) << std::endl; + assert(c->getUInt(i) == 2 * i); + } + else if (iter->name == "col2") + { + std::cout << c->getDataAt(i) << std::endl; + //assert(c->getDataAt(i) == "b"); + } + } + } + } + + dms.readSuffix(); +} + +int main(int, char **) try +{ + Context context = Context::createGlobal(); + deltaTreeInsertTest(context); + deltaTreeUpdateTest(context); + deltaTreeDeleteTest(context); + deltaTreeOutputInsertTest(context); + std::cout << "after deltaTreeOutputInsertTest test\n"; + deltaTreeOutputUpsertTest(context); + std::cout << "after deltaTreeOutputUpsertTest test\n"; + deltaTreeOutputDeleteTest(context); + std::cout << "after deltaTreeOutputDeleteTest test\n"; + deltaTreeOutputUpdateTest(context); + std::cout << "after deltaTreeOutputUpdateTest test\n"; + std::cout << "test complete\n"; +} +catch (const DB::Exception & e) +{ + std::cerr << e.what() << ", " << e.displayText() << std::endl; + return 1; +} diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp new file mode 100644 index 00000000000..9d6db5d01d7 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp @@ -0,0 +1,119 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace DB; + +int main(int, char **) try +{ + // block write into stable storage + Block sample; + ColumnWithTypeAndName col1; + col1.name = "col1"; + col1.type = std::make_shared(); + IColumn::MutablePtr m_col = col1.type->createColumn(); + ColumnWithTypeAndName col2; + col2.name = "col2"; + col2.type = std::make_shared(); + IColumn::MutablePtr m_col2 = col2.type->createColumn(); + + // insert form large to small + for (int i = 0; i < 100; i++) + { + Field field = UInt64(99 - i); + m_col->insert(field); + } + + for (int i = 0; i < 100; i++) + { + Field field("a", 1); + m_col2->insert(field); + } + + col1.column = std::move(m_col); + col2.column = std::move(m_col2); + sample.insert(col1); + sample.insert(col2); + + NamesAndTypesList names_and_types_list{ + {"col1", std::make_shared()}, {"col2", std::make_shared()}, + }; + + DataTypes data_types; + Names column_names; + + for (const auto & name_type : names_and_types_list) + { + data_types.push_back(name_type.type); + column_names.push_back(name_type.name); + } + + ASTPtr astptr(new ASTIdentifier("mytemptable", ASTIdentifier::Kind::Table)); + astptr->children.emplace_back(new ASTIdentifier("col1")); + + ASTPtr insertptr(new ASTInsertQuery()); + + Context context = Context::createGlobal(); + + StoragePtr storage = StorageDeltaMerge::create(".", "mytemptable", ColumnsDescription{names_and_types_list}, astptr, false, 1000); + storage->startup(); + BlockOutputStreamPtr output = storage->write(insertptr, context.getSettingsRef()); + + output->writePrefix(); + output->write(sample); + output->writeSuffix(); + + QueryProcessingStage::Enum stage2; + BlockInputStreamPtr dms = storage->read(column_names, {}, context, stage2, 8192, 1)[0]; + + dms->readPrefix(); + + while (Block block = dms->read()) + { + for (auto iter = block.begin(); iter != block.end(); iter++) + { + auto c = iter->column; + for (unsigned int i = 0; i < c->size(); i++) + { + if (iter->name == "col1") + { + //std::cout << c->getUInt(i) << std::endl; + assert(c->getUInt(i) == i); + } + else if (iter->name == "col2") + { + //std::cout << c->getDataAt(i) << std::endl; + //assert(c->getDataAt(i) == "b"); + } + } + } + } + + dms->readSuffix(); + + std::cout << "test complete\n"; +} +catch (const DB::Exception & e) +{ + std::cerr << e.what() << ", " << e.displayText() << std::endl; + return 1; +} diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp index 7d38312f27c..28c98a62205 100644 --- a/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp @@ -1,9 +1,8 @@ #include #include -#include #include - +#include #include #include @@ -12,7 +11,6 @@ using namespace DB; - class FakeValueSpace; using MyDeltaTree = DeltaTree; @@ -67,6 +65,27 @@ void printTree(MyDeltaTree & tree) std::cout << std::endl; } +std::string treeToString(MyDeltaTree & tree) +{ + std::string result = ""; + std::string temp; + for (auto it = tree.begin(), end = tree.end(); it != end; ++it) + { + temp = ""; + temp += "("; + temp += std::to_string(it.getRid()); + temp += "|"; + temp += std::to_string(it.getSid()); + temp += "|"; + temp += DTTypeString(it.getMutation().type); + temp += "|"; + temp += DB::toString(it.getMutation().value); + temp += "),"; + result += temp; + } + return result; +} + void insertTest(MyDeltaTree & tree) { for (int i = 0; i < 100; ++i) @@ -165,15 +184,264 @@ void insertTest(MyDeltaTree & tree) printTree(tree); } + +void deleteAfterInsertTest(MyDeltaTree & tree) +{ + std::cout << "insert test2 begin====\n"; + + int batch_num = 100; + + std::string expectedResult; + for (int i = 0; i < batch_num; ++i) + { + tree.addInsert(i, i); + tree.checkAll(); + expectedResult += "(" + std::to_string(i) + "|0|INS|" + std::to_string(i) + "),"; + assert(expectedResult == treeToString(tree)); + } + std::cout << "after many insert 1\n"; + + expectedResult = ""; + + for (int i = 0; i < batch_num; ++i) + { + tree.addDelete(0); + tree.checkAll(); + expectedResult = ""; + for (int j = 0; j < batch_num - i - 1; j++) + { + expectedResult += "(" + std::to_string(j) + "|0|INS|" + std::to_string(j + i + 1) + "),"; + } + //std::cout << expectedResult << std::endl; + //std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + } + + expectedResult = ""; + assert(expectedResult == treeToString(tree)); + std::cout << "after many delete 1\n"; + + for (int i = 0; i < batch_num; ++i) + { + tree.addInsert(0, i); + tree.checkAll(); + expectedResult = ""; + for (int j = 0; j <= i; j++) + { + expectedResult += "(" + std::to_string(j) + "|0|INS|" + std::to_string(i - j) + "),"; + } + assert(expectedResult == treeToString(tree)); + } + std::cout << "after many insert 2\n"; + + for (int i = batch_num - 1; i >= 0; --i) + { + tree.addDelete(i); + tree.checkAll(); + expectedResult = ""; + for (int j = 0; j < i; j++) + { + expectedResult += "(" + std::to_string(j) + "|0|INS|" + std::to_string(batch_num - j - 1) + "),"; + } + //std::cout << expectedResult << std::endl; + //std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + } + std::cout << "after many delete 2\n"; +} + +void deleteTest1(MyDeltaTree & tree) +{ + std::cout << "delete test begin====\n"; + + int batch_num = 100; + + std::string expectedResult; + // delete stable from begin to end with merge + for (int i = 0; i < batch_num; ++i) + { + tree.addDelete(0); + tree.checkAll(); + expectedResult = "(0|0|DEL|" + std::to_string(i + 1) + "),"; + //std::cout << expectedResult << std::endl; + //std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + } +} + + +void deleteTest2(MyDeltaTree & tree) +{ + std::cout << "delete test2 begin====\n"; + + int batch_num = 100; + + std::string expectedResult; + // delete stable from end to begin + // this kind of delete behavior may be improved to trigger merge + for (int i = batch_num - 1; i >= 0; --i) + { + tree.addDelete(i); + tree.checkAll(); + expectedResult = ""; + for (int j = i; j < batch_num; j++) + { + expectedResult += "(" + std::to_string(i) + "|" + std::to_string(j) + "|DEL|1),"; + } + + //std::cout << expectedResult << std::endl; + //std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + } +} + +// insert skip delete entry +void insertSkipDelete(MyDeltaTree & tree) +{ + std::cout << "delete test2 begin====\n"; + + int batch_num = 100; + + tree.addDelete(0); + + std::string expectedResult; + + expectedResult = "(0|0|DEL|1),"; + assert(expectedResult == treeToString(tree)); + + for (int i = 0; i < batch_num; ++i) + { + tree.addInsert(0, i); + tree.checkAll(); + expectedResult = "(0|0|DEL|1),"; + for (int j = 0; j <= i; j++) + { + expectedResult += "(" + std::to_string(j) + "|1|INS|" + std::to_string(i - j) + "),"; + } + + std::cout << expectedResult << std::endl; + std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + } +} + +// delete after update +void deleteAfterUpdateTest(MyDeltaTree & tree) +{ + std::cout << "update test begin====\n"; + + int batch_num = 100; + + std::string expectedResult; + std::string expectedResult2; + // multiple update to the same row and same column + for (int i = 0; i < batch_num; ++i) + { + tree.addModify(i, 0, 2 * i); + tree.checkAll(); + expectedResult = expectedResult2 + "(" + std::to_string(i) + "|" + std::to_string(i) + "|0|" + std::to_string(2 * i) + "),"; + std::cout << expectedResult << std::endl; + std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + + tree.addModify(i, 0, 2 * i + 1); + tree.checkAll(); + expectedResult2 = expectedResult2 + "(" + std::to_string(i) + "|" + std::to_string(i) + "|0|" + std::to_string(2 * i + 1) + "),"; + //std::cout << expectedResult2 << std::endl; + //std::cout << treeToString(tree) << std::endl; + assert(expectedResult2 == treeToString(tree)); + } + + for (int i = batch_num - 1; i >= 0; --i) + { + tree.addDelete(i); + tree.checkAll(); + expectedResult = ""; + for (int j = 0; j < i; j++) + { + expectedResult += "(" + std::to_string(j) + "|" + std::to_string(j) + "|0|" + std::to_string(2 * j + 1) + "),"; + } + for (int j = i; j < batch_num; j++) + { + expectedResult += "(" + std::to_string(i) + "|" + std::to_string(j) + "|DEL|1),"; + } + //std::cout << expectedResult << std::endl; + //std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + } + std::cout << "after deleteAfterUpdateTest 1\n"; +} + +// update skip delete +void updateSkipDelete(MyDeltaTree & tree) +{ + std::cout << "delete test2 begin====\n"; + + tree.addDelete(0); + + std::string expectedResult; + + expectedResult = "(0|0|DEL|1),"; + assert(expectedResult == treeToString(tree)); + + tree.addModify(0, 0, 0); + tree.checkAll(); + expectedResult = "(0|0|DEL|1),(0|1|0|0),"; + + std::cout << expectedResult << std::endl; + std::cout << treeToString(tree) << std::endl; + assert(expectedResult == treeToString(tree)); + + std::cout << "updateSkipDelete tests complete\n"; +} + +// in-place update +void inplaceUpdate(MyDeltaTree & tree) +{ + std::cout << "insert test2 begin====\n"; + + int batch_num = 100; + + std::string expectedResult; + + for (int i = 0; i < batch_num; ++i) + { + tree.addInsert(i, i); + tree.checkAll(); + expectedResult = expectedResult + "(" + std::to_string(i) + "|0|INS|" + std::to_string(i) + "),"; + assert(expectedResult == treeToString(tree)); + tree.addModify(i, 0, i); + tree.checkAll(); + assert(expectedResult == treeToString(tree)); + } + + std::cout << "after in-place update delete 2\n"; +} + + int main(int, char **) { print_sizes(); FakeValueSpacePtr insert_vs = std::make_shared(); FakeValueSpacePtr modify_vs = std::make_shared(); MyDeltaTree delta_tree(insert_vs, modify_vs); + MyDeltaTree delta_tree2(insert_vs, modify_vs); + MyDeltaTree delta_tree3(insert_vs, modify_vs); + MyDeltaTree delta_tree4(insert_vs, modify_vs); + MyDeltaTree delta_tree5(insert_vs, modify_vs); + MyDeltaTree delta_tree6(insert_vs, modify_vs); + MyDeltaTree delta_tree7(insert_vs, modify_vs); try { - insertTest(delta_tree); + //insertTest(delta_tree); + deleteAfterInsertTest(delta_tree); + deleteTest1(delta_tree2); + deleteTest2(delta_tree3); + insertSkipDelete(delta_tree4); + deleteAfterUpdateTest(delta_tree5); + updateSkipDelete(delta_tree6); + inplaceUpdate(delta_tree7); + std::cout << "tests pass\n"; } catch (const DB::Exception & ex) { @@ -187,6 +455,5 @@ int main(int, char **) { std::cout << "Caught unhandled exception\n"; } - return 0; }