Skip to content

Commit

Permalink
Fix an exception thrown while reading from ColumnFileTiny after add c…
Browse files Browse the repository at this point in the history
…olumn (pingcap#6729)

close pingcap#6726

Signed-off-by: ywqzzy <[email protected]>
  • Loading branch information
JaySon-Huang authored and ywqzzy committed Feb 13, 2023
1 parent f6779c1 commit 5a88db5
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 61 deletions.
19 changes: 10 additions & 9 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ class BlockOrDelete
, block_offset(offset_)
{}
explicit BlockOrDelete(const RowKeyRange & delete_range_)
: delete_range(delete_range_)
: block_offset(0)
, delete_range(delete_range_)
{}

bool isBlock() { return (bool)block; }
bool isBlock() { return static_cast<bool>(block); }
auto & getBlock() { return block; };
auto getBlockOffset() { return block_offset; }
auto getBlockOffset() const { return block_offset; }
auto & getDeleteRange() { return delete_range; }
};

Expand All @@ -60,12 +61,12 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
StorageSnapshotPtr storage_snap;

ColumnFiles column_files;
size_t rows;
size_t bytes;
size_t deletes;
size_t rows{0};
size_t bytes{0};
size_t deletes{0};

bool is_common_handle;
size_t rowkey_column_size;
bool is_common_handle{false};
size_t rowkey_column_size{0};

public:
explicit ColumnFileSetSnapshot(const StorageSnapshotPtr & storage_snap_)
Expand Down Expand Up @@ -99,4 +100,4 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
};

} // namespace DM
} // namespace DB
} // namespace DB
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>
Expand Down Expand Up @@ -71,11 +72,17 @@ Columns ColumnFileTiny::readFromDisk(const PageReader & page_reader, //
}
else
{
// New column after ddl is not exist in this pack, fill with default value
// New column after ddl is not exist in this CFTiny, fill with default value
columns[index - col_start] = createColumnWithDefaultValue(cd, rows);
}
}

// All columns to be read are not exist in this CFTiny and filled with default value,
// we can skip reading from disk
if (fields.second.empty())
return columns;

// Read the columns from disk and apply DDL cast if need
auto page_map = page_reader.read({fields});
Page page = page_map[data_page_id];
for (size_t index = col_start; index < col_end; ++index)
Expand Down Expand Up @@ -183,7 +190,7 @@ Block ColumnFileTiny::readBlockForMinorCompaction(const PageReader & page_reader
}
}

ColumnTinyFilePtr ColumnFileTiny::writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema, const CachePtr & cache)
ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema, const CachePtr & cache)
{
auto page_id = writeColumnFileData(context, block, offset, limit, wbs);
auto new_column_file_schema = schema ? schema : std::make_shared<Block>(block.cloneEmpty());
Expand All @@ -201,7 +208,9 @@ PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & bl
{
auto last_buf_size = write_buf.count();
serializeColumn(write_buf, *col.column, col.type, offset, limit, context.db_context.getSettingsRef().dt_compression_method, context.db_context.getSettingsRef().dt_compression_level);
col_data_sizes.push_back(write_buf.count() - last_buf_size);
size_t serialized_size = write_buf.count() - last_buf_size;
RUNTIME_CHECK_MSG(serialized_size != 0, "try to persist a block with empty column, colname={} colid={} block={}", col.name, col.column_id, block.dumpJsonStructure());
col_data_sizes.push_back(serialized_size);
}

auto data_size = write_buf.count();
Expand Down Expand Up @@ -252,4 +261,4 @@ ColumnFileReaderPtr ColumnFileTinyReader::createNewReader(const ColumnDefinesPtr
}

} // namespace DM
} // namespace DB
} // namespace DB
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace DB
namespace DM
{
class ColumnFileTiny;
using ColumnTinyFilePtr = std::shared_ptr<ColumnFileTiny>;
using ColumnFileTinyPtr = std::shared_ptr<ColumnFileTiny>;

/// A column file which data is stored in PageStorage.
/// It may be created in two ways:
Expand Down Expand Up @@ -88,7 +88,7 @@ class ColumnFileTiny : public ColumnFilePersisted
/// Replace the schema with a new schema, and the new schema instance should be exactly the same as the previous one.
void resetIdenticalSchema(BlockPtr schema_) { schema = schema_; }

ColumnTinyFilePtr cloneWith(PageId new_data_page_id)
ColumnFileTinyPtr cloneWith(PageId new_data_page_id)
{
auto new_tiny_file = std::make_shared<ColumnFileTiny>(*this);
new_tiny_file->data_page_id = new_data_page_id;
Expand All @@ -109,7 +109,7 @@ class ColumnFileTiny : public ColumnFilePersisted

Block readBlockForMinorCompaction(const PageReader & page_reader) const;

static ColumnTinyFilePtr writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema = nullptr, const CachePtr & cache = nullptr);
static ColumnFileTinyPtr writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema = nullptr, const CachePtr & cache = nullptr);

