Skip to content

Commit

Permalink
This is an automated cherry-pick of #9361
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
JinheLin authored and ti-chi-bot committed Sep 27, 2024
1 parent 030c678 commit d3f204a
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Block LateMaterializationBlockInputStream::readImpl()
}
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(col_filter, passed_count);
}
}
Expand Down
248 changes: 248 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3885,6 +3885,254 @@ try
}
CATCH

<<<<<<< HEAD
=======

TEST_F(DeltaMergeStoreTest, RSResult)
try
{
auto log = Logger::get(GET_GTEST_FULL_NAME);
auto table_column_defines = DMTestEnv::getDefaultColumns();
ColumnDefine cd_time(1, "col_time", std::make_shared<DataTypeInt64>());
table_column_defines->push_back(cd_time);

store = reload(table_column_defines);

auto create_data = [&](Int64 start, Int64 limit) {
std::vector<Int64> v(limit, 0);
std::iota(v.begin(), v.end(), start); // start ... start + limit - 1
return v;
};

auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
auto time_data = create_data(0, end - beg);
auto col_time = createColumn<Int64>(time_data, cd_time.name, cd_time.id);
block.insert(col_time);
block.checkNumberOfRows();
return block;
};

auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector<Int64> & expected_data) {
auto in = store->read(
*db_context,
db_context->getSettingsRef(),
store->getTableColumns(),
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
filter,
std::vector<RuntimeFilterPtr>{},
0,
"",
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 1024)[0];

Int64 rows = 0;
in->readPrefix();
while (true)
{
auto b = in->read();
if (!b)
break;
rows += b.rows();
ASSERT_EQ(b.getRSResult(), expected_res) << fmt::format("{} vs {}", b.getRSResult(), expected_res);
const auto * v = toColumnVectorDataPtr<Int64>(b.getByName("col_time").column);
ASSERT_NE(v, nullptr);
ASSERT_EQ(v->size(), expected_data.size());
ASSERT_TRUE(std::equal(v->begin(), v->end(), expected_data.begin()))
<< fmt::format("{} vs {}", *v, expected_data);
}
in->readSuffix();
ASSERT_EQ(rows, expected_data.size());
};

const String table_info_json = R"json({
"cols":[
{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"col_time","O":"col_time"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":11}}
],
"pk_is_handle":false,"index_info":[],"is_common_handle":false,
"name":{"L":"t_111","O":"t_111"},"partition":null,
"comment":"Mocked.","id":30,"schema_version":-1,"state":0,"tiflash_replica":{"Count":0},"update_timestamp":1636471547239654
})json";

auto create_filter = [&](Int64 value) {
auto filter = generatePushDownFilter(
*db_context,
table_info_json,
fmt::format("select * from default.t_111 where col_time >= {}", value));
RUNTIME_CHECK(filter->extra_cast != nullptr);
RUNTIME_CHECK(filter->rs_operator != nullptr);
auto rs_unsupported = typeid_cast<const Unsupported *>(filter->rs_operator.get());
RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString());
RUNTIME_CHECK(filter->before_where != nullptr);
LOG_DEBUG(
log,
"value={} extra_cast={} rs_operator={} before_where={}",
value,
filter->extra_cast->dumpActions(),
filter->rs_operator->toDebugString(),
filter->before_where->dumpActions());
return filter;
};

DB::registerFunctions();

constexpr Int64 num_rows = 128;
auto filter_all = create_filter(0);
auto filter_all_data = create_data(0, num_rows);
auto filter_some = create_filter(64);
auto filter_some_data = create_data(64, num_rows - 64);
auto filter_none = create_filter(128);
auto filter_none_data = std::vector<Int64>{};

// Disable delta merge to ensure read data from delta
FailPointHelper::enableFailPoint(FailPoints::pause_before_dt_background_delta_merge);

auto block = create_block(0, num_rows, 1);
store->write(*db_context, db_context->getSettingsRef(), block);

LOG_DEBUG(log, "Check delta");
// Delta always return Some
check(filter_all, RSResult::Some, filter_all_data);
check(filter_some, RSResult::Some, filter_some_data);
check(filter_none, RSResult::Some, filter_none_data);

LOG_DEBUG(log, "Check stable");
FailPointHelper::disableFailPoint(FailPoints::pause_before_dt_background_delta_merge);
store->mergeDeltaAll(*db_context);
check(filter_all, RSResult::All, filter_all_data);
check(filter_some, RSResult::Some, filter_some_data);
check(filter_none, RSResult::Some, filter_none_data);
}
CATCH

