Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed May 23, 2023
2 parents 2a00cb8 + 4dc69db commit d32f3a7
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 71 deletions.
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 151 files
4 changes: 3 additions & 1 deletion dbms/src/Core/SpillHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void SpillHandler::spillBlocks(Blocks && blocks)
Stopwatch watch;
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
auto block_size = blocks.size();
LOG_DEBUG(spiller->logger, "Spilling {} blocks data into temporary file {}", block_size, current_spill_file_name);
LOG_DEBUG(spiller->logger, "Spilling {} blocks data", block_size);

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_spill);

Expand All @@ -96,6 +96,8 @@ void SpillHandler::spillBlocks(Blocks && blocks)
{
if (unlikely(!block || block.rows() == 0))
continue;
/// erase constant column
spiller->removeConstantColumns(block);
if (unlikely(writer == nullptr))
{
std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile();
Expand Down
23 changes: 21 additions & 2 deletions dbms/src/Core/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 part
{
RUNTIME_CHECK_MSG(spill_dir.isDirectory(), "Spill dir {} is a file", spill_dir.path());
}
for (size_t i = 0; i < input_schema.columns(); ++i)
{
if (input_schema.getByPosition(i).column != nullptr && input_schema.getByPosition(i).column->isColumnConst())
const_column_indexes.push_back(i);
}
RUNTIME_CHECK_MSG(const_column_indexes.size() < input_schema.columns(), "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
header_without_constants = input_schema;
removeConstantColumns(header_without_constants);
}

void Spiller::removeConstantColumns(Block & block) const
{
/// note must erase the constant column in reverse order because the index stored in const_column_indexes is based on
/// the original Block, if the column before the index is removed, the index has to be updated or it becomes invalid index
for (auto it = const_column_indexes.rbegin(); it != const_column_indexes.rend(); ++it) // NOLINT
{
RUNTIME_CHECK_MSG(block.getByPosition(*it).column->isColumnConst(), "The {}-th column in block must be constant column", *it);
block.erase(*it);
}
}

CachedSpillHandlerPtr Spiller::createCachedSpillHandler(
Expand Down Expand Up @@ -200,7 +219,7 @@ BlockInputStreams Spiller::restoreBlocks(UInt64 partition_id, UInt64 max_stream_
restore_stream_read_rows.push_back(file->getSpillDetails().rows);
if (release_spilled_file_on_restore)
file_infos.back().file = std::move(file);
ret.push_back(std::make_shared<SpilledFilesInputStream>(std::move(file_infos), input_schema, config.file_provider, spill_version));
ret.push_back(std::make_shared<SpilledFilesInputStream>(std::move(file_infos), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version));
}
}
else
Expand All @@ -221,7 +240,7 @@ BlockInputStreams Spiller::restoreBlocks(UInt64 partition_id, UInt64 max_stream_
for (UInt64 i = 0; i < spill_file_read_stream_num; ++i)
{
if (likely(!file_infos[i].empty()))
ret.push_back(std::make_shared<SpilledFilesInputStream>(std::move(file_infos[i]), input_schema, config.file_provider, spill_version));
ret.push_back(std::make_shared<SpilledFilesInputStream>(std::move(file_infos[i]), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version));
}
}
for (size_t i = 0; i < spill_file_read_stream_num; ++i)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Core/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class Spiller
bool hasSpilledData() const { return has_spilled_data; };
/// only for test now
bool releaseSpilledFileOnRestore() const { return release_spilled_file_on_restore; }
void removeConstantColumns(Block & block) const;

private:
friend class SpillHandler;
Expand All @@ -117,6 +118,8 @@ class Spiller
const UInt64 partition_num;
/// todo remove input_schema if spiller does not rely on BlockInputStream
const Block input_schema;
std::vector<size_t> const_column_indexes;
Block header_without_constants;
const LoggerPtr logger;
std::mutex spill_finished_mutex;
bool spill_finished = false;
Expand Down
155 changes: 131 additions & 24 deletions dbms/src/Core/tests/gtest_spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ class SpillerTest : public testing::Test
if (spiller_dir.exists())
spiller_dir.remove(true);
}
Blocks generateBlocks(size_t block_num)
Blocks generateBlocks(size_t block_num, const Block & schema)
{
Blocks ret;
for (size_t i = 0; i < block_num; ++i)
{
ColumnsWithTypeAndName data;
for (const auto & type_and_name : spiller_test_header)
for (const auto & type_and_name : schema)
{
auto column = type_and_name.type->createColumn();
for (size_t k = 0; k < 100; ++k)
Expand All @@ -70,6 +70,10 @@ class SpillerTest : public testing::Test
}
return ret;
}
Blocks generateBlocks(size_t block_num)
{
return generateBlocks(block_num, spiller_test_header);
}
Blocks generateSortedBlocks(size_t block_num)
{
Blocks ret;
Expand Down Expand Up @@ -491,33 +495,136 @@ try
}
CATCH

TEST_F(SpillerTest, SpillAndRestoreConstantData)
TEST_F(SpillerTest, SpillAllConstantBlock)
try
{
Spiller spiller(*spill_config_ptr, false, 1, spiller_test_header, logger);
Blocks ret;
ColumnsWithTypeAndName data;
for (const auto & type_and_name : spiller_test_header)
{
auto column = type_and_name.type->createColumnConst(100, Field(static_cast<Int64>(1)));
data.push_back(ColumnWithTypeAndName(std::move(column), type_and_name.type, type_and_name.name));
}
ret.emplace_back(data);
auto reference = ret;
spiller.spillBlocks(std::move(ret), 0);
spiller.finishSpill();
auto block_streams = spiller.restoreBlocks(0, 2);
GTEST_ASSERT_EQ(block_streams.size(), 1);
Blocks restored_blocks;
for (auto & block_stream : block_streams)
auto constant_header = spiller_test_header;
for (auto & type_and_name : constant_header)
type_and_name.column = type_and_name.type->createColumnConst(1, Field(static_cast<Int64>(1)));

Spiller spiller(*spill_config_ptr, false, 1, constant_header, logger);
GTEST_FAIL();
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check const_column_indexes.size() < input_schema.columns() failed: Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
}

TEST_F(SpillerTest, SpillWithConstantSchemaAndNonConstantData)
try
{
NamesAndTypes names_and_types;
names_and_types.emplace_back("col0", DataTypeFactory::instance().get("Int64"));
names_and_types.emplace_back("col1", DataTypeFactory::instance().get("UInt64"));

std::vector<bool> const_columns_flag = {
true,
false,
};


ColumnsWithTypeAndName columns;
for (size_t i = 0; i < names_and_types.size(); i++)
{
for (Block block = block_stream->read(); block; block = block_stream->read())
restored_blocks.push_back(block);
if (const_columns_flag[i])
{
/// const column
columns.emplace_back(names_and_types[i].type->createColumnConst(1, Field(static_cast<Int64>(1))),
names_and_types[i].type,
names_and_types[i].name);
}
else
{
/// normal column
columns.emplace_back(names_and_types[i].type->createColumn(),
names_and_types[i].type,
names_and_types[i].name);
}
}
GTEST_ASSERT_EQ(reference.size(), restored_blocks.size());
for (size_t i = 0; i < reference.size(); ++i)
Block header(columns);
Spiller spiller(*spill_config_ptr, false, 1, header, logger);
auto all_blocks = generateBlocks(20, header);
spiller.spillBlocks(std::move(all_blocks), 0);
GTEST_FAIL();
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message().find("Check block.getByPosition(*it).column->isColumnConst() failed: The 0-th column in block must be constant column") != std::string::npos, true);
}

TEST_F(SpillerTest, SpillAndRestoreConstantData)
try
{
NamesAndTypes names_and_types;
names_and_types.emplace_back("col0", DataTypeFactory::instance().get("Int64"));
names_and_types.emplace_back("col1", DataTypeFactory::instance().get("UInt64"));
names_and_types.emplace_back("col2", DataTypeFactory::instance().get("Nullable(Int64)"));
names_and_types.emplace_back("col3", DataTypeFactory::instance().get("Nullable(UInt64)"));
names_and_types.emplace_back("col4", DataTypeFactory::instance().get("Int64"));
names_and_types.emplace_back("col5", DataTypeFactory::instance().get("UInt64"));

std::vector<std::vector<bool>> const_columns_flags = {
{false, false, false, false, false, true},
{false, true, true, true, true, true},
{true, false, false, false, false, false},
{true, true, true, true, true, false},
{true, false, true, false, true, false},
{false, true, false, true, false, true},
{false, true, false, true, false, false},
{true, false, true, false, true, true},
};

for (const auto & const_columns_flag : const_columns_flags)
{
blockEqual(materializeBlock(reference[i]), restored_blocks[i]);
ColumnsWithTypeAndName columns;
for (size_t i = 0; i < names_and_types.size(); i++)
{
if (const_columns_flag[i])
{
/// const column
columns.emplace_back(names_and_types[i].type->createColumnConst(1, Field(static_cast<Int64>(1))),
names_and_types[i].type,
names_and_types[i].name);
}
else
{
/// normal column
columns.emplace_back(names_and_types[i].type->createColumn(),
names_and_types[i].type,
names_and_types[i].name);
}
}
Block header(columns);
Spiller spiller(*spill_config_ptr, false, 1, header, logger);
auto all_blocks = generateBlocks(20, header);
for (auto & block : all_blocks)
{
for (size_t i = 0; i < const_columns_flag.size(); i++)
{
if (header.getByPosition(i).column->isColumnConst())
{
Field constant_field;
header.getByPosition(i).column->get(0, constant_field);
block.getByPosition(i).column = header.getByPosition(i).type->createColumnConst(block.rows(), constant_field);
}
}
}
auto reference = all_blocks;
spiller.spillBlocks(std::move(all_blocks), 0);
spiller.finishSpill();
auto block_streams = spiller.restoreBlocks(0, 1);
GTEST_ASSERT_EQ(block_streams.size(), 1);
Blocks restored_blocks;
for (auto & block_stream : block_streams)
{
for (Block block = block_stream->read(); block; block = block_stream->read())
restored_blocks.push_back(block);
}
GTEST_ASSERT_EQ(reference.size(), restored_blocks.size());
for (size_t i = 0; i < reference.size(); ++i)
{
blockEqual(materializeBlock(reference[i]), restored_blocks[i]);
}
}
}
CATCH
Expand Down
29 changes: 27 additions & 2 deletions dbms/src/DataStreams/SpilledFilesInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,43 @@ namespace FailPoints
extern const char random_restore_from_disk_failpoint[];
} // namespace FailPoints