static PageId writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs);

Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,15 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
// The [offset, rows - offset] can be exceeding the Segment's rowkey_range. Cut the range
// to fit the segment.
auto [cur_offset, cur_limit] = rowkey_range.getPosRange(handle_column, offset, rows - offset);
if (unlikely(cur_offset != offset))
throw Exception(fmt::format("cur_offset does not equal to offset. is_common_handle {} start_key {} cur_offset {} cur_limit {} rows {} offset {} rowkey_range {}", is_common_handle, start_key.toRowKeyValue().toString(), cur_offset, cur_limit, rows, offset, rowkey_range.toDebugString()), ErrorCodes::LOGICAL_ERROR);
RUNTIME_CHECK_MSG(cur_offset == offset && cur_limit != 0,
"invalid cur_offset or cur_limit. is_common_handle={} start_key={} cur_offset={} cur_limit={} rows={} offset={} rowkey_range={}",
is_common_handle,
start_key.toRowKeyValue().toString(),
cur_offset,
cur_limit,
rows,
offset,
rowkey_range.toDebugString());

limit = cur_limit;
auto alloc_bytes = block.bytes(offset, limit);
Expand Down
44 changes: 18 additions & 26 deletions dbms/src/Storages/DeltaMerge/WriteBatches.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct WriteBatches : private boost::noncopyable

WriteLimiterPtr write_limiter;

WriteBatches(StoragePool & storage_pool_, const WriteLimiterPtr & write_limiter_ = nullptr)
explicit WriteBatches(StoragePool & storage_pool_, const WriteLimiterPtr & write_limiter_ = nullptr)
: ns_id(storage_pool_.getNamespaceId())
, log(ns_id)
, data(ns_id)
Expand All @@ -58,12 +58,11 @@ struct WriteBatches : private boost::noncopyable
{
if constexpr (DM_RUN_CHECK)
{
Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
auto check_empty = [&](const WriteBatch & wb, const String & name) {
if (!wb.empty())
{
StackTrace trace;
LOG_ERROR(logger,
LOG_ERROR(Logger::get(),
"!!!=========================Modifications in {} haven't persisted=========================!!! Stack trace: {}",
name,
trace.toString());
Expand Down Expand Up @@ -91,20 +90,19 @@ struct WriteBatches : private boost::noncopyable

if constexpr (DM_RUN_CHECK)
{
Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type == WriteBatchWriteType::DEL))
throw Exception("Unexpected deletes in " + what);
}
LOG_TRACE(logger, "Write into {} : {}", what, wb.toString());
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(log, "log", logger);
check(data, "data", logger);
check(log, "log");
check(data, "data");
}

for (auto & w : log.getWrites())
Expand Down Expand Up @@ -135,21 +133,19 @@ struct WriteBatches : private boost::noncopyable

if constexpr (DM_RUN_CHECK)
{
Poco::Logger * logger = &Poco::Logger::get("WriteBatches");

auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(logger, "Rollback remove from {} : {}", what, wb.toString());
LOG_TRACE(Logger::get(), "Rollback remove from {} : {}", what, wb.toString());
};

check(log_wb, "log_wb", logger);
check(data_wb, "data_wb", logger);
check(log_wb, "log_wb");
check(data_wb, "data_wb");
}

storage_pool.logWriter()->write(std::move(log_wb), write_limiter);
Expand All @@ -163,20 +159,18 @@ struct WriteBatches : private boost::noncopyable
{
if constexpr (DM_RUN_CHECK)
{
Poco::Logger * logger = &Poco::Logger::get("WriteBatches");

auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::PUT))
throw Exception("Expected puts in " + what);
}
LOG_TRACE(logger, "Write into {} : {}", what, wb.toString());
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(meta, "meta", logger);
check(meta, "meta");
}

storage_pool.metaWriter()->write(std::move(meta), write_limiter);
Expand All @@ -187,22 +181,20 @@ struct WriteBatches : private boost::noncopyable
{
if constexpr (DM_RUN_CHECK)
{
Poco::Logger * logger = &Poco::Logger::get("WriteBatches");

auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
auto check = [](const WriteBatch & wb, const String & what) {
if (wb.empty())
return;
for (const auto & w : wb.getWrites())
{
if (unlikely(w.type != WriteBatchWriteType::DEL))
throw Exception("Expected deletes in " + what);
}
LOG_TRACE(logger, "Write into {} : {}", what, wb.toString());
LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
};

check(removed_log, "removed_log", logger);
check(removed_data, "removed_data", logger);
check(removed_meta, "removed_meta", logger);
check(removed_log, "removed_log");
check(removed_data, "removed_data");
check(removed_meta, "removed_meta");
}

storage_pool.logWriter()->write(std::move(removed_log), write_limiter);
Expand Down
52 changes: 52 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
Expand All @@ -23,9 +25,12 @@
#include <Storages/DeltaMerge/File/DMFileWriter.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <Storages/Transaction/Types.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/TiFlashTestBasic.h>

#include <ext/scope_guard.h>
#include <vector>

namespace DB
Expand Down Expand Up @@ -191,6 +196,53 @@ try
}
CATCH

TEST_F(ColumnFileTest, SerializeEmptyBlock)
try
{
size_t num_rows_write = 0;
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
WriteBatches wbs(dmContext().storage_pool);
EXPECT_THROW(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, num_rows_write, wbs), DB::Exception);
}
CATCH

TEST_F(ColumnFileTest, ReadColumns)
try
{
size_t num_rows_write = 10;
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
ColumnFileTinyPtr cf;
{
WriteBatches wbs(dmContext().storage_pool);
cf = ColumnFileTiny::writeColumnFile(dmContext(), block, 0, num_rows_write, wbs);
wbs.writeAll();
}
auto storage_snap = std::make_shared<StorageSnapshot>(dmContext().storage_pool, nullptr, "", true);

{
// Read columns exactly the same as we have written
auto columns_to_read = std::make_shared<ColumnDefines>(getColumnDefinesFromBlock(block));
auto reader = cf->getReader(dmContext(), storage_snap, columns_to_read);
auto block_read = reader->readNextBlock();
ASSERT_BLOCK_EQ(block_read, block);
}

{
// Only read with a column that is not exist in ColumnFileTiny
ColumnID added_colid = 100;
String added_colname = "added_col";
auto columns_to_read = std::make_shared<ColumnDefines>(ColumnDefines{ColumnDefine(added_colid, added_colname, typeFromString("Int64"))});
auto reader = cf->getReader(dmContext(), storage_snap, columns_to_read);
auto block_read = reader->readNextBlock();
ASSERT_COLUMNS_EQ_R(
ColumnsWithTypeAndName({
createColumn<Int64>(std::vector<Int64>(num_rows_write, 0)),
}),
block_read.getColumnsWithTypeAndName());
}
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.h>
#include <Storages/Transaction/TMTContext.h>
Expand Down
Loading

0 comments on commit 5a88db5

Please sign in to comment.