Fix decode error when "NULL" value in the column with "primary key" flag (#5879) #5932

Merged
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnAggregateFunction.cpp
@@ -87,7 +87,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
if (start + length > from_concrete.getData().size())
throw Exception(
fmt::format(
"Parameters start = {}, length = {} are out of bound in ColumnAggregateFunction::insertRangeFrom method (data.size() = {}).",
"Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}",
start,
length,
from_concrete.getData().size()),
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnArray.cpp
@@ -414,8 +414,13 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
const ColumnArray & src_concrete = static_cast<const ColumnArray &>(src);

if (start + length > src_concrete.getOffsets().size())
throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.getOffsets().size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.getOffsets()[start + length - 1] - nested_offset;
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnDecimal.cpp
@@ -258,8 +258,13 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
const ColumnDecimal & src_vec = static_cast<const ColumnDecimal &>(src);

if (start + length > src_vec.data.size())
throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnDecimal<T>::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnDecimal<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = data.size();
data.resize(old_size + length);
13 changes: 7 additions & 6 deletions dbms/src/Columns/ColumnFixedString.cpp
@@ -169,12 +169,13 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
const ColumnFixedString & src_concrete = static_cast<const ColumnFixedString &>(src);

if (start + length > src_concrete.size())
throw Exception("Parameters start = "
+ toString(start) + ", length = "
+ toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method"
" (size() = "
+ toString(src_concrete.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = chars.size();
chars.resize(old_size + length * n);
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnString.cpp
@@ -69,8 +69,13 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len
const ColumnString & src_concrete = static_cast<const ColumnString &>(src);

if (start + length > src_concrete.offsets.size())
throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.offsets[start + length - 1] - nested_offset;
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnVector.cpp
@@ -184,7 +184,7 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
if (start + length > src_vec.data.size())
throw Exception(
fmt::format(
"Parameters start = {}, length = {} are out of bound in ColumnVector<T>::insertRangeFrom method (data.size() = {}).",
"Parameters are out of bound in ColumnVector<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
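All six column classes above now report out-of-bound parameters with the same fmt-based message. A minimal standalone sketch of that bounds-check pattern, assuming the {fmt} library (DB::Exception and ErrorCodes::PARAMETER_OUT_OF_BOUND are TiFlash-internal, so std::out_of_range stands in for them here):

```cpp
#include <fmt/format.h>

#include <cstddef>
#include <stdexcept>
#include <vector>

// Sketch of the shared bounds-check pattern: validate [start, start + length)
// against the source size before copying, and put all three values in the message.
void insertRangeFrom(std::vector<int> & dst, const std::vector<int> & src, std::size_t start, std::size_t length)
{
    if (start + length > src.size())
        throw std::out_of_range(
            fmt::format(
                "Parameters are out of bound in insertRangeFrom method, start={}, length={}, src.size()={}",
                start,
                length,
                src.size()));

    dst.insert(dst.end(), src.begin() + start, src.begin() + start + length);
}
```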
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
@@ -220,8 +220,8 @@ void MockRaftCommand::dbgFuncRegionSnapshot(Context & context, const ASTs & args
}
region_info.set_start_key(start_key.toString());
region_info.set_end_key(end_key.toString());
*region_info.add_peers() = createPeer(1, true);
*region_info.add_peers() = createPeer(2, true);
*region_info.add_peers() = tests::createPeer(1, true);
*region_info.add_peers() = tests::createPeer(2, true);
auto peer_id = 1;
auto start_decoded_key = RecordKVFormat::decodeTiKVKey(start_key);
auto end_decoded_key = RecordKVFormat::decodeTiKVKey(end_key);
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
@@ -47,7 +47,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
~SSTFilesToBlockInputStream();
~SSTFilesToBlockInputStream() override;

String getName() const override { return "SSTFilesToBlockInputStream"; }

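The destructor change above only adds the `override` specifier. A small self-contained illustration of what that buys: when the base class has a virtual destructor, marking the derived destructor `override` makes the compiler verify that assumption (the names below are hypothetical, not TiFlash types):

```cpp
#include <memory>

struct IBlockStream
{
    virtual ~IBlockStream() = default; // virtual base destructor
};

struct FileBlockStream final : public IBlockStream
{
    // `override` fails to compile if the base destructor ever stops being virtual,
    // so deleting through a base pointer stays well-defined.
    ~FileBlockStream() override = default;
};

int main()
{
    std::unique_ptr<IBlockStream> s = std::make_unique<FileBlockStream>();
    return 0; // FileBlockStream's destructor runs via the virtual base destructor
}
```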
33 changes: 28 additions & 5 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
@@ -16,6 +16,7 @@
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>

#include <ext/scope_guard.h>

@@ -304,7 +305,17 @@ std::vector<UInt64> KVStore::preHandleSnapshotToFiles(
uint64_t term,
TMTContext & tmt)
{
return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
std::vector<UInt64> ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term));
e.rethrow();
}
return ingest_ids;
}

/// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data
@@ -324,7 +335,8 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
// Use failpoint to change the expected_block_size for some test cases
fiu_do_on(FailPoints::force_set_sst_to_dtfile_block_size, { expected_block_size = 3; });

PageIds ids;
PageIds generated_ingest_ids;
TableID physical_table_id = InvalidTableID;
while (true)
{
// If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with
@@ -349,6 +361,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
physical_table_id = storage->getTableInfo().id;

// Read from SSTs and refine the boundary of blocks output to DTFiles
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
@@ -372,7 +385,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
stream->writePrefix();
stream->write();
stream->writeSuffix();
ids = stream->ingestIds();
generated_ingest_ids = stream->ingestIds();

(void)table_drop_lock; // the table should not be dropped during ingesting file
break;
@@ -416,12 +429,13 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
else
{
// Other unrecoverable error, throw
e.addMessage(fmt::format("physical_table_id={}", physical_table_id));
throw;
}
}
}

return ids;
return generated_ingest_ids;
}

template <typename RegionPtrWrap>
@@ -589,7 +603,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi
}

// Decode the KV pairs in ingesting SST into DTFiles
PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
PageIds ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term));
e.rethrow();
}

// If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to
// ignore the step of calling `ingestFiles`
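Both call sites above catch the exception thrown by preHandleSSTsToDTFiles, attach the region_id/index/term context, and rethrow. DB::Exception::addMessage and rethrow are TiFlash-internal; a rough standalone sketch of the same wrap-and-rethrow idea using only the standard library and {fmt} (the function names here are placeholders):

```cpp
#include <fmt/format.h>

#include <cstdint>
#include <exception>
#include <stdexcept>
#include <vector>

// Placeholder for the real SST-to-DTFile conversion, which may throw on decode errors.
std::vector<uint64_t> preHandleSSTsToDTFiles(uint64_t /*region_id*/)
{
    throw std::runtime_error("Detected overflow value when decoding pk column");
}

std::vector<uint64_t> preHandleSnapshotToFiles(uint64_t region_id, uint64_t index, uint64_t term)
{
    try
    {
        return preHandleSSTsToDTFiles(region_id);
    }
    catch (const std::exception &)
    {
        // Attach where we were when decoding failed, keeping the original error nested.
        std::throw_with_nested(std::runtime_error(
            fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", region_id, index, term)));
    }
}
```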
40 changes: 16 additions & 24 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
@@ -241,31 +241,26 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
return data_list_read;
}

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true)
std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region)
{
auto scanner = region->createCommittedScanner(lock_region);

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return std::nullopt;
}
if (region->isPendingRemove())
return std::nullopt;

/// Read raw KVs from region cache.
{
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;

RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());

do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}
RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}

void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true)
@@ -447,7 +442,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
try
{
data_list_read = ReadRegionCommitCache(region);
data_list_read = ReadRegionCommitCache(region, true);
if (!data_list_read)
return nullptr;
}
@@ -603,24 +598,21 @@ Block GenRegionBlockDataWithSchema(const RegionPtr & region, //
const DecodingStorageSchemaSnapshotConstPtr & schema_snap,
Timestamp gc_safepoint,
bool force_decode,
TMTContext & tmt)
TMTContext & /* */)
{
// In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
// If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files.
fiu_do_on(FailPoints::force_set_safepoint_when_decode_block,
{ gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter
region->tryCompactionFilter(gc_safepoint);

std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
data_list_read = ReadRegionCommitCache(region);
std::optional<RegionDataReadInfoList> data_list_read = ReadRegionCommitCache(region, true);

Block res_block;
// No committed data, just return
if (!data_list_read)
return res_block;

auto context = tmt.getContext();

{
Stopwatch watch;
{
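ReadRegionCommitCache now takes lock_region explicitly and is declared in PartitionStreams.h (see the header change below). Its core control flow is a scanner-draining pattern; a simplified standalone sketch with a stand-in scanner type (the real committed scanner is TiFlash-internal):

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Stand-in for the region's committed-data scanner.
struct CommittedScanner
{
    std::vector<int> rows;
    std::size_t pos = 0;
    bool hasNext() const { return pos < rows.size(); }
    int next() { return rows[pos++]; }
    std::size_t writeMapSize() const { return rows.size(); }
};

// Return std::nullopt when there is nothing to read, otherwise drain the scanner.
std::optional<std::vector<int>> readRegionCommitCache(CommittedScanner & scanner, bool pending_remove)
{
    if (pending_remove) // region is being removed, nothing to decode
        return std::nullopt;
    if (!scanner.hasNext()) // shortcut for an empty region
        return std::nullopt;

    std::vector<int> data_list_read;
    data_list_read.reserve(scanner.writeMapSize());
    do
    {
        data_list_read.emplace_back(scanner.next());
    } while (scanner.hasNext());
    return data_list_read;
}
```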
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/PartitionStreams.h
@@ -2,14 +2,19 @@

#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/TableLockHolder.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/RegionDataRead.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{
class Region;
using RegionPtr = std::shared_ptr<Region>;
class StorageDeltaMerge;
class TMTContext;

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region);

std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);
37 changes: 19 additions & 18 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
@@ -1,12 +1,16 @@
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Core/Names.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IManageableStorage.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/DatumCodec.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionBlockReader.h>
#include <Storages/Transaction/RowCodec.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>

namespace DB
{
@@ -95,6 +99,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
raw_column->reserve(expected_rows);
}
}

size_t index = 0;
for (const auto & [pk, write_type, commit_ts, value_ptr] : data_list)
{
@@ -142,37 +147,33 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
}
else
{
if (schema_snapshot->pk_is_handle)
{
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, schema_snapshot->pk_column_ids[0], force_decode))
return false;
}
else
{
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, InvalidColumnID, force_decode))
return false;
}
// Parse column value from encoded value
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot, force_decode))
return false;
}
}

/// set extra handle column and pk columns if need
/// set extra handle column and pk columns from encoded key if need
if constexpr (pk_type != TMTPKType::STRING)
{
// extra handle column's type is always Int64
// For non-common handle, extra handle column's type is always Int64.
// We need to copy the handle value from encoded key.
const auto handle_value = static_cast<Int64>(pk);
auto * raw_extra_column = const_cast<IColumn *>((block.getByPosition(extra_handle_column_pos)).column.get());
static_cast<ColumnInt64 *>(raw_extra_column)->getData().push_back(Int64(pk));
static_cast<ColumnInt64 *>(raw_extra_column)->getData().push_back(handle_value);
// For pk_is_handle == true, we need to decode the handle value from the encoded key and insert
// it into the specified column
if (!pk_column_ids.empty())
{
auto * raw_pk_column = const_cast<IColumn *>((block.getByPosition(pk_pos_map.at(pk_column_ids[0]))).column.get());
if constexpr (pk_type == TMTPKType::INT64)
static_cast<ColumnInt64 *>(raw_pk_column)->getData().push_back(Int64(pk));
static_cast<ColumnInt64 *>(raw_pk_column)->getData().push_back(handle_value);
else if constexpr (pk_type == TMTPKType::UINT64)
static_cast<ColumnUInt64 *>(raw_pk_column)->getData().push_back(UInt64(pk));
static_cast<ColumnUInt64 *>(raw_pk_column)->getData().push_back(UInt64(handle_value));
else
{
// The pk_type must be Int32/Uint32 or more narrow type
// The pk_type must be Int32/UInt32 or more narrow type
// so cannot tell its' exact type here, just use `insert(Field)`
HandleID handle_value(static_cast<Int64>(pk));
raw_pk_column->insert(Field(handle_value));
if (unlikely(raw_pk_column->getInt(index) != handle_value))
{
@@ -182,7 +183,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
}
else
{
throw Exception("Detected overflow value when decoding pk column of type " + raw_pk_column->getName(),
throw Exception(fmt::format("Detected overflow value when decoding pk column, type={} handle={}", raw_pk_column->getName(), handle_value),
ErrorCodes::LOGICAL_ERROR);
}
}
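The overflow branch above writes the 64-bit handle into a narrower pk column through insert(Field) and then reads it back to detect truncation. A minimal standalone sketch of that round-trip check, using Int32 as a hypothetical narrow column type (the real code works on IColumn):

```cpp
#include <fmt/format.h>

#include <cstdint>
#include <stdexcept>
#include <vector>

// Round-trip check: store the 64-bit handle in a narrower cell, read it back,
// and treat any mismatch as an overflow of the pk column type.
void pushNarrowHandle(std::vector<int32_t> & pk_column, int64_t handle_value)
{
    pk_column.push_back(static_cast<int32_t>(handle_value));
    if (static_cast<int64_t>(pk_column.back()) != handle_value)
        throw std::runtime_error(
            fmt::format("Detected overflow value when decoding pk column, type=Int32 handle={}", handle_value));
}
```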