SpilledFilesInputStream::SpilledFilesInputStream(std::vector<SpilledFileInfo> && spilled_file_infos_, const Block & header_, const FileProviderPtr & file_provider_, Int64 max_supported_spill_version_)
SpilledFilesInputStream::SpilledFilesInputStream(
std::vector<SpilledFileInfo> && spilled_file_infos_,
const Block & header_,
const Block & header_without_constants_,
const std::vector<size_t> & const_column_indexes_,
const FileProviderPtr & file_provider_,
Int64 max_supported_spill_version_)
: spilled_file_infos(std::move(spilled_file_infos_))
, header(header_)
, header_without_constants(header_without_constants_)
, const_column_indexes(const_column_indexes_)
, file_provider(file_provider_)
, max_supported_spill_version(max_supported_spill_version_)
{
RUNTIME_CHECK_MSG(!spilled_file_infos.empty(), "Spilled files must not be empty");
current_reading_file_index = 0;
current_file_stream = std::make_unique<SpilledFileStream>(std::move(spilled_file_infos[0]), header, file_provider, max_supported_spill_version);
current_file_stream = std::make_unique<SpilledFileStream>(std::move(spilled_file_infos[0]), header_without_constants, file_provider, max_supported_spill_version);
}

Block SpilledFilesInputStream::readImpl()
{
auto ret = readInternal();
if likely (ret)
{
assert(ret.columns() != 0);
size_t rows = ret.rows();
for (const auto index : const_column_indexes)
{
const auto & col_type_name = header.getByPosition(index);
assert(col_type_name.column->isColumnConst());
ret.insert(index, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
}
return ret;
}

Block SpilledFilesInputStream::readInternal()
{
if (unlikely(current_file_stream == nullptr))
return {};
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/DataStreams/SpilledFilesInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB
{

struct SpilledFileInfo
{
String path;
Expand All @@ -35,12 +34,19 @@ struct SpilledFileInfo
class SpilledFilesInputStream : public IProfilingBlockInputStream
{
public:
SpilledFilesInputStream(std::vector<SpilledFileInfo> && spilled_file_infos, const Block & header, const FileProviderPtr & file_provider, Int64 max_supported_spill_version);
SpilledFilesInputStream(
std::vector<SpilledFileInfo> && spilled_file_infos,
const Block & header,
const Block & header_without_constants,
const std::vector<size_t> & const_column_indexes,
const FileProviderPtr & file_provider,
Int64 max_supported_spill_version);
Block getHeader() const override;
String getName() const override;

protected:
Block readImpl() override;
Block readInternal();

private:
struct SpilledFileStream
Expand Down Expand Up @@ -68,6 +74,8 @@ class SpilledFilesInputStream : public IProfilingBlockInputStream
std::vector<SpilledFileInfo> spilled_file_infos;
size_t current_reading_file_index;
Block header;
Block header_without_constants;
std::vector<size_t> const_column_indexes;
FileProviderPtr file_provider;
Int64 max_supported_spill_version;
std::unique_ptr<SpilledFileStream> current_file_stream;
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ TiDBTableScan::TiDBTableScan(
, pushed_down_filters(is_partition_table_scan ? std::move(table_scan->partition_table_scan().pushed_down_filter_conditions()) : std::move(table_scan->tbl_scan().pushed_down_filter_conditions()))
// Only No-partition table need keep order when tablescan executor required keep order.
// If keep_order is not set, keep order for safety.
// When keep_order is true, we will not push down filters to tablescan in TiDB.
// So even if keep_order is not set (may be lost), but pushed down filters is not empty, we can't keep order.
, keep_order(!is_partition_table_scan && pushed_down_filters.empty() && (!table_scan->tbl_scan().has_keep_order() || table_scan->tbl_scan().keep_order()))
, is_fast_scan(table_scan->tbl_scan().is_fast_scan())
, keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order()))
, is_fast_scan(is_partition_table_scan ? table_scan->partition_table_scan().is_fast_scan() : table_scan->tbl_scan().is_fast_scan())
{
if (is_partition_table_scan)
{
Expand Down Expand Up @@ -81,6 +79,7 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
for (auto id : partition_table_scan.primary_prefix_column_ids())
tipb_table_scan->add_primary_prefix_column_ids(id);
tipb_table_scan->set_is_fast_scan(partition_table_scan.is_fast_scan());
tipb_table_scan->set_keep_order(false);
}
else
{
Expand Down
Loading

0 comments on commit d32f3a7

Please sign in to comment.