From f9a6179edc161093c74876ed0feb9b7ac69a35ee Mon Sep 17 00:00:00 2001
From: lidezhu <47731263+lidezhu@users.noreply.github.com>
Date: Thu, 4 Apr 2019 16:19:11 +0800
Subject: [PATCH] add delta merge engine test (#18)

---
 .../DeltaMerge/DeltaMergeBlockInputStream.h   |   2 +-
 .../DeltaMerge/DeltaMergeBlockOutputStream.h  |   3 +
 dbms/src/Storages/DeltaMerge/ValueSpace.cpp   | 129 +++
 dbms/src/Storages/DeltaMerge/ValueSpace.h     | 127 +--
 .../Storages/DeltaMerge/tests/CMakeLists.txt  |   4 +
 .../tests/delta_merge_block_stream.cpp        | 845 ++++++++++++++++++
 .../DeltaMerge/tests/delta_merge_storage.cpp  | 119 +++
 .../Storages/DeltaMerge/tests/delta_tree.cpp  | 277 +++++-
 8 files changed, 1374 insertions(+), 132 deletions(-)
 create mode 100644 dbms/src/Storages/DeltaMerge/ValueSpace.cpp
 create mode 100644 dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp
 create mode 100644 dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp

diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h
index 898edd5bcfd..a255974963f 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockInputStream.h
@@ -223,7 +223,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
             for (size_t column_id = 0; column_id < num_columns; ++column_id)
             {
                 auto offset = vs_column_offsets[column_id];
-                if (has_modifies[offset] != INVALID_ID)
+                if (has_modifies[offset] == INVALID_ID)
                 {
                     output_columns[column_id]->insertFrom(*stable_block_columns[column_id], stable_block_pos);
                 }
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h
index 220d8ca13ad..1c18f86e2b7 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeBlockOutputStream.h
@@ -247,7 +247,10 @@ struct RidGenerator
                 if (modify_block_dup_next[modify_block_pos++])
                     return rid;
                 else
+                {
+                    ++stable_block_pos;
                     return rid++;
+                }
             }
             else if (res > 0)
             {
diff --git a/dbms/src/Storages/DeltaMerge/ValueSpace.cpp b/dbms/src/Storages/DeltaMerge/ValueSpace.cpp
new file mode 100644
index 00000000000..4806dd80cf4
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/ValueSpace.cpp
@@ -0,0 +1,129 @@
+#include <Common/Exception.h>
+
+#include <Storages/DeltaMerge/ValueSpace.h>
+
+namespace DB
+{
+MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc)
+    : log(&Logger::get("MemoryValueSpace(" + name + ")"))
+{
+
+    for (const auto & nt : name_types)
+    {
+        names_and_types.emplace_back(nt.name, nt.type);
+        split_columns.emplace_back(nt.name, nt.type);
+    }
+    for (const auto & desc : sort_desc)
+        sort_column_names.insert(desc.column_name);
+
+    num_columns = split_columns.size();
+
+    LOG_TRACE(log, "MM create");
+}
+
+Ids MemoryValueSpace::addFromInsert(const Block & block)
+{
+    if (unlikely(block.columns() < split_columns.size()))
+        throw Exception("Not enough columns!");
+
+    Ids    ids;
+    UInt64 id = split_columns.front().size();
+    for (size_t i = 0; i < block.rows(); ++i)
+        ids.push_back(id++);
+    for (auto & split_column : split_columns)
+        split_column.append(*block.getByName(split_column.name).column);
+
+    return ids;
+}
+
+RefTuples MemoryValueSpace::addFromModify(const Block & block)
+{
+    if (unlikely(block.columns() < split_columns.size()))
+        throw Exception("Not enough columns!");
+
+    RefTuples tuples;
+    auto      rows = block.rows();
+
+    std::vector<std::vector<UInt64>> idmap;
+    for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
+    {
+        auto & split_column = split_columns[column_id];
+        auto   offset       = split_column.size();
+        if (sort_column_names.find(split_column.name) == sort_column_names.end() && block.has(split_column.name))
+        {
+            split_column.append(*block.getByName(split_column.name).column);
+
+            idmap.emplace_back();
+            auto & ids = idmap[column_id];
+            for (size_t row_id = 0; row_id < rows; ++row_id)
+                ids.push_back(offset++);
+        }
+        else
+        {
+            idmap.emplace_back(rows, INVALID_ID);
+        }
+    }
+
+    for (size_t row_id = 0; row_id < rows; ++row_id)
+    {
+        ColumnAndValues values;
+        for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
+        {
+            auto value_id = idmap[column_id][row_id];
+            if (value_id != INVALID_ID)
+                values.emplace_back(column_id, value_id);
+        }
+        tuples.emplace_back(values);
+    }
+
+    return tuples;
+}
+
+void MemoryValueSpace::removeFromInsert(UInt64 id)
+{
+    for (size_t i = 0; i < num_columns; ++i)
+        split_columns[i].remove(id);
+}
+
+void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id)
+{
+    split_columns[column_id].remove(id);
+}
+
+UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple)
+{
+    // TODO improvement: in-place update for fixed size type, like numbers.
+    for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
+    {
+        auto & split_column = split_columns[column_id];
+        size_t new_value_id = INVALID_ID;
+        for (const auto & cv : tuple.values)
+        {
+            if (cv.column == column_id)
+            {
+                new_value_id = cv.value;
+                break;
+            }
+        }
+        if (new_value_id != INVALID_ID)
+        {
+            split_column.append(modify_value_space.split_columns[column_id], new_value_id);
+        }
+        else
+        {
+            split_column.append(this->split_columns[column_id], old_tuple_id);
+        }
+    }
+
+    removeFromInsert(old_tuple_id);
+
+    return split_columns.front().size() - 1;
+}
+
+void MemoryValueSpace::gc()
+{
+    for (auto & sc : split_columns)
+        sc.gc();
+}
+
+} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/DeltaMerge/ValueSpace.h b/dbms/src/Storages/DeltaMerge/ValueSpace.h
index c7d2cacc80e..8965e8a405d 100644
--- a/dbms/src/Storages/DeltaMerge/ValueSpace.h
+++ b/dbms/src/Storages/DeltaMerge/ValueSpace.h
@@ -24,10 +24,7 @@ class MemoryValueSpace
 public:
     MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc);
 