TEST_F(DeltaMergeStoreTest, LMAllWithMultiVersionRecords)
try
{
auto log = Logger::get(GET_GTEST_FULL_NAME);
auto table_column_defines = DMTestEnv::getDefaultColumns();
ColumnDefine cd_time(1, "col_time", std::make_shared<DataTypeInt64>());
ColumnDefine cd_int(2, "col_int", std::make_shared<DataTypeInt64>());
table_column_defines->push_back(cd_time);
table_column_defines->push_back(cd_int);

store = reload(table_column_defines);

auto create_data = [&](Int64 start, Int64 limit) {
std::vector<Int64> v(limit, 0);
std::iota(v.begin(), v.end(), start); // start ... start + limit - 1
return v;
};

auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
auto data = create_data(0, end - beg);
block.insert(createColumn<Int64>(data, cd_time.name, cd_time.id));
block.insert(createColumn<Int64>(data, cd_int.name, cd_int.id));
block.checkNumberOfRows();
return block;
};

auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector<Int64> & expected_data) {
auto in = store->read(
*db_context,
db_context->getSettingsRef(),
store->getTableColumns(),
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
filter,
std::vector<RuntimeFilterPtr>{},
0,
"",
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 1024)[0];

Int64 rows = 0;
in->readPrefix();
while (true)
{
auto b = in->read();
if (!b)
break;
rows += b.rows();
ASSERT_EQ(b.getRSResult(), expected_res) << fmt::format("{} vs {}", b.getRSResult(), expected_res);
const auto * v = toColumnVectorDataPtr<Int64>(b.getByName("col_time").column);
ASSERT_NE(v, nullptr);
ASSERT_EQ(v->size(), expected_data.size());
ASSERT_TRUE(std::equal(v->begin(), v->end(), expected_data.begin()))
<< fmt::format("{} vs {}", *v, expected_data);
}
in->readSuffix();
ASSERT_EQ(rows, expected_data.size());
};

const String table_info_json = R"json({
"cols":[
{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"col_time","O":"col_time"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":11}},
{"comment":"","default":null,"default_bit":null,"id":2,"name":{"L":"col_int","O":"col_int"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":8}}
],
"pk_is_handle":false,"index_info":[],"is_common_handle":false,
"name":{"L":"t_111","O":"t_111"},"partition":null,
"comment":"Mocked.","id":30,"schema_version":-1,"state":0,"tiflash_replica":{"Count":0},"update_timestamp":1636471547239654
})json";

auto create_filter = [&](Int64 value) {
auto filter = generatePushDownFilter(
*db_context,
table_info_json,
fmt::format("select * from default.t_111 where col_time >= {}", value));
RUNTIME_CHECK(filter->extra_cast != nullptr);
RUNTIME_CHECK(filter->rs_operator != nullptr);
auto rs_unsupported = typeid_cast<const Unsupported *>(filter->rs_operator.get());
RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString());
RUNTIME_CHECK(filter->before_where != nullptr);
LOG_DEBUG(
log,
"value={} extra_cast={} rs_operator={} before_where={}",
value,
filter->extra_cast->dumpActions(),
filter->rs_operator->toDebugString(),
filter->before_where->dumpActions());
return filter;
};

DB::registerFunctions();

constexpr Int64 num_rows = 128;
auto filter_all = create_filter(0);
auto filter_all_data = create_data(0, num_rows);

// Write multi-version records.
{
auto block = create_block(0, num_rows, 1);
store->write(*db_context, db_context->getSettingsRef(), block);
}
{
auto block = create_block(0, num_rows, 2);
store->write(*db_context, db_context->getSettingsRef(), block);
}
store->mergeDeltaAll(*db_context);

// Ensure multi-version records.
ASSERT_EQ(store->id_to_segment.size(), 1);
auto seg = store->id_to_segment.begin()->second;
seg->stable->calculateStableProperty(
*store->newDMContext(*db_context, db_context->getSettingsRef()),
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
store->isCommonHandle());
const auto & property = seg->stable->getStableProperty();
ASSERT_EQ(property.num_versions, num_rows * 2);
ASSERT_EQ(property.num_puts, num_rows);

check(filter_all, RSResult::All, filter_all_data);
}
CATCH

>>>>>>> 55cb9b9af1 (Storages: Skip filtering for filter column (#9361))
} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit d3f204a

Please sign in to comment.