Skip to content

Commit

Permalink
add delta merge engine test (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored and flowbehappy committed Jun 19, 2019
1 parent 4d893c0 commit a9dee28
Show file tree
Hide file tree
Showing 8 changed files with 1,374 additions and 132 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
for (size_t column_id = 0; column_id < num_columns; ++column_id)
{
auto offset = vs_column_offsets[column_id];
if (has_modifies[offset] != INVALID_ID)
if (has_modifies[offset] == INVALID_ID)
{
output_columns[column_id]->insertFrom(*stable_block_columns[column_id], stable_block_pos);
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ struct RidGenerator
if (modify_block_dup_next[modify_block_pos++])
return rid;
else
{
++stable_block_pos;
return rid++;
}
}
else if (res > 0)
{
Expand Down
129 changes: 129 additions & 0 deletions dbms/src/Storages/DeltaMerge/ValueSpace.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#include <Common/Exception.h>

#include <Storages/DeltaMerge/ValueSpace.h>

namespace DB
{
/// Creates a value space whose logger is named "MemoryValueSpace(<name>)".
///
/// @param name        Label embedded into the logger name.
/// @param name_types  One entry is added to both `names_and_types` and `split_columns`
///                    for every (name, type) pair, in the given order.
/// @param sort_desc   The `column_name` of every entry is recorded in `sort_column_names`.
MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc)
    : log(&Logger::get("MemoryValueSpace(" + name + ")"))
{
    // Remember which columns take part in the sort key.
    for (const auto & sort_entry : sort_desc)
        sort_column_names.insert(sort_entry.column_name);

    // Mirror the schema into the bookkeeping list and the per-column storage.
    for (const auto & column : name_types)
    {
        names_and_types.emplace_back(column.name, column.type);
        split_columns.emplace_back(column.name, column.type);
    }

    // Cached column count, used by removeFromInsert().
    num_columns = split_columns.size();

    LOG_TRACE(log, "MM create");
}

/// Appends every row of `block` to the value space as new tuples.
///
/// Each split column is extended with the block column that carries the same name.
///
/// @param block  Source rows; must have at least as many columns as the value space.
/// @return One id per row of `block`: consecutive values starting at the size the first
///         split column had before the append.
/// @throws Exception if the value space has no columns, or if `block` has fewer columns
///         than the value space.
Ids MemoryValueSpace::addFromInsert(const Block & block)
{
    // The id base below is read from the first split column, so at least one must exist.
    // (Calling front() on an empty standard container is undefined behavior.)
    if (unlikely(split_columns.empty()))
        throw Exception("MemoryValueSpace has no columns!");

    if (unlikely(block.columns() < split_columns.size()))
        throw Exception("Not enough columns!");

    // Row count does not change while we append; read it once.
    const auto rows = block.rows();

    // Ids are assigned before the append, based on the current column length.
    Ids ids;
    UInt64 id = split_columns.front().size();
    for (size_t i = 0; i < rows; ++i)
        ids.push_back(id++);

    for (auto & split_column : split_columns)
        split_column.append(*block.getByName(split_column.name).column);

    return ids;
}

/// Stores the modified values carried by `block` and describes them as reference tuples.
///
/// A column is stored only when it is not a sort column and `block` contains a column of the
/// same name; every other column is marked with INVALID_ID and left out of the tuples.
///
/// @param block  Rows holding the new values.
/// @return One tuple per row of `block`, each listing (column index, value id) for the stored columns.
/// @throws Exception if `block` has fewer columns than the value space.
RefTuples MemoryValueSpace::addFromModify(const Block & block)
{
    if (unlikely(block.columns() < split_columns.size()))
        throw Exception("Not enough columns!");

    const auto row_count = block.rows();
    const size_t column_count = split_columns.size();

    // value_ids_by_column[c][r] is the id of row r's new value in column c,
    // or INVALID_ID when column c is not stored for this block.
    std::vector<std::vector<UInt64>> value_ids_by_column;
    for (size_t col = 0; col < column_count; ++col)
    {
        auto & target = split_columns[col];
        auto next_id = target.size();

        const bool is_sort_column = sort_column_names.find(target.name) != sort_column_names.end();
        if (is_sort_column || !block.has(target.name))
        {
            value_ids_by_column.emplace_back(row_count, INVALID_ID);
            continue;
        }

        target.append(*block.getByName(target.name).column);

        // The appended values occupy consecutive ids starting at the previous column size.
        value_ids_by_column.emplace_back();
        auto & fresh_ids = value_ids_by_column.back();
        for (size_t row = 0; row < row_count; ++row)
            fresh_ids.push_back(next_id++);
    }

    // Transpose the per-column ids into one tuple per row.
    RefTuples result;
    for (size_t row = 0; row < row_count; ++row)
    {
        ColumnAndValues changed;
        for (size_t col = 0; col < column_count; ++col)
        {
            auto value_id = value_ids_by_column[col][row];
            if (value_id == INVALID_ID)
                continue;
            changed.emplace_back(col, value_id);
        }
        result.emplace_back(changed);
    }

    return result;
}

/// Removes the value with the given id from each of the first `num_columns` split columns.
void MemoryValueSpace::removeFromInsert(UInt64 id)
{
    size_t column_id = 0;
    while (column_id < num_columns)
    {
        split_columns[column_id].remove(id);
        ++column_id;
    }
}

/// Removes the value with the given id from the single split column at index `column_id`.
void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id)
{
    auto & target_column = split_columns[column_id];
    target_column.remove(id);
}

/// Builds a new tuple from the tuple `old_tuple_id` with the modifications in `tuple` applied.
///
/// For every column, the new tuple takes the value referenced by `tuple` (read from
/// `modify_value_space`) when `tuple` mentions that column, and the old tuple's value otherwise.
/// The old tuple is removed from all columns after the new one has been appended.
///
/// @param old_tuple_id        Id of the tuple being replaced.
/// @param modify_value_space  Value space that holds the values referenced by `tuple`.
/// @param tuple               (column, value id) pairs; the first pair found for a column wins.
/// @return Index of the last element of the first split column after the append.
/// @throws Exception if the value space has no columns.
UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple)
{
    // The returned id is read from the first split column, so at least one must exist.
    // (Calling front() on an empty standard container is undefined behavior.)
    if (unlikely(split_columns.empty()))
        throw Exception("MemoryValueSpace has no columns!");

    // TODO improvement: in-place update for fixed size type, like numbers.
    for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
    {
        auto & split_column = split_columns[column_id];

        // Value ids are UInt64 elsewhere in this class; keep the same width here so the
        // comparison against INVALID_ID cannot be affected by a narrower size_t.
        UInt64 new_value_id = INVALID_ID;
        for (const auto & cv : tuple.values)
        {
            if (cv.column == column_id)
            {
                new_value_id = cv.value;
                break;
            }
        }

        if (new_value_id != INVALID_ID)
        {
            split_column.append(modify_value_space.split_columns[column_id], new_value_id);
        }
        else
        {
            // NOTE(review): source and destination are the same column here; confirm that
            // append() tolerates reading from the column it is appending to.
            split_column.append(this->split_columns[column_id], old_tuple_id);
        }
    }

    // The old values are removed only after they have been copied into the new tuple.
    removeFromInsert(old_tuple_id);

    return split_columns.front().size() - 1;
}

void MemoryValueSpace::gc()
{
for (auto & sc : split_columns)
sc.gc();
}

} // namespace DB
127 changes: 1 addition & 126 deletions dbms/src/Storages/DeltaMerge/ValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ class MemoryValueSpace
public:
MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc);