-    ~MemoryValueSpace()
-    {
-        LOG_TRACE(log, "MM free");
-    }
+    ~MemoryValueSpace() { LOG_TRACE(log, "MM free"); }
 
     Ids       addFromInsert(const Block & block);
     RefTuples addFromModify(const Block & block);
@@ -146,126 +143,4 @@ class MemoryValueSpace
     Logger * log;
 };
 
-MemoryValueSpace::MemoryValueSpace(String name, const NamesAndTypesList & name_types, const SortDescription & sort_desc)
-    : log(&Logger::get("MemoryValueSpace(" + name + ")"))
-{
-
-    for (const auto & nt : name_types)
-    {
-        names_and_types.emplace_back(nt.name, nt.type);
-        split_columns.emplace_back(nt.name, nt.type);
-    }
-    for (const auto & desc : sort_desc)
-        sort_column_names.insert(desc.column_name);
-
-    num_columns = split_columns.size();
-
-    LOG_TRACE(log, "MM create");
-}
-
-Ids MemoryValueSpace::addFromInsert(const Block & block)
-{
-    if (unlikely(block.columns() < split_columns.size()))
-        throw Exception("Not enough columns!");
-
-    Ids    ids;
-    UInt64 id = split_columns.front().size();
-    for (size_t i = 0; i < block.rows(); ++i)
-        ids.push_back(id++);
-    for (auto & split_column : split_columns)
-        split_column.append(*block.getByName(split_column.name).column);
-
-    return ids;
-}
-
-RefTuples MemoryValueSpace::addFromModify(const Block & block)
-{
-    if (unlikely(block.columns() < split_columns.size()))
-        throw Exception("Not enough columns!");
-
-    RefTuples tuples;
-    auto      rows = block.rows();
-
-    std::vector<std::vector<UInt64>> idmap;
-    for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
-    {
-        auto & split_column = split_columns[column_id];
-        auto   offset       = split_column.size();
-        if (sort_column_names.find(split_column.name) == sort_column_names.end() && block.has(split_column.name))
-        {
-            split_column.append(*block.getByName(split_column.name).column);
-
-            idmap.emplace_back();
-            auto & ids = idmap[column_id];
-            for (size_t row_id = 0; row_id < rows; ++row_id)
-                ids.push_back(offset++);
-        }
-        else
-        {
-            idmap.emplace_back(rows, INVALID_ID);
-        }
-    }
-
-    for (size_t row_id = 0; row_id < rows; ++row_id)
-    {
-        ColumnAndValues values;
-        for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
-        {
-            auto value_id = idmap[column_id][row_id];
-            if (value_id != INVALID_ID)
-                values.emplace_back(column_id, value_id);
-        }
-        tuples.emplace_back(values);
-    }
-
-    return tuples;
-}
-
-void MemoryValueSpace::removeFromInsert(UInt64 id)
-{
-    for (size_t i = 0; i < num_columns; ++i)
-        split_columns[i].remove(id);
-}
-
-void MemoryValueSpace::removeFromModify(UInt64 id, size_t column_id)
-{
-    split_columns[column_id].remove(id);
-}
-
-UInt64 MemoryValueSpace::withModify(UInt64 old_tuple_id, const ValueSpace & modify_value_space, const RefTuple & tuple)
-{
-    // TODO improvement: in-place update for fixed size type, like numbers.
-    for (size_t column_id = 0; column_id < split_columns.size(); ++column_id)
-    {
-        auto & split_column = split_columns[column_id];
-        size_t new_value_id = INVALID_ID;
-        for (const auto & cv : tuple.values)
-        {
-            if (cv.column == column_id)
-            {
-                new_value_id = cv.value;
-                break;
-            }
-        }
-        if (new_value_id != INVALID_ID)
-        {
-            split_column.append(modify_value_space.split_columns[column_id], new_value_id);
-        }
-        else
-        {
-            split_column.append(this->split_columns[column_id], old_tuple_id);
-        }
-    }
-
-    removeFromInsert(old_tuple_id);
-
-    return split_columns.front().size() - 1;
-}
-
-void MemoryValueSpace::gc()
-{
-    for (auto & sc : split_columns)
-        sc.gc();
-}
-
 } // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
