diff --git a/dbms/src/Flash/tests/gtest_executor.cpp b/dbms/src/Flash/tests/gtest_executor.cpp
index b4ba1a75563..d0e7b7e6c67 100644
--- a/dbms/src/Flash/tests/gtest_executor.cpp
+++ b/dbms/src/Flash/tests/gtest_executor.cpp
@@ -69,18 +69,18 @@ try
                        .filter(eq(col("s1"), col("s2")))
                        .build(context);
     {
-        executeStreams(request,
-                       {toNullableVec<String>({"banana"}),
-                        toNullableVec<String>({"banana"})});
+        ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                            createColumns({toNullableVec<String>({"banana"}),
+                                           toNullableVec<String>({"banana"})}));
     }
 
     request = context.receive("exchange1")
                   .filter(eq(col("s1"), col("s2")))
                   .build(context);
     {
-        executeStreams(request,
-                       {toNullableVec<String>({"banana"}),
-                        toNullableVec<String>({"banana"})});
+        ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                            createColumns({toNullableVec<String>({"banana"}),
+                                           toNullableVec<String>({"banana"})}));
     }
 }
 CATCH
@@ -99,25 +99,23 @@ try
                           "  table_scan_0 | {<0, String>, <1, String>}\n"
                           "  table_scan_1 | {<0, String>, <1, String>}\n";
         ASSERT_DAGREQUEST_EQAUL(expected, request);
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})},
-                       2);
-
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})},
-                       5);
-
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})});
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 2),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
+
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 5),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
+
+        ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
     }
     request = context
                   .scan("test_db", "l_table")
@@ -132,10 +130,9 @@ try
                           "   table_scan_0 | {<0, String>, <1, String>}\n"
                           "   table_scan_1 | {<0, String>, <1, String>}\n";
         ASSERT_DAGREQUEST_EQAUL(expected, request);
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})},
-                       2);
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 2),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
     }
 
     request = context
@@ -149,18 +146,16 @@ try
                           "  table_scan_0 | {<0, String>, <1, String>}\n"
                           "  table_scan_1 | {<0, String>, <1, String>}\n";
         ASSERT_DAGREQUEST_EQAUL(expected, request);
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana", "banana", "banana"}),
-                        toNullableVec<String>({"apple", "apple", "apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana", "banana", {}}),
-                        toNullableVec<String>({"apple", "apple", "apple", {}})},
-                       2);
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana", "banana", "banana"}),
-                        toNullableVec<String>({"apple", "apple", "apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana", "banana", {}}),
-                        toNullableVec<String>({"apple", "apple", "apple", {}})},
-                       3);
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 2),
+                            createColumns({toNullableVec<String>({"banana", "banana", "banana", "banana"}),
+                                           toNullableVec<String>({"apple", "apple", "apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana", "banana", {}}),
+                                           toNullableVec<String>({"apple", "apple", "apple", {}})}));
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 3),
+                            createColumns({toNullableVec<String>({"banana", "banana", "banana", "banana"}),
+                                           toNullableVec<String>({"apple", "apple", "apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana", "banana", {}}),
+                                           toNullableVec<String>({"apple", "apple", "apple", {}})}));
     }
 }
 CATCH
@@ -179,25 +174,23 @@ try
                           "  exchange_receiver_0 | type:PassThrough, {<0, String>, <1, String>}\n"
                           "  exchange_receiver_1 | type:PassThrough, {<0, String>, <1, String>}\n";
         ASSERT_DAGREQUEST_EQAUL(expected, request);
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})},
-                       2);
-
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})},
-                       5);
-
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})});
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 2),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
+
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 5),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
+
+        ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
     }
 }
 CATCH
@@ -216,12 +209,11 @@ try
                           "  table_scan_0 | {<0, String>, <1, String>}\n"
                           "  exchange_receiver_1 | type:PassThrough, {<0, String>, <1, String>}\n";
         ASSERT_DAGREQUEST_EQAUL(expected, request);
-        executeStreams(request,
-                       {toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"}),
-                        toNullableVec<String>({"banana", "banana"}),
-                        toNullableVec<String>({"apple", "banana"})},
-                       2);
+        ASSERT_COLUMNS_EQ_R(executeStreams(request, 2),
+                            createColumns({toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"}),
+                                           toNullableVec<String>({"banana", "banana"}),
+                                           toNullableVec<String>({"apple", "banana"})}));
     }
 }
 CATCH
diff --git a/dbms/src/Flash/tests/gtest_limit_executor.cpp b/dbms/src/Flash/tests/gtest_limit_executor.cpp
index e4a3aa5db5e..47482540b39 100644
--- a/dbms/src/Flash/tests/gtest_limit_executor.cpp
+++ b/dbms/src/Flash/tests/gtest_limit_executor.cpp
@@ -68,7 +68,7 @@ try
         else
             expect_cols = {toNullableVec<String>(col_name, ColumnWithData(col0.begin(), col0.begin() + limit_num))};
 