~MemoryValueSpace()
{
LOG_TRACE(log, "MM free");
}
~MemoryValueSpace() { LOG_TRACE(log, "MM free"); }

Ids addFromInsert(const Block & block);
RefTuples addFromModify(const Block & block);
Expand Down Expand Up @@ -146,126 +143,4 @@ class MemoryValueSpace
Logger * log;
};

/// Creates a value space whose logger is named "MemoryValueSpace(<name>)".
///
/// @param name        Label embedded into the logger name.
/// @param name_types  One entry is added to both `names_and_types` and `split_columns`
///                    for every (name, type) pair, in the given order.
/// @param sort_desc   The `column_name` of every entry is recorded in `sort_column_names`.
MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc)
    : log(&Logger::get("MemoryValueSpace(" + name + ")"))
{
    // Remember which columns take part in the sort key.
    for (const auto & sort_entry : sort_desc)
        sort_column_names.insert(sort_entry.column_name);

    // Mirror the schema into the bookkeeping list and the per-column storage.
    for (const auto & column : name_types)
    {
        names_and_types.emplace_back(column.name, column.type);
        split_columns.emplace_back(column.name, column.type);
    }

    // Cached column count, used by removeFromInsert().
    num_columns = split_columns.size();

    LOG_TRACE(log, "MM create");
}

/// Appends every row of `block` to the value space as new tuples.
///
/// Each split column is extended with the block column that carries the same name.
///
/// @param block  Source rows; must have at least as many columns as the value space.
/// @return One id per row of `block`: consecutive values starting at the size the first
///         split column had before the append.
/// @throws Exception if the value space has no columns, or if `block` has fewer columns
///         than the value space.
Ids MemoryValueSpace::addFromInsert(const Block & block)
{
    // The id base below is read from the first split column, so at least one must exist.
    // (Calling front() on an empty standard container is undefined behavior.)
    if (unlikely(split_columns.empty()))
        throw Exception("MemoryValueSpace has no columns!");

    if (unlikely(block.columns() < split_columns.size()))
        throw Exception("Not enough columns!");

    // Row count does not change while we append; read it once.
    const auto rows = block.rows();

    // Ids are assigned before the append, based on the current column length.
    Ids ids;
    UInt64 id = split_columns.front().size();
    for (size_t i = 0; i < rows; ++i)
        ids.push_back(id++);

    for (auto & split_column : split_columns)
        split_column.append(*block.getByName(split_column.name).column);

    return ids;
}

/// Stores the modified values carried by `block` and describes them as reference tuples.
///
/// A column is stored only when it is not a sort column and `block` contains a column of the
/// same name; every other column is marked with INVALID_ID and left out of the tuples.
///
/// @param block  Rows holding the new values.
/// @return One tuple per row of `block`, each listing (column index, value id) for the stored columns.
/// @throws Exception if `block` has fewer columns than the value space.
RefTuples MemoryValueSpace::addFromModify(const Block & block)
{
    if (unlikely(block.columns() < split_columns.size()))
        throw Exception("Not enough columns!");

    const auto row_count = block.rows();
    const size_t column_count = split_columns.size();

    // value_ids_by_column[c][r] is the id of row r's new value in column c,
    // or INVALID_ID when column c is not stored for this block.
    std::vector<std::vector<UInt64>> value_ids_by_column;
    for (size_t col = 0; col < column_count; ++col)
    {
        auto & target = split_columns[col];
        auto next_id = target.size();

        const bool is_sort_column = sort_column_names.find(target.name) != sort_column_names.end();
        if (is_sort_column || !block.has(target.name))
        {
            value_ids_by_column.emplace_back(row_count, INVALID_ID);
            continue;
        }

        target.append(*block.getByName(target.name).column);

        // The appended values occupy consecutive ids starting at the previous column size.
        value_ids_by_column.emplace_back();
        auto & fresh_ids = value_ids_by_column.back();
        for (size_t row = 0; row < row_count; ++row)
            fresh_ids.push_back(next_id++);
    }

    // Transpose the per-column ids into one tuple per row.
    RefTuples result;
    for (size_t row = 0; row < row_count; ++row)
    {
        ColumnAndValues changed;
        for (size_t col = 0; col < column_count; ++col)
        {
            auto value_id = value_ids_by_column[col][row];
            if (value_id == INVALID_ID)
                continue;
            changed.emplace_back(col, value_id);
        }
        result.emplace_back(changed);
    }

    return result;
}

/// Removes the value with the given id from each of the first `num_columns` split columns.
void MemoryValueSpace::removeFromInsert(UInt64 id)
{
    size_t column_id = 0;
    while (column_id < num_columns)
    {
        split_columns[column_id].remove(id);
        ++column_id;
    }
}

/// Removes the value with the given id from the single split column at index `column_id`.
void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id)
{
    auto & target_column = split_columns[column_id];
    target_column.remove(id);
}

/// Builds a new tuple from the tuple `old_tuple_id` with the modifications in `tuple` applied.
///
/// For every column, the new tuple takes the value referenced by `tuple` (read from
/// `modify_value_space`) when `tuple` mentions that column, and the old tuple's value otherwise.
/// The old tuple is removed from all columns after the new one has been appended.
///
/// @param old_tuple_id        Id of the tuple being replaced.
/// @param modify_value_space  Value space that holds the values referenced by `tuple`.
/// @param tuple               (column, value id) pairs; the first pair found for a column wins.
/// @return Index of the last element of the first split column after the append.
/// @throws Exception if the value space has no columns.
UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple)
{
    // The returned id is read from the first split column, so at least one must exist.
    // (Calling front() on an empty standard container is undefined behavior.)
    if (unlikely(split_columns.empty()))
        throw Exception("MemoryValueSpace has no columns!");

    // TODO improvement: in-place update for fixed size type, like numbers.
    for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
    {
        auto & split_column = split_columns[column_id];

        // Value ids are UInt64 elsewhere in this class; keep the same width here so the
        // comparison against INVALID_ID cannot be affected by a narrower size_t.
        UInt64 new_value_id = INVALID_ID;
        for (const auto & cv : tuple.values)
        {
            if (cv.column == column_id)
            {
                new_value_id = cv.value;
                break;
            }
        }

        if (new_value_id != INVALID_ID)
        {
            split_column.append(modify_value_space.split_columns[column_id], new_value_id);
        }
        else
        {
            // NOTE(review): source and destination are the same column here; confirm that
            // append() tolerates reading from the column it is appending to.
            split_column.append(this->split_columns[column_id], old_tuple_id);
        }
    }

    // The old values are removed only after they have been copied into the new tuple.
    removeFromInsert(old_tuple_id);

    return split_columns.front().size() - 1;
}

void MemoryValueSpace::gc()
{
for (auto & sc : split_columns)
sc.gc();
}

} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Add the current binary directory to the include search path.
include_directories (${CMAKE_CURRENT_BINARY_DIR})

# Each test below is a standalone executable linked against `dbms`.
add_executable (delta_tree delta_tree.cpp)
target_link_libraries (delta_tree dbms)

add_executable (delta_merge_block_stream delta_merge_block_stream.cpp)
target_link_libraries (delta_merge_block_stream dbms)

add_executable (delta_merge_storage delta_merge_storage.cpp)
target_link_libraries (delta_merge_storage dbms)
Loading

0 comments on commit a9dee28

Please sign in to comment.