index 7e9af1fcb2b..64235e768e1 100644
--- a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
+++ b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
@@ -1,4 +1,8 @@
 include_directories (${CMAKE_CURRENT_BINARY_DIR})
 
 add_executable (delta_tree delta_tree.cpp)
+add_executable (delta_merge_block_stream delta_merge_block_stream.cpp)
+add_executable (delta_merge_storage delta_merge_storage.cpp)
 target_link_libraries (delta_tree dbms)
+target_link_libraries (delta_merge_block_stream dbms)
+target_link_libraries (delta_merge_storage dbms)
diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp
new file mode 100644
index 00000000000..e1fd8546817
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/tests/delta_merge_block_stream.cpp
@@ -0,0 +1,845 @@
+#include <string>
+#include <iostream>
+
+#include <Columns/IColumn.h>
+#include <Core/Block.h>
+#include <Core/ColumnWithTypeAndName.h>
+#include <IO/ReadBufferFromFile.h>
+#include <IO/WriteBufferFromFile.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataStreams/copyData.h>
+#include <Storages/StorageMemory.h>
+#include <Core/Field.h>
+#include <Interpreters/Context.h>
+#include <Storages/ColumnsDescription.h>
+#include <Storages/DeltaMerge/DeltaMergeBlockInputStream.h>
+#include <Storages/DeltaMerge/DeltaMergeDefines.h>
+#include <Core/SortDescription.h>
+#include <Storages/DeltaMerge/DeltaMergeBlockOutputStream.h>
+#include <Storages/DeltaMerge/DeltaTree.h>
+
+using namespace DB;
+
+std::string treeToString(DeltaTreePtr tree)
+{
+    std::string result = "";
+    std::string temp;
+    for (auto it = tree->begin(), end = tree->end(); it != end; ++it)
+    {
+        temp = "";
+        temp += "(";
+        temp += std::to_string(it.getRid());
+        temp += "|";
+        temp += std::to_string(it.getSid());
+        temp += "|";
+        temp += DTTypeString(it.getMutation().type);
+        temp += "|";
+        temp += DB::toString(it.getMutation().value);
+        temp += "),";
+        result += temp;
+    }
+    return result;
+}
+
+void deltaTreeInsertTest(Context context)
+{
+    Block sample;
+
+    ColumnWithTypeAndName col1;
+    col1.name                   = "col1";
+    col1.type                   = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr tempcol = col1.type->createColumn();
+
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(i * 2);
+        tempcol->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+
+    sample.insert(col1);
+
+    Block sample2;
+
+    ColumnWithTypeAndName col2;
+    col2.name                    = "col1";
+    col2.type                    = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr tempcol2 = col2.type->createColumn();
+
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(i * 2 + 1);
+        tempcol2->insert(field);
+    }
+
+    col2.column = std::move(tempcol2);
+
+    sample2.insert(col2);
+
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+
+    output->write(sample);
+
+    output->writeSuffix();
+
+    QueryProcessingStage::Enum stage;
+
+    BlockInputStreamPtr in = storage->read(column_names, {}, context, stage, 8192, 1)[0];
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, SortDescription{});
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, SortDescription{});
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    Ids id_vec = insert_value_space->addFromInsert(sample2);
+    for (unsigned int i = 0; i < id_vec.size(); i++)
+    {
+        delta_tree->addInsert(2 * i + 1, id_vec[i]);
+    }
+
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                assert(c->getUInt(i) == i);
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+void deltaTreeUpdateTest(Context context)
+{
+    Block sample;
+
+    ColumnWithTypeAndName col1;
+    col1.name                   = "col1";
+    col1.type                   = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr tempcol = col1.type->createColumn();
+
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(i * 2);
+        tempcol->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+
+    sample.insert(col1);
+
+    Block sample2;
+
+    ColumnWithTypeAndName col2;
+    col2.name                    = "col1";
+    col2.type                    = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr tempcol2 = col2.type->createColumn();
+
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(i * 2 + 1);
+        tempcol2->insert(field);
+    }
+
+    col2.column = std::move(tempcol2);
+
+    sample2.insert(col2);
+
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+
+    output->write(sample);
+
+    output->writeSuffix();
+
+    QueryProcessingStage::Enum stage;
+
+    BlockInputStreamPtr in = storage->read(column_names, {}, context, stage, 8192, 1)[0];
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, SortDescription{});
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, SortDescription{});
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    RefTuples rts = insert_value_space->addFromModify(sample2);
+    for (unsigned int i = 0; i < rts.size(); i++)
+    {
+        delta_tree->addModify(i, rts[i]);
+    }
+
+    //std::cout << treeToString(delta_tree) << std::endl;
+
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                //std::cout << c->getUInt(i) << std::endl;
+                assert(c->getUInt(i) == i * 2 + 1);
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+void deltaTreeDeleteTest(Context context)
+{
+    Block                 sample;
+    ColumnWithTypeAndName col1;
+    col1.name                   = "col1";
+    col1.type                   = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr tempcol = col1.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(i);
+        tempcol->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+    sample.insert(col1);
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+
+    output->write(sample);
+
+    output->writeSuffix();
+
+    QueryProcessingStage::Enum stage;
+
+    BlockInputStreamPtr in = storage->read(column_names, {}, context, stage, 8192, 1)[0];
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, SortDescription{});
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, SortDescription{});
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    for (unsigned int i = 0; i < 50; i++)
+    {
+        delta_tree->addDelete(0);
+    }
+
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                //std::cout << i+50 << std::endl;
+                assert(c->getUInt(i) == i + 50);
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+void deltaTreeOutputInsertTest(Context context)
+{
+    // block write into stable storage
+    Block                 sample;
+    ColumnWithTypeAndName col1;
+    col1.name                     = "col1";
+    col1.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol = col1.type->createColumn();
+    ColumnWithTypeAndName col2;
+    col2.name                    = "col2";
+    col2.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2 = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        tempcol2->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+    col2.column = std::move(tempcol2);
+    sample.insert(col1);
+    sample.insert(col2);
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()}, {"col2", std::make_shared<DataTypeString>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+    output->write(sample);
+    output->writeSuffix();
+
+    DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() {
+        auto stage = QueryProcessingStage::Enum::FetchColumns;
+        return storage->read(column_names, {}, context, stage, 8192, 1)[0];
+    };
+
+    SortDescription sd{};
+    sd.emplace_back("col1", 1, 1);
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, sd);
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, sd);
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    BlockOutputStreamPtr delta_output_stream
+        = std::make_shared<DeltaMergeBlockOutputStream>(input_stream_creator, delta_tree, Action::Insert, sd, []() {}, 10000);
+
+    // data write through DeltaMergeBlockOutputStream
+    Block                 sample_delta;
+    ColumnWithTypeAndName col1_delta;
+    col1_delta.name                     = "col1";
+    col1_delta.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol_delta = col1_delta.type->createColumn();
+    ColumnWithTypeAndName col2_delta;
+    col2_delta.name                    = "col2";
+    col2_delta.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2_delta = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i + 1);
+        tempcol_delta->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        tempcol2_delta->insert(field);
+    }
+    col1_delta.column = std::move(tempcol_delta);
+    col2_delta.column = std::move(tempcol2_delta);
+    sample_delta.insert(col1_delta);
+    sample_delta.insert(col2_delta);
+
+    delta_output_stream->write(sample_delta);
+
+    QueryProcessingStage::Enum stage2;
+    BlockInputStreamPtr        in = storage->read(column_names, {}, context, stage2, 8192, 1)[0];
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                if (iter->name == "col1")
+                {
+                    //std::cout << c->getUInt(i) << std::endl;
+                    assert(c->getUInt(i) == i);
+                }
+                else if (iter->name == "col2")
+                {
+                    //std::cout << c->getDataAt(i) << std::endl;
+                    assert(c->getDataAt(i) == "a");
+                }
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+void deltaTreeOutputUpsertTest(Context context)
+{
+    // block write into stable storage
+    Block                 sample;
+    ColumnWithTypeAndName col1;
+    col1.name                     = "col1";
+    col1.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol = col1.type->createColumn();
+    ColumnWithTypeAndName col2;
+    col2.name                    = "col2";
+    col2.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2 = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        tempcol2->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+    col2.column = std::move(tempcol2);
+    sample.insert(col1);
+    sample.insert(col2);
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()}, {"col2", std::make_shared<DataTypeString>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+    output->write(sample);
+    output->writeSuffix();
+
+    DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() {
+        auto stage = QueryProcessingStage::Enum::FetchColumns;
+        return storage->read(column_names, {}, context, stage, 8192, 1)[0];
+    };
+
+    SortDescription sd{};
+    sd.emplace_back("col1", 1, 1);
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, sd);
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, sd);
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    BlockOutputStreamPtr delta_output_stream
+        = std::make_shared<DeltaMergeBlockOutputStream>(input_stream_creator, delta_tree, Action::Upsert, sd, []() {}, 10000);
+
+    // data write through DeltaMergeBlockOutputStream
+    Block                 sample_delta;
+    ColumnWithTypeAndName col1_delta;
+    col1_delta.name                     = "col1";
+    col1_delta.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol_delta = col1_delta.type->createColumn();
+    ColumnWithTypeAndName col2_delta;
+    col2_delta.name                    = "col2";
+    col2_delta.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2_delta = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol_delta->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("b", 1);
+        tempcol2_delta->insert(field);
+    }
+    col1_delta.column = std::move(tempcol_delta);
+    col2_delta.column = std::move(tempcol2_delta);
+    sample_delta.insert(col1_delta);
+    sample_delta.insert(col2_delta);
+
+    delta_output_stream->write(sample_delta);
+
+    QueryProcessingStage::Enum stage2;
+    BlockInputStreamPtr        in = storage->read(column_names, {}, context, stage2, 8192, 1)[0];
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                if (iter->name == "col1")
+                {
+                    //std::cout << c->getUInt(i) << std::endl;
+                    assert(c->getUInt(i) == 2 * i);
+                }
+                else if (iter->name == "col2")
+                {
+                    //std::cout << c->getDataAt(i) << std::endl;
+                    assert(c->getDataAt(i) == "b");
+                }
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+void deltaTreeOutputDeleteTest(Context context)
+{
+    // block write into stable storage
+    Block                 sample;
+    ColumnWithTypeAndName col1;
+    col1.name                     = "col1";
+    col1.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol = col1.type->createColumn();
+    ColumnWithTypeAndName col2;
+    col2.name                    = "col2";
+    col2.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2 = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        tempcol2->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+    col2.column = std::move(tempcol2);
+    sample.insert(col1);
+    sample.insert(col2);
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()}, {"col2", std::make_shared<DataTypeString>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+    output->write(sample);
+    output->writeSuffix();
+
+    DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() {
+        auto stage = QueryProcessingStage::Enum::FetchColumns;
+        return storage->read(column_names, {}, context, stage, 8192, 1)[0];
+    };
+
+    SortDescription sd{};
+    sd.emplace_back("col1", 1, 1);
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, sd);
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, sd);
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    BlockOutputStreamPtr delta_output_stream
+        = std::make_shared<DeltaMergeBlockOutputStream>(input_stream_creator, delta_tree, Action::Delete, sd, []() {}, 10000);
+
+    // data write through DeltaMergeBlockOutputStream
+    Block                 sample_delta;
+    ColumnWithTypeAndName col1_delta;
+    col1_delta.name                     = "col1";
+    col1_delta.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol_delta = col1_delta.type->createColumn();
+    ColumnWithTypeAndName col2_delta;
+    col2_delta.name                    = "col2";
+    col2_delta.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2_delta = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol_delta->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        tempcol2_delta->insert(field);
+    }
+    col1_delta.column = std::move(tempcol_delta);
+    col2_delta.column = std::move(tempcol2_delta);
+    sample_delta.insert(col1_delta);
+    sample_delta.insert(col2_delta);
+
+    delta_output_stream->write(sample_delta);
+
+    QueryProcessingStage::Enum stage2;
+    BlockInputStreamPtr        in = storage->read(column_names, {}, context, stage2, 8192, 1)[0];
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                if (iter->name == "col1")
+                {
+                    std::cout << c->getUInt(i) << std::endl;
+                    //assert(c->getUInt(i) == i);
+                }
+                else if (iter->name == "col2")
+                {
+                    std::cout << c->getDataAt(i) << std::endl;
+                    //assert(c->getDataAt(i) == "a");
+                }
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+void deltaTreeOutputUpdateTest(Context context)
+{
+    // block write into stable storage
+    Block                 sample;
+    ColumnWithTypeAndName col1;
+    col1.name                     = "col1";
+    col1.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol = col1.type->createColumn();
+    ColumnWithTypeAndName col2;
+    col2.name                    = "col2";
+    col2.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2 = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        tempcol2->insert(field);
+    }
+
+    col1.column = std::move(tempcol);
+    col2.column = std::move(tempcol2);
+    sample.insert(col1);
+    sample.insert(col2);
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()}, {"col2", std::make_shared<DataTypeString>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    // create table
+    StoragePtr storage = StorageMemory::create("mytemptable", ColumnsDescription{names_and_types_list});
+    storage->startup();
+
+    BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
+
+    output->writePrefix();
+    output->write(sample);
+    output->writeSuffix();
+
+    DeltaMergeBlockOutputStream::InputStreamCreator input_stream_creator = [storage, column_names, context]() {
+        auto stage = QueryProcessingStage::Enum::FetchColumns;
+        return storage->read(column_names, {}, context, stage, 8192, 1)[0];
+    };
+
+    SortDescription sd{};
+    sd.emplace_back("col1", 1, 1);
+
+    ValueSpacePtr insert_value_space = std::make_shared<MemoryValueSpace>("insert_value_space", names_and_types_list, sd);
+    ValueSpacePtr modify_value_space = std::make_shared<MemoryValueSpace>("modify_value_space", names_and_types_list, sd);
+    DeltaTreePtr  delta_tree         = std::make_shared<DefaultDeltaTree>(insert_value_space, modify_value_space);
+
+    BlockOutputStreamPtr delta_output_stream
+        = std::make_shared<DeltaMergeBlockOutputStream>(input_stream_creator, delta_tree, Action::Update, sd, []() {}, 10000);
+
+    // data write through DeltaMergeBlockOutputStream
+    Block                 sample_delta;
+    ColumnWithTypeAndName col1_delta;
+    col1_delta.name                     = "col1";
+    col1_delta.type                     = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   tempcol_delta = col1_delta.type->createColumn();
+    ColumnWithTypeAndName col2_delta;
+    col2_delta.name                    = "col2";
+    col2_delta.type                    = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr tempcol2_delta = col2.type->createColumn();
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(2 * i);
+        tempcol_delta->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("b", 1);
+        tempcol2_delta->insert(field);
+    }
+    col1_delta.column = std::move(tempcol_delta);
+    col2_delta.column = std::move(tempcol2_delta);
+    sample_delta.insert(col1_delta);
+    sample_delta.insert(col2_delta);
+
+    delta_output_stream->write(sample_delta);
+
+    QueryProcessingStage::Enum stage2;
+    BlockInputStreamPtr        in = storage->read(column_names, {}, context, stage2, 8192, 1)[0];
+    DeltaMergeBlockInputStream dms(in, delta_tree, 8192);
+
+    dms.readPrefix();
+
+    while (Block block = dms.read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                if (iter->name == "col1")
+                {
+                    std::cout << c->getUInt(i) << std::endl;
+                    assert(c->getUInt(i) == 2 * i);
+                }
+                else if (iter->name == "col2")
+                {
+                    std::cout << c->getDataAt(i) << std::endl;
+                    //assert(c->getDataAt(i) == "b");
+                }
+            }
+        }
+    }
+
+    dms.readSuffix();
+}
+
+int main(int, char **) try
+{
+    Context context = Context::createGlobal();
+    deltaTreeInsertTest(context);
+    deltaTreeUpdateTest(context);
+    deltaTreeDeleteTest(context);
+    deltaTreeOutputInsertTest(context);
+    std::cout << "after deltaTreeOutputInsertTest test\n";
+    deltaTreeOutputUpsertTest(context);
+    std::cout << "after deltaTreeOutputUpsertTest test\n";
+    deltaTreeOutputDeleteTest(context);
+    std::cout << "after deltaTreeOutputDeleteTest test\n";
+    deltaTreeOutputUpdateTest(context);
+    std::cout << "after deltaTreeOutputUpdateTest test\n";
+    std::cout << "test complete\n";
+}
+catch (const DB::Exception & e)
+{
+    std::cerr << e.what() << ", " << e.displayText() << std::endl;
+    return 1;
+}
diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp
new file mode 100644
index 00000000000..9d6db5d01d7
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/tests/delta_merge_storage.cpp
@@ -0,0 +1,119 @@
+#include <string>
+#include <iostream>
+
+#include <Columns/IColumn.h>
+#include <Core/Block.h>
+#include <Core/ColumnWithTypeAndName.h>
+#include <IO/ReadBufferFromFile.h>
+#include <IO/WriteBufferFromFile.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataStreams/copyData.h>
+#include <Core/Field.h>
+#include <Interpreters/Context.h>
+#include <Storages/ColumnsDescription.h>
+#include <Storages/DeltaMerge/DeltaMergeBlockInputStream.h>
+#include <Storages/DeltaMerge/DeltaMergeDefines.h>
+#include <Core/SortDescription.h>
+#include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTInsertQuery.h>
+#include <Storages/DeltaMerge/DeltaMergeBlockOutputStream.h>
+#include <Storages/DeltaMerge/DeltaTree.h>
+#include <Storages/StorageDeltaMerge.h>
+
+using namespace DB;
+
+int main(int, char **) try
+{
+    // block write into stable storage
+    Block                 sample;
+    ColumnWithTypeAndName col1;
+    col1.name                   = "col1";
+    col1.type                   = std::make_shared<DataTypeUInt64>();
+    IColumn::MutablePtr   m_col = col1.type->createColumn();
+    ColumnWithTypeAndName col2;
+    col2.name                  = "col2";
+    col2.type                  = std::make_shared<DataTypeString>();
+    IColumn::MutablePtr m_col2 = col2.type->createColumn();
+
+    // insert form large to small
+    for (int i = 0; i < 100; i++)
+    {
+        Field field = UInt64(99 - i);
+        m_col->insert(field);
+    }
+
+    for (int i = 0; i < 100; i++)
+    {
+        Field field("a", 1);
+        m_col2->insert(field);
+    }
+
+    col1.column = std::move(m_col);
+    col2.column = std::move(m_col2);
+    sample.insert(col1);
+    sample.insert(col2);
+
+    NamesAndTypesList names_and_types_list{
+        {"col1", std::make_shared<DataTypeUInt64>()}, {"col2", std::make_shared<DataTypeString>()},
+    };
+
+    DataTypes data_types;
+    Names     column_names;
+
+    for (const auto & name_type : names_and_types_list)
+    {
+        data_types.push_back(name_type.type);
+        column_names.push_back(name_type.name);
+    }
+
+    ASTPtr astptr(new ASTIdentifier("mytemptable", ASTIdentifier::Kind::Table));
+    astptr->children.emplace_back(new ASTIdentifier("col1"));
+
+    ASTPtr insertptr(new ASTInsertQuery());
+
+    Context context = Context::createGlobal();
+
+    StoragePtr storage = StorageDeltaMerge::create(".", "mytemptable", ColumnsDescription{names_and_types_list}, astptr, false, 1000);
+    storage->startup();
+    BlockOutputStreamPtr output = storage->write(insertptr, context.getSettingsRef());
+
+    output->writePrefix();
+    output->write(sample);
+    output->writeSuffix();
+
+    QueryProcessingStage::Enum stage2;
+    BlockInputStreamPtr dms = storage->read(column_names, {}, context, stage2, 8192, 1)[0];
+
+    dms->readPrefix();
+
+    while (Block block = dms->read())
+    {
+        for (auto iter = block.begin(); iter != block.end(); iter++)
+        {
+            auto c = iter->column;
+            for (unsigned int i = 0; i < c->size(); i++)
+            {
+                if (iter->name == "col1")
+                {
+                    //std::cout << c->getUInt(i) << std::endl;
+                    assert(c->getUInt(i) == i);
+                }
+                else if (iter->name == "col2")
+                {
+                    //std::cout << c->getDataAt(i) << std::endl;
+                    //assert(c->getDataAt(i) == "b");
+                }
+            }
+        }
+    }
+
+    dms->readSuffix();
+
+    std::cout << "test complete\n";
+}
+catch (const DB::Exception & e)
+{
+    std::cerr << e.what() << ", " << e.displayText() << std::endl;
+    return 1;
+}
diff --git a/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp b/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp
index 7d38312f27c..28c98a62205 100644
--- a/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/delta_tree.cpp
@@ -1,9 +1,8 @@
 #include <cmath>
 #include <iostream>
 