-        executeStreams(request, expect_cols);
+        ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols);
     }
 }
 CATCH
diff --git a/dbms/src/Flash/tests/gtest_projection_executor.cpp b/dbms/src/Flash/tests/gtest_projection_executor.cpp
index 4f6401eb483..2ff0fdff780 100644
--- a/dbms/src/Flash/tests/gtest_projection_executor.cpp
+++ b/dbms/src/Flash/tests/gtest_projection_executor.cpp
@@ -44,17 +44,16 @@ class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest
     }
 
     template <typename T>
-    std::shared_ptr<tipb::DAGRequest> buildDAGRequest(T param, const String & sort_col)
+    std::shared_ptr<tipb::DAGRequest> buildDAGRequest(T param)
     {
-        /// topN is introduced, so that we can get stable results in concurrency environment.
-        return context.scan(db_name, table_name).project(param).topN(sort_col, false, 100).build(context);
+        return context.scan(db_name, table_name).project(param).build(context);
     };
 
     void executeWithConcurrency(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & expect_columns)
     {
         for (size_t i = 1; i < 10; i += 2)
         {
-            executeStreams(request, expect_columns, i);
+            ASSERT_COLUMNS_EQ_UR(executeStreams(request, i), expect_columns);
         }
     }
 
@@ -87,11 +86,11 @@ TEST_F(ExecutorProjectionTestRunner, Projection)
 try
 {
     /// Check single column
-    auto request = buildDAGRequest<MockColumnNames>({col_names[4]}, col_names[4]);
+    auto request = buildDAGRequest<MockColumnNames>({col_names[4]});
     executeWithConcurrency(request, {toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
     /// Check multi columns
-    request = buildDAGRequest<MockColumnNames>({col_names[0], col_names[4]}, col_names[4]);
+    request = buildDAGRequest<MockColumnNames>({col_names[0], col_names[4]});
     executeWithConcurrency(request,
                            {
                                toNullableVec<String>(col_names[0], col0_sorted_asc),
@@ -99,14 +98,14 @@ try
                            });
 
     /// Check multi columns
-    request = buildDAGRequest<MockColumnNames>({col_names[0], col_names[1], col_names[4]}, col_names[4]);
+    request = buildDAGRequest<MockColumnNames>({col_names[0], col_names[1], col_names[4]});
     executeWithConcurrency(request,
                            {toNullableVec<String>(col_names[0], col0_sorted_asc),
                             toNullableVec<String>(col_names[1], col1_sorted_asc),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
     /// Check duplicate columns
-    request = buildDAGRequest<MockColumnNames>({col_names[4], col_names[4], col_names[4]}, col_names[4]);
+    request = buildDAGRequest<MockColumnNames>({col_names[4], col_names[4], col_names[4]});
     executeWithConcurrency(request,
                            {toNullableVec<Int32>(col_names[4], col4_sorted_asc),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc),
@@ -125,7 +124,7 @@ try
             columns.push_back(expect_column);
         }
 
-        request = buildDAGRequest<MockColumnNamesVec>(projection_input, col_names[4]);
+        request = buildDAGRequest<MockColumnNamesVec>(projection_input);
         executeWithConcurrency(request, columns);
     }
 }
@@ -139,18 +138,18 @@ try
     /// Test "equal" function
 
     /// Data type: TypeString
-    request = buildDAGRequest<MockAsts>({eq(col(col_names[0]), col(col_names[0])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({eq(col(col_names[0]), col(col_names[0])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 1, 1, 1, 1, 1, 1}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
-    request = buildDAGRequest<MockAsts>({eq(col(col_names[0]), col(col_names[1])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({eq(col(col_names[0]), col(col_names[1])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 0, 1, 0, {}, 0, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
     /// Data type: TypeLong
-    request = buildDAGRequest<MockAsts>({eq(col(col_names[3]), col(col_names[4])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({eq(col(col_names[3]), col(col_names[4])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 0, 0, 0, {}, 1, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
@@ -159,23 +158,23 @@ try
     /// Test "greater" function
 
     /// Data type: TypeString
-    request = buildDAGRequest<MockAsts>({gt(col(col_names[0]), col(col_names[1])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({gt(col(col_names[0]), col(col_names[1])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 0, 0, 0, {}, 0, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
-    request = buildDAGRequest<MockAsts>({gt(col(col_names[1]), col(col_names[0])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({gt(col(col_names[1]), col(col_names[0])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 1, 0, 1, {}, 1, 1}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
     /// Data type: TypeLong
-    request = buildDAGRequest<MockAsts>({gt(col(col_names[3]), col(col_names[4])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({gt(col(col_names[3]), col(col_names[4])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 0, 1, 1, {}, 0, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
-    request = buildDAGRequest<MockAsts>({gt(col(col_names[4]), col(col_names[3])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({gt(col(col_names[4]), col(col_names[3])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 1, 0, 0, {}, 0, 1}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
@@ -184,18 +183,18 @@ try
     /// Test "and" function
 
     /// Data type: TypeString
-    request = buildDAGRequest<MockAsts>({And(col(col_names[0]), col(col_names[0])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({And(col(col_names[0]), col(col_names[0])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 0, 0, 0, 0, 0, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
-    request = buildDAGRequest<MockAsts>({And(col(col_names[0]), col(col_names[1])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({And(col(col_names[0]), col(col_names[1])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({0, 0, 0, 0, 0, 0, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
     /// Data type: TypeLong
-    request = buildDAGRequest<MockAsts>({And(col(col_names[3]), col(col_names[4])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({And(col(col_names[3]), col(col_names[4])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 1, 0, 0, {}, 1, 0}),
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
@@ -203,7 +202,7 @@ try
     /// Test "not" function
 
     /// Data type: TypeString
-    request = buildDAGRequest<MockAsts>({NOT(col(col_names[0])), NOT(col(col_names[1])), NOT(col(col_names[2])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({NOT(col(col_names[0])), NOT(col(col_names[1])), NOT(col(col_names[2])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 1, 1, 1, 1, 1, 1}),
                             toNullableVec<UInt64>({1, 1, 1, 1, {}, 1, 1}),
@@ -211,7 +210,7 @@ try
                             toNullableVec<Int32>(col_names[4], col4_sorted_asc)});
 
     /// Data type: TypeLong
-    request = buildDAGRequest<MockAsts>({NOT(col(col_names[3])), NOT(col(col_names[4])), col(col_names[4])}, col_names[4]);
+    request = buildDAGRequest<MockAsts>({NOT(col(col_names[3])), NOT(col(col_names[4])), col(col_names[4])});
     executeWithConcurrency(request,
                            {toNullableVec<UInt64>({{}, 0, 1, 0, {}, 0, 1}),
                             toNullableVec<UInt64>({{}, 0, 0, 1, 0, 0, 0}),
diff --git a/dbms/src/Flash/tests/gtest_topn_executor.cpp b/dbms/src/Flash/tests/gtest_topn_executor.cpp
index 0e55702795d..d5466b5c87d 100644
--- a/dbms/src/Flash/tests/gtest_topn_executor.cpp
+++ b/dbms/src/Flash/tests/gtest_topn_executor.cpp
@@ -103,10 +103,10 @@ try
                 else
                     expect_cols.push_back({toNullableVec<String>(single_col_name, ColumnWithString(col0.begin(), col0.begin() + limit_num))});
 
-                executeStreams(request, expect_cols[0]);
-                executeStreams(request, expect_cols[0], 2);
-                executeStreams(request, expect_cols[0], 4);
-                executeStreams(request, expect_cols[0], 8);
+                ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols[0]);
+                ASSERT_COLUMNS_EQ_R(executeStreams(request, 2), expect_cols[0]);
+                ASSERT_COLUMNS_EQ_R(executeStreams(request, 4), expect_cols[0]);
+                ASSERT_COLUMNS_EQ_R(executeStreams(request, 8), expect_cols[0]);
             }
         }
     }
@@ -139,7 +139,7 @@ try
         for (size_t i = 0; i < test_num; ++i)
         {
             request = buildDAGRequest(table_name, order_by_items[i], 100);
-            executeStreams(request, expect_cols[i]);
+            ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols[i]);
         }
     }
 }
@@ -173,7 +173,7 @@ try
             func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};
 
             request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
-            executeStreams(request, expect_cols[0]);
+            ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols[0]);
         }
     }
 
@@ -191,7 +191,7 @@ try
             func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};
 
             request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
-            executeStreams(request, expect_cols[0]);
+            ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols[0]);
         }
     }
 
@@ -209,7 +209,7 @@ try
             func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};
 
             request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
-            executeStreams(request, expect_cols[0]);
+            ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols[0]);
         }
     }
 
diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp
index 881ebaf88db..634e483abd2 100644
--- a/dbms/src/TestUtils/ExecutorTestUtils.cpp
+++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp
@@ -104,41 +104,39 @@ Block mergeBlocks(Blocks blocks)
     return Block(actual_columns);
 }
 
-void readBlock(BlockInputStreamPtr stream, const ColumnsWithTypeAndName & expect_columns)
+DB::ColumnsWithTypeAndName readBlock(BlockInputStreamPtr stream)
 {
     Blocks actual_blocks;
-    Block except_block(expect_columns);
     stream->readPrefix();
     while (auto block = stream->read())
     {
         actual_blocks.push_back(block);
     }
     stream->readSuffix();
-    Block actual_block = mergeBlocks(actual_blocks);
-    ASSERT_BLOCK_EQ(except_block, actual_block);
+    return mergeBlocks(actual_blocks).getColumnsWithTypeAndName();
 }
 } // namespace
 
-void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, std::unordered_map<String, ColumnsWithTypeAndName> & source_columns_map, const ColumnsWithTypeAndName & expect_columns, size_t concurrency)
+DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, std::unordered_map<String, ColumnsWithTypeAndName> & source_columns_map, size_t concurrency)
 {
     DAGContext dag_context(*request, "executor_test", concurrency);
     dag_context.setColumnsForTest(source_columns_map);
     context.context.setDAGContext(&dag_context);
     // Currently, don't care about regions information in tests.
     DAGQuerySource dag(context.context);
-    readBlock(executeQuery(dag, context.context, false, QueryProcessingStage::Complete).in, expect_columns);
+    return readBlock(executeQuery(dag, context.context, false, QueryProcessingStage::Complete).in);
 }
 
-void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & expect_columns, size_t concurrency)
+DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, size_t concurrency)
 {
-    executeStreams(request, context.executorIdColumnsMap(), expect_columns, concurrency);
+    return executeStreams(request, context.executorIdColumnsMap(), concurrency);
 }
 
-void ExecutorTest::executeStreamsWithSingleSource(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & source_columns, const ColumnsWithTypeAndName & expect_columns, SourceType type, size_t concurrency)
+DB::ColumnsWithTypeAndName ExecutorTest::executeStreamsWithSingleSource(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & source_columns, SourceType type, size_t concurrency)
 {
     std::unordered_map<String, ColumnsWithTypeAndName> source_columns_map;
     source_columns_map[getSourceName(type)] = source_columns;
-    executeStreams(request, source_columns_map, expect_columns, concurrency);
+    return executeStreams(request, source_columns_map, concurrency);
 }
 
 void ExecutorTest::dagRequestEqual(const String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h
index 87bb7115bed..59b829e04b5 100644
--- a/dbms/src/TestUtils/ExecutorTestUtils.h
+++ b/dbms/src/TestUtils/ExecutorTestUtils.h
@@ -25,6 +25,9 @@
 namespace DB::tests
 {
 void executeInterpreter(const std::shared_ptr<tipb::DAGRequest> & request, Context & context);
+
+::testing::AssertionResult check_columns_equality(const ColumnsWithTypeAndName & expected, const ColumnsWithTypeAndName & actual, bool _restrict);
+
 class ExecutorTest : public ::testing::Test
 {
 protected:
@@ -72,20 +75,17 @@ class ExecutorTest : public ::testing::Test
         }
     }
 
-    void executeStreams(
+    ColumnsWithTypeAndName executeStreams(
         const std::shared_ptr<tipb::DAGRequest> & request,
         std::unordered_map<String, ColumnsWithTypeAndName> & source_columns_map,
-        const ColumnsWithTypeAndName & expect_columns,
         size_t concurrency = 1);
-    void executeStreams(
+    ColumnsWithTypeAndName executeStreams(
         const std::shared_ptr<tipb::DAGRequest> & request,
-        const ColumnsWithTypeAndName & expect_columns,
         size_t concurrency = 1);
 
-    void executeStreamsWithSingleSource(
+    ColumnsWithTypeAndName executeStreamsWithSingleSource(
         const std::shared_ptr<tipb::DAGRequest> & request,
         const ColumnsWithTypeAndName & source_columns,
-        const ColumnsWithTypeAndName & expect_columns,
         SourceType type = TableScan,
         size_t concurrency = 1);
 
@@ -96,4 +96,4 @@ class ExecutorTest : public ::testing::Test
 
 #define ASSERT_DAGREQUEST_EQAUL(str, request) dagRequestEqual((str), (request));
 #define ASSERT_BLOCKINPUTSTREAM_EQAUL(str, request, concurrency) executeInterpreter((str), (request), (concurrency))
-} // namespace DB::tests
\ No newline at end of file
+} // namespace DB::tests
diff --git a/dbms/src/TestUtils/FunctionTestUtils.cpp b/dbms/src/TestUtils/FunctionTestUtils.cpp
index 7fb526aeb01..9fbf3c9691f 100644
--- a/dbms/src/TestUtils/FunctionTestUtils.cpp
+++ b/dbms/src/TestUtils/FunctionTestUtils.cpp
@@ -14,6 +14,7 @@
 
 #include <Columns/ColumnNullable.h>
 #include <Core/ColumnNumbers.h>
+#include <Core/Row.h>
 #include <DataTypes/DataTypeNothing.h>
 #include <Flash/Coprocessor/DAGCodec.h>
 #include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
@@ -23,7 +24,10 @@
 #include <TestUtils/ColumnsToTiPBExpr.h>
 #include <TestUtils/FunctionTestUtils.h>
 #include <TestUtils/TiFlashTestBasic.h>
-#include <fmt/core.h>
+
+#include <ext/enumerate.h>
+#include <set>
+
 
 namespace DB
 {
@@ -103,22 +107,117 @@ ::testing::AssertionResult columnEqual(
     return columnEqual(expected.column, actual.column);
 }
 
-void blockEqual(
+::testing::AssertionResult blockEqual(
     const Block & expected,
     const Block & actual)
 {
     size_t columns = actual.columns();
     size_t expected_columns = expected.columns();
 
-    ASSERT_EQ(expected_columns, columns);
+    ASSERT_EQUAL(expected_columns, columns, "Block size mismatch");
 
     for (size_t i = 0; i < columns; ++i)
     {
         const auto & expected_col = expected.getByPosition(i);
         const auto & actual_col = actual.getByPosition(i);
-        ASSERT_EQ(actual_col.type->getName(), expected_col.type->getName());
-        ASSERT_COLUMN_EQ(expected_col.column, actual_col.column);
+        auto cmp_res = columnEqual(expected_col, actual_col);
+        if (!cmp_res)
+            return cmp_res;
+    }
+    return ::testing::AssertionSuccess();
+}
+
+/// size of each column should be the same
+std::multiset<Row> columnsToRowSet(const ColumnsWithTypeAndName & cols)
+{
+    if (cols.empty())
+        return {};
+    if (cols[0].column->empty())
+        return {};
+
+    size_t cols_size = cols.size();
+    std::vector<Row> rows{cols[0].column->size()};
+
+    for (auto & r : rows)
+    {
+        r.resize(cols_size, true);
+    }
+
+    for (auto const & [col_id, col] : ext::enumerate(cols))
+    {
+        for (size_t i = 0, size = col.column->size(); i < size; ++i)
+        {
+            new (rows[i].place(col_id)) Field((*col.column)[i]);
+        }
+    }
+    return {std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())};
+}
+
+::testing::AssertionResult columnsEqual(
+    const ColumnsWithTypeAndName & expected,
+    const ColumnsWithTypeAndName & actual,
+    bool _restrict)
+{
+    if (_restrict)
+        return blockEqual(Block(expected), Block(actual));
+
+    auto expect_cols_size = expected.size();
+    auto actual_cols_size = actual.size();
+
+    ASSERT_EQUAL(expect_cols_size, actual_cols_size, "Columns size mismatch");
+
+    for (size_t i = 0; i < expect_cols_size; ++i)
+    {
+        auto const & expect_col = expected[i];
+        auto const & actual_col = actual[i];
+        ASSERT_EQUAL(expect_col.column->getName(), actual_col.column->getName(), fmt::format("Column {} name mismatch", i));
+        ASSERT_EQUAL(expect_col.column->size(), actual_col.column->size(), fmt::format("Column {} size mismatch", i));
+        auto type_eq = dataTypeEqual(expected[i].type, actual[i].type);
+        if (!type_eq)
+            return type_eq;
+    }
+
+    auto const expected_row_set = columnsToRowSet(expected);
+    auto const actual_row_set = columnsToRowSet(actual);
+
+    if (expected_row_set != actual_row_set)
+    {
+        FmtBuffer buf;
+
+        auto expect_it = expected_row_set.begin();
+        auto actual_it = actual_row_set.begin();
+
+        buf.append("Columns row set mismatch\n").append("expected_row_set:\n");
+        for (; expect_it != expected_row_set.end(); ++expect_it, ++actual_it)
+        {
+            buf.joinStr(
+                   expect_it->begin(),
+                   expect_it->end(),
+                   [](const auto & v, FmtBuffer & fb) { fb.append(v.toString()); },
+                   " ")
+                .append("\n");
+            if (*expect_it != *actual_it)
+                break;
+        }
+
+        ++actual_it;
+
+        buf.append("...\nactual_row_set:\n");
+        for (auto it = actual_row_set.begin(); it != actual_it; ++it)
+        {
+            buf.joinStr(
+                   it->begin(),
+                   it->end(),
+                   [](const auto & v, FmtBuffer & fb) { fb.append(v.toString()); },
+                   " ")
+                .append("\n");
+        }
+        buf.append("...\n");
+
+        return testing::AssertionFailure() << buf.toString();
     }
+
+    return testing::AssertionSuccess();
 }
 
 std::pair<ExpressionActionsPtr, String> buildFunction(
@@ -275,5 +374,10 @@ ColumnWithTypeAndName toNullableDatetimeVec(String name, const std::vector<Strin
     DataTypePtr data_type = makeNullable(std::make_shared<DataTypeMyDateTime>(fsp));
     return {makeColumn<Nullable<MyDateTime>>(data_type, vec), data_type, name, 0};
 }
+
+ColumnsWithTypeAndName createColumns(const ColumnsWithTypeAndName & cols)
+{
+    return cols;
+}
 } // namespace tests
 } // namespace DB
diff --git a/dbms/src/TestUtils/FunctionTestUtils.h b/dbms/src/TestUtils/FunctionTestUtils.h
index d6b7351df05..ad01e2e8441 100644
--- a/dbms/src/TestUtils/FunctionTestUtils.h
+++ b/dbms/src/TestUtils/FunctionTestUtils.h
@@ -514,6 +514,13 @@ ColumnWithTypeAndName createConstColumn(
     return createConstColumn<T>(data_type_args, size, InferredFieldType<T>(std::nullopt), name);
 }
 
+// This wrapper function only serves to construct columns input for function-like macros,
+// since preprocessor recognizes `{col1, col2, col3}` as three arguments instead of one.
+// E.g. preprocessor does not allow us to write `ASSERT_COLUMNS_EQ_R({col1, col2, col3}, actual_cols)`,
+//  but with this func we can write `ASSERT_COLUMNS_EQ_R(createColumns{col1, col2, col3}, actual_cols)` instead.
+ColumnsWithTypeAndName createColumns(const ColumnsWithTypeAndName & cols);
+
+
 ::testing::AssertionResult dataTypeEqual(
     const DataTypePtr & expected,
     const DataTypePtr & actual);
@@ -527,10 +534,15 @@ ::testing::AssertionResult columnEqual(
     const ColumnWithTypeAndName & expected,
     const ColumnWithTypeAndName & actual);
 
-void blockEqual(
+::testing::AssertionResult blockEqual(
     const Block & expected,
     const Block & actual);
 
+::testing::AssertionResult columnsEqual(
+    const ColumnsWithTypeAndName & expected,
+    const ColumnsWithTypeAndName & actual,
+    bool _restrict);
+
 ColumnWithTypeAndName executeFunction(
     Context & context,
     const String & func_name,
@@ -756,5 +768,10 @@ class FunctionTest : public ::testing::Test
 
 #define ASSERT_COLUMN_EQ(expected, actual) ASSERT_TRUE(DB::tests::columnEqual((expected), (actual)))
 #define ASSERT_BLOCK_EQ(expected, actual) DB::tests::blockEqual((expected), (actual))
+
+/// restrictly checking columns equality, both data set and each row's offset should be the same
+#define ASSERT_COLUMNS_EQ_R(expected, actual) ASSERT_TRUE(DB::tests::columnsEqual((expected), (actual), true))
+/// unrestrictly checking columns equality, only checking data set equality
+#define ASSERT_COLUMNS_EQ_UR(expected, actual) ASSERT_TRUE(DB::tests::columnsEqual((expected), (actual), false))
 } // namespace tests
 } // namespace DB
diff --git a/dbms/src/WindowFunctions/tests/gtest_window_functions.cpp b/dbms/src/WindowFunctions/tests/gtest_window_functions.cpp
index 3addf73a642..06253cac66e 100644
--- a/dbms/src/WindowFunctions/tests/gtest_window_functions.cpp
+++ b/dbms/src/WindowFunctions/tests/gtest_window_functions.cpp
@@ -69,11 +69,10 @@ try
                        .sort({{"partition", false}, {"order", false}, {"partition", false}, {"order", false}}, true)
                        .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame())
                        .build(context);
-    executeStreams(
-        request,
-        {toNullableVec<Int64>("partition", {1, 1, 1, 1, 2, 2, 2, 2}),
-         toNullableVec<Int64>("order", {1, 1, 2, 2, 1, 1, 2, 2}),
-         toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                        createColumns({toNullableVec<Int64>("partition", {1, 1, 1, 1, 2, 2, 2, 2}),
+                                       toNullableVec<Int64>("order", {1, 1, 2, 2, 1, 1, 2, 2}),
+                                       toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // null input
     executeStreamsWithSingleSource(
@@ -82,10 +81,8 @@ try
         {});
 
     // nullable
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}), {toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2})}},
-        {toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}), toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2}), toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(request, {toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}), {toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2})}}),
+                        createColumns({toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}), toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2}), toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // string - sql : select *, row_number() over w1 from test2 window w1 as (partition by partition_string order by order_string)
     request = context
@@ -94,20 +91,18 @@ try
                   .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame())
                   .build(context);
 
-    executeStreams(
-        request,
-        {toNullableVec<String>("partition", {"apple", "apple", "apple", "apple", "banana", "banana", "banana", "banana"}),
-         toNullableVec<String>("order", {"apple", "apple", "banana", "banana", "apple", "apple", "banana", "banana"}),
-         toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                        createColumns({toNullableVec<String>("partition", {"apple", "apple", "apple", "apple", "banana", "banana", "banana", "banana"}),
+                                       toNullableVec<String>("order", {"apple", "apple", "banana", "banana", "apple", "apple", "banana", "banana"}),
+                                       toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // nullable
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableVec<String>("partition", {"banana", "banana", "banana", "banana", {}, "apple", "apple", "apple", "apple"}),
-         toNullableVec<String>("order", {"apple", "apple", "banana", "banana", {}, "apple", "apple", "banana", "banana"})},
-        {toNullableVec<String>("partition", {{}, "apple", "apple", "apple", "apple", "banana", "banana", "banana", "banana"}),
-         toNullableVec<String>("order", {{}, "apple", "apple", "banana", "banana", "apple", "apple", "banana", "banana"}),
-         toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(request,
+                                                       {toNullableVec<String>("partition", {"banana", "banana", "banana", "banana", {}, "apple", "apple", "apple", "apple"}),
+                                                        toNullableVec<String>("order", {"apple", "apple", "banana", "banana", {}, "apple", "apple", "banana", "banana"})}),
+                        createColumns({toNullableVec<String>("partition", {{}, "apple", "apple", "apple", "apple", "banana", "banana", "banana", "banana"}),
+                                       toNullableVec<String>("order", {{}, "apple", "apple", "banana", "banana", "apple", "apple", "banana", "banana"}),
+                                       toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // float64 - sql : select *, row_number() over w1 from test3 window w1 as (partition by partition_float order by order_float64)
     request = context
@@ -116,20 +111,18 @@ try
                   .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame())
                   .build(context);
 
-    executeStreams(
-        request,
-        {toNullableVec<Float64>("partition", {1.00, 1.00, 1.00, 1.00, 2.00, 2.00, 2.00, 2.00}),
-         toNullableVec<Float64>("order", {1.00, 1.00, 2.00, 2.00, 1.00, 1.00, 2.00, 2.00}),
-         toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                        createColumns({toNullableVec<Float64>("partition", {1.00, 1.00, 1.00, 1.00, 2.00, 2.00, 2.00, 2.00}),
+                                       toNullableVec<Float64>("order", {1.00, 1.00, 2.00, 2.00, 1.00, 1.00, 2.00, 2.00}),
+                                       toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // nullable
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableVec<Float64>("partition", {{}, 1.00, 1.00, 1.00, 1.00, 2.00, 2.00, 2.00, 2.00}),
-         toNullableVec<Float64>("order", {{}, 1.00, 1.00, 2.00, 2.00, 1.00, 1.00, 2.00, 2.00})},
-        {toNullableVec<Float64>("partition", {{}, 1.00, 1.00, 1.00, 1.00, 2.00, 2.00, 2.00, 2.00}),
-         toNullableVec<Float64>("order", {{}, 1.00, 1.00, 2.00, 2.00, 1.00, 1.00, 2.00, 2.00}),
-         toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(request,
+                                                       {toNullableVec<Float64>("partition", {{}, 1.00, 1.00, 1.00, 1.00, 2.00, 2.00, 2.00, 2.00}),
+                                                        toNullableVec<Float64>("order", {{}, 1.00, 1.00, 2.00, 2.00, 1.00, 1.00, 2.00, 2.00})}),
+                        createColumns({toNullableVec<Float64>("partition", {{}, 1.00, 1.00, 1.00, 1.00, 2.00, 2.00, 2.00, 2.00}),
+                                       toNullableVec<Float64>("order", {{}, 1.00, 1.00, 2.00, 2.00, 1.00, 1.00, 2.00, 2.00}),
+                                       toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // datetime - select *, row_number() over w1 from test4 window w1 as (partition by partition_datetime order by order_datetime);
     request = context
@@ -137,22 +130,20 @@ try
                   .sort({{"partition", false}, {"order", false}, {"partition", false}, {"order", false}}, true)
                   .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame())
                   .build(context);
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableDatetimeVec("partition", {"20220101010102", "20220101010102", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010101", "20220101010101"}, 0),
-         toDatetimeVec("order", {"20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0)},
-        {toNullableDatetimeVec("partition", {"20220101010101", "20220101010101", "20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010102", "20220101010102"}, 0),
-         toNullableDatetimeVec("order", {"20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0),
-         toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(request,
+                                                       {toNullableDatetimeVec("partition", {"20220101010102", "20220101010102", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010101", "20220101010101"}, 0),
+                                                        toDatetimeVec("order", {"20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0)}),
+                        createColumns({toNullableDatetimeVec("partition", {"20220101010101", "20220101010101", "20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010102", "20220101010102"}, 0),
+                                       toNullableDatetimeVec("order", {"20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0),
+                                       toNullableVec<Int64>("row_number", {1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // nullable
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableDatetimeVec("partition", {"20220101010102", {}, "20220101010102", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010101", "20220101010101"}, 0),
-         toNullableDatetimeVec("order", {"20220101010101", {}, "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0)},
-        {toNullableDatetimeVec("partition", {{}, "20220101010101", "20220101010101", "20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010102", "20220101010102"}, 0),
-         toNullableDatetimeVec("order", {{}, "20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0),
-         toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})});
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(request,
+                                                       {toNullableDatetimeVec("partition", {"20220101010102", {}, "20220101010102", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010101", "20220101010101"}, 0),
+                                                        toNullableDatetimeVec("order", {"20220101010101", {}, "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0)}),
+                        createColumns({toNullableDatetimeVec("partition", {{}, "20220101010101", "20220101010101", "20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010102", "20220101010102"}, 0),
+                                       toNullableDatetimeVec("order", {{}, "20220101010101", "20220101010101", "20220101010102", "20220101010102", "20220101010101", "20220101010101", "20220101010102", "20220101010102"}, 0),
+                                       toNullableVec<Int64>("row_number", {1, 1, 2, 3, 4, 1, 2, 3, 4})}));
 
     // 2 partiton key and 2 order key
     // sql : select *, row_number() over w1 from test6 window w1 as (partition by partition_int1, partition_int2 order by order_int1,order_int2)
@@ -162,41 +153,38 @@ try
                   .window(RowNumber(), {{"order1", false}, {"order2", false}}, {{"partition1", false}, {"partition2", false}}, buildDefaultRowsFrame())
                   .build(context);
 
-    executeStreams(
-        request,
-        {toNullableVec<Int64>("partition1", {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2}),
-         toNullableVec<Int64>("partition2", {1, 1, 1, 2, 2, 2, 1, 1, 1, 2, 2, 2}),
-         toNullableVec<Int64>("order1", {1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 2}),
-         toNullableVec<Int64>("order2", {1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2}),
-         toNullableVec<Int64>("row_number", {1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3})});
+    ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                        createColumns({toNullableVec<Int64>("partition1", {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2}),
+                                       toNullableVec<Int64>("partition2", {1, 1, 1, 2, 2, 2, 1, 1, 1, 2, 2, 2}),
+                                       toNullableVec<Int64>("order1", {1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 2}),
+                                       toNullableVec<Int64>("order2", {1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2}),
+                                       toNullableVec<Int64>("row_number", {1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3})}));
 
     /***** rank, dense_rank *****/
     request = context.scan("test_db", "test_table_for_rank").sort({{"partition", false}, {"order", false}}, true).window({Rank(), DenseRank()}, {{"order", false}}, {{"partition", false}}, MockWindowFrame{}).build(context);
-    executeStreams(
-        request,
-        {toNullableVec<Int64>("partition", {1, 1, 1, 1, 2, 2, 2, 2}),
-         toNullableVec<Int64>("order", {1, 1, 2, 2, 1, 1, 2, 2}),
-         toNullableVec<Int64>("rank", {1, 1, 3, 3, 1, 1, 3, 3}),
-         toNullableVec<Int64>("dense_rank", {1, 1, 2, 2, 1, 1, 2, 2})});
+    ASSERT_COLUMNS_EQ_R(executeStreams(request),
+                        createColumns({toNullableVec<Int64>("partition", {1, 1, 1, 1, 2, 2, 2, 2}),
+                                       toNullableVec<Int64>("order", {1, 1, 2, 2, 1, 1, 2, 2}),
+                                       toNullableVec<Int64>("rank", {1, 1, 3, 3, 1, 1, 3, 3}),
+                                       toNullableVec<Int64>("dense_rank", {1, 1, 2, 2, 1, 1, 2, 2})}));
 
     // nullable
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}),
-         toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2})},
-        {toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}),
-         toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2}),
-         toNullableVec<Int64>("rank", {1, 1, 1, 3, 3, 1, 1, 3, 3}),
-         toNullableVec<Int64>("dense_rank", {1, 1, 1, 2, 2, 1, 1, 2, 2})});
-
-    executeStreamsWithSingleSource(
-        request,
-        {toNullableVec<Int64>("partition", {{}, {}, 1, 1, 1, 1, 2, 2, 2, 2}),
-         toNullableVec<Int64>("order", {{}, 1, 1, 1, 2, 2, 1, 1, 2, 2})},
-        {toNullableVec<Int64>("partition", {{}, {}, 1, 1, 1, 1, 2, 2, 2, 2}),
-         toNullableVec<Int64>("order", {{}, 1, 1, 1, 2, 2, 1, 1, 2, 2}),
-         toNullableVec<Int64>("rank", {1, 2, 1, 1, 3, 3, 1, 1, 3, 3}),
-         toNullableVec<Int64>("dense_rank", {1, 2, 1, 1, 2, 2, 1, 1, 2, 2})});
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(request,
+                                                       {toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}),
+                                                        toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2})}),
+                        createColumns({toNullableVec<Int64>("partition", {{}, 1, 1, 1, 1, 2, 2, 2, 2}),
+                                       toNullableVec<Int64>("order", {{}, 1, 1, 2, 2, 1, 1, 2, 2}),
+                                       toNullableVec<Int64>("rank", {1, 1, 1, 3, 3, 1, 1, 3, 3}),
+                                       toNullableVec<Int64>("dense_rank", {1, 1, 1, 2, 2, 1, 1, 2, 2})}));
+
+    ASSERT_COLUMNS_EQ_R(executeStreamsWithSingleSource(
+                            request,
+                            {toNullableVec<Int64>("partition", {{}, {}, 1, 1, 1, 1, 2, 2, 2, 2}),
+                             toNullableVec<Int64>("order", {{}, 1, 1, 1, 2, 2, 1, 1, 2, 2})}),
+                        createColumns({toNullableVec<Int64>("partition", {{}, {}, 1, 1, 1, 1, 2, 2, 2, 2}),
+                                       toNullableVec<Int64>("order", {{}, 1, 1, 1, 2, 2, 1, 1, 2, 2}),
+                                       toNullableVec<Int64>("rank", {1, 2, 1, 1, 3, 3, 1, 1, 3, 3}),
+                                       toNullableVec<Int64>("dense_rank", {1, 2, 1, 1, 2, 2, 1, 1, 2, 2})}));
 }
 CATCH