-#include <Storages/DeltaMerge/DeltaTree.h>
 #include <Storages/DeltaMerge/DeltaMergeDefines.h>
-
+#include <Storages/DeltaMerge/DeltaTree.h>
 #include <Storages/DeltaMerge/Tuple.h>
 #include <Storages/DeltaMerge/ValueSpace.h>
 
@@ -12,7 +11,6 @@
 
 using namespace DB;
 
-
 class FakeValueSpace;
 using MyDeltaTree = DeltaTree<FakeValueSpace, 2, 10>;
 
@@ -67,6 +65,27 @@ void printTree(MyDeltaTree & tree)
     std::cout << std::endl;
 }
 
+std::string treeToString(MyDeltaTree & tree)
+{
+    std::string result = "";
+    std::string temp;
+    for (auto it = tree.begin(), end = tree.end(); it != end; ++it)
+    {
+        temp = "";
+        temp += "(";
+        temp += std::to_string(it.getRid());
+        temp += "|";
+        temp += std::to_string(it.getSid());
+        temp += "|";
+        temp += DTTypeString(it.getMutation().type);
+        temp += "|";
+        temp += DB::toString(it.getMutation().value);
+        temp += "),";
+        result += temp;
+    }
+    return result;
+}
+
 void insertTest(MyDeltaTree & tree)
 {
     for (int i = 0; i < 100; ++i)
@@ -165,15 +184,264 @@ void insertTest(MyDeltaTree & tree)
     printTree(tree);
 }
 
+
+void deleteAfterInsertTest(MyDeltaTree & tree)
+{
+    std::cout << "insert test2 begin====\n";
+
+    int batch_num = 100;
+
+    std::string expectedResult;
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addInsert(i, i);
+        tree.checkAll();
+        expectedResult += "(" + std::to_string(i) + "|0|INS|" + std::to_string(i) + "),";
+        assert(expectedResult == treeToString(tree));
+    }
+    std::cout << "after many insert 1\n";
+
+    expectedResult = "";
+
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addDelete(0);
+        tree.checkAll();
+        expectedResult = "";
+        for (int j = 0; j < batch_num - i - 1; j++)
+        {
+            expectedResult += "(" + std::to_string(j) + "|0|INS|" + std::to_string(j + i + 1) + "),";
+        }
+        //std::cout << expectedResult << std::endl;
+        //std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+    }
+
+    expectedResult = "";
+    assert(expectedResult == treeToString(tree));
+    std::cout << "after many delete 1\n";
+
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addInsert(0, i);
+        tree.checkAll();
+        expectedResult = "";
+        for (int j = 0; j <= i; j++)
+        {
+            expectedResult += "(" + std::to_string(j) + "|0|INS|" + std::to_string(i - j) + "),";
+        }
+        assert(expectedResult == treeToString(tree));
+    }
+    std::cout << "after many insert 2\n";
+
+    for (int i = batch_num - 1; i >= 0; --i)
+    {
+        tree.addDelete(i);
+        tree.checkAll();
+        expectedResult = "";
+        for (int j = 0; j < i; j++)
+        {
+            expectedResult += "(" + std::to_string(j) + "|0|INS|" + std::to_string(batch_num - j - 1) + "),";
+        }
+        //std::cout << expectedResult << std::endl;
+        //std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+    }
+    std::cout << "after many delete 2\n";
+}
+
+void deleteTest1(MyDeltaTree & tree)
+{
+    std::cout << "delete test begin====\n";
+
+    int batch_num = 100;
+
+    std::string expectedResult;
+    // delete stable from begin to end with merge
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addDelete(0);
+        tree.checkAll();
+        expectedResult = "(0|0|DEL|" + std::to_string(i + 1) + "),";
+        //std::cout << expectedResult << std::endl;
+        //std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+    }
+}
+
+
+void deleteTest2(MyDeltaTree & tree)
+{
+    std::cout << "delete test2 begin====\n";
+
+    int batch_num = 100;
+
+    std::string expectedResult;
+    // delete stable from end to begin
+    // this kind of delete behavior may be improved to trigger merge
+    for (int i = batch_num - 1; i >= 0; --i)
+    {
+        tree.addDelete(i);
+        tree.checkAll();
+        expectedResult = "";
+        for (int j = i; j < batch_num; j++)
+        {
+            expectedResult += "(" + std::to_string(i) + "|" + std::to_string(j) + "|DEL|1),";
+        }
+
+        //std::cout << expectedResult << std::endl;
+        //std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+    }
+}
+
+// insert skip delete entry
+void insertSkipDelete(MyDeltaTree & tree)
+{
+    std::cout << "delete test2 begin====\n";
+
+    int batch_num = 100;
+
+    tree.addDelete(0);
+
+    std::string expectedResult;
+
+    expectedResult = "(0|0|DEL|1),";
+    assert(expectedResult == treeToString(tree));
+
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addInsert(0, i);
+        tree.checkAll();
+        expectedResult = "(0|0|DEL|1),";
+        for (int j = 0; j <= i; j++)
+        {
+            expectedResult += "(" + std::to_string(j) + "|1|INS|" + std::to_string(i - j) + "),";
+        }
+
+        std::cout << expectedResult << std::endl;
+        std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+    }
+}
+
+// delete after update
+void deleteAfterUpdateTest(MyDeltaTree & tree)
+{
+    std::cout << "update test begin====\n";
+
+    int batch_num = 100;
+
+    std::string expectedResult;
+    std::string expectedResult2;
+    // multiple update to the same row and same column
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addModify(i, 0, 2 * i);
+        tree.checkAll();
+        expectedResult = expectedResult2 + "(" + std::to_string(i) + "|" + std::to_string(i) + "|0|" + std::to_string(2 * i) + "),";
+        std::cout << expectedResult << std::endl;
+        std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+
+        tree.addModify(i, 0, 2 * i + 1);
+        tree.checkAll();
+        expectedResult2 = expectedResult2 + "(" + std::to_string(i) + "|" + std::to_string(i) + "|0|" + std::to_string(2 * i + 1) + "),";
+        //std::cout << expectedResult2 << std::endl;
+        //std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult2 == treeToString(tree));
+    }
+
+    for (int i = batch_num - 1; i >= 0; --i)
+    {
+        tree.addDelete(i);
+        tree.checkAll();
+        expectedResult = "";
+        for (int j = 0; j < i; j++)
+        {
+            expectedResult += "(" + std::to_string(j) + "|" + std::to_string(j) + "|0|" + std::to_string(2 * j + 1) + "),";
+        }
+        for (int j = i; j < batch_num; j++)
+        {
+            expectedResult += "(" + std::to_string(i) + "|" + std::to_string(j) + "|DEL|1),";
+        }
+        //std::cout << expectedResult << std::endl;
+        //std::cout << treeToString(tree) << std::endl;
+        assert(expectedResult == treeToString(tree));
+    }
+    std::cout << "after deleteAfterUpdateTest 1\n";
+}
+
+// update skip delete
+void updateSkipDelete(MyDeltaTree & tree)
+{
+    std::cout << "delete test2 begin====\n";
+
+    tree.addDelete(0);
+
+    std::string expectedResult;
+
+    expectedResult = "(0|0|DEL|1),";
+    assert(expectedResult == treeToString(tree));
+
+    tree.addModify(0, 0, 0);
+    tree.checkAll();
+    expectedResult = "(0|0|DEL|1),(0|1|0|0),";
+
+    std::cout << expectedResult << std::endl;
+    std::cout << treeToString(tree) << std::endl;
+    assert(expectedResult == treeToString(tree));
+
+    std::cout << "updateSkipDelete tests complete\n";
+}
+
+// in-place update
+void inplaceUpdate(MyDeltaTree & tree)
+{
+    std::cout << "insert test2 begin====\n";
+
+    int batch_num = 100;
+
+    std::string expectedResult;
+
+    for (int i = 0; i < batch_num; ++i)
+    {
+        tree.addInsert(i, i);
+        tree.checkAll();
+        expectedResult = expectedResult + "(" + std::to_string(i) + "|0|INS|" + std::to_string(i) + "),";
+        assert(expectedResult == treeToString(tree));
+        tree.addModify(i, 0, i);
+        tree.checkAll();
+        assert(expectedResult == treeToString(tree));
+    }
+
+    std::cout << "after in-place update delete 2\n";
+}
+
+
 int main(int, char **)
 {
     print_sizes();
     FakeValueSpacePtr insert_vs = std::make_shared<FakeValueSpace>();
     FakeValueSpacePtr modify_vs = std::make_shared<FakeValueSpace>();
     MyDeltaTree       delta_tree(insert_vs, modify_vs);
+    MyDeltaTree       delta_tree2(insert_vs, modify_vs);
+    MyDeltaTree       delta_tree3(insert_vs, modify_vs);
+    MyDeltaTree       delta_tree4(insert_vs, modify_vs);
+    MyDeltaTree       delta_tree5(insert_vs, modify_vs);
+    MyDeltaTree       delta_tree6(insert_vs, modify_vs);
+    MyDeltaTree       delta_tree7(insert_vs, modify_vs);
     try
     {
-        insertTest(delta_tree);
+        //insertTest(delta_tree);
+        deleteAfterInsertTest(delta_tree);
+        deleteTest1(delta_tree2);
+        deleteTest2(delta_tree3);
+        insertSkipDelete(delta_tree4);
+        deleteAfterUpdateTest(delta_tree5);
+        updateSkipDelete(delta_tree6);
+        inplaceUpdate(delta_tree7);
+        std::cout << "tests pass\n";
     }
     catch (const DB::Exception & ex)
     {
@@ -187,6 +455,5 @@ int main(int, char **)
     {
         std::cout << "Caught unhandled exception\n";
     }
-
     return 0;
 }