Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix decode error when "NULL" value in the column with "primary key" flag (#5879) #5935

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,20 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
const ColumnAggregateFunction & from_concrete = static_cast<const ColumnAggregateFunction &>(from);

if (start + length > from_concrete.getData().size())
<<<<<<< HEAD
throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length)
+ " are out of bound in ColumnAggregateFunction::insertRangeFrom method"
" (data.size() = "
+ toString(from_concrete.getData().size())
+ ").",
=======
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}",
start,
length,
from_concrete.getData().size()),
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
ErrorCodes::PARAMETER_OUT_OF_BOUND);

if (!empty() && src.get() != &from_concrete)
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,16 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
const ColumnArray & src_concrete = static_cast<const ColumnArray &>(src);

if (start + length > src_concrete.getOffsets().size())
<<<<<<< HEAD
throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.",
=======
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.getOffsets().size()),
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,17 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
const ColumnDecimal & src_vec = static_cast<const ColumnDecimal &>(src);

if (start + length > src_vec.data.size())
<<<<<<< HEAD
throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) +
" are out of bound in ColumnDecimal<T>::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").",
=======
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnDecimal<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = data.size();
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,19 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
const ColumnFixedString & src_concrete = static_cast<const ColumnFixedString &>(src);

if (start + length > src_concrete.size())
<<<<<<< HEAD
throw Exception("Parameters start = "
+ toString(start) + ", length = "
+ toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method"
" (size() = " + toString(src_concrete.size()) + ").",
=======
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = chars.size();
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Columns/ColumnString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,16 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len
const ColumnString & src_concrete = static_cast<const ColumnString &>(src);

if (start + length > src_concrete.offsets.size())
<<<<<<< HEAD
throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.",
=======
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Columns/ColumnVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,19 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
const ColumnVector & src_vec = static_cast<const ColumnVector &>(src);

if (start + length > src_vec.data.size())
<<<<<<< HEAD
throw Exception("Parameters start = "
+ toString(start) + ", length = "
+ toString(length) + " are out of bound in ColumnVector<T>::insertRangeFrom method"
" (data.size() = " + toString(src_vec.data.size()) + ").",
=======
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnVector<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = data.size();
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr<BoundedSSTFilesToBl
class SSTFilesToBlockInputStream final : public IBlockInputStream
{
public:
<<<<<<< HEAD
SSTFilesToBlockInputStream(RegionPtr region_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
Expand All @@ -50,6 +51,17 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
~SSTFilesToBlockInputStream();
=======
SSTFilesToBlockInputStream(RegionPtr region_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
DecodingStorageSchemaSnapshotConstPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
~SSTFilesToBlockInputStream() override;
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))

String getName() const override { return "SSTFilesToBlockInputStream"; }

Expand Down
45 changes: 41 additions & 4 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
<<<<<<< HEAD
=======
#include <Storages/Transaction/Types.h>
#include <TiDB/Schema/SchemaSyncer.h>
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))

#include <ext/scope_guard.h>

Expand Down Expand Up @@ -258,6 +263,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
}
}

<<<<<<< HEAD
extern RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr &, Context &);

/// `preHandleSnapshotToBlock` reads data from SSTFiles and pre-decodes the data as a block
Expand Down Expand Up @@ -321,10 +327,22 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshotToBlock(
return cache;
}

=======
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
std::vector<UInt64> KVStore::preHandleSnapshotToFiles(
RegionPtr new_region, const SSTViewVec snaps, uint64_t index, uint64_t term, TMTContext & tmt)
{
return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
std::vector<UInt64> ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term));
e.rethrow();
}
return ingest_ids;
}

/// `preHandleSSTsToDTFiles` reads data from SSTFiles and generates DTFile(s) for committed data
Expand All @@ -339,7 +357,15 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
// Use failpoint to change the expected_block_size for some test cases
fiu_do_on(FailPoints::force_set_sst_to_dtfile_block_size, { expected_block_size = 3; });

<<<<<<< HEAD
PageIds ids;
=======
Stopwatch watch;
SCOPE_EXIT({ GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode).Observe(watch.elapsedSeconds()); });

PageIds generated_ingest_ids;
TableID physical_table_id = InvalidTableID;
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
while (true)
{
// If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with
Expand All @@ -364,6 +390,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
physical_table_id = storage->getTableInfo().id;

// Read from SSTs and refine the boundary of blocks output to DTFiles
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
Expand All @@ -375,7 +402,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
stream->writePrefix();
stream->write();
stream->writeSuffix();
ids = stream->ingestIds();
generated_ingest_ids = stream->ingestIds();

(void)table_drop_lock; // the table should not be dropped during ingesting file
break;
Expand Down Expand Up @@ -420,12 +447,13 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
else
{
// Other unrecoverable error, throw
e.addMessage(fmt::format("physical_table_id={}", physical_table_id));
throw;
}
}
}

return ids;
return generated_ingest_ids;
}

template <typename RegionPtrWrap>
Expand Down Expand Up @@ -594,7 +622,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi
}

// Decode the KV pairs in ingesting SST into DTFiles
PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
PageIds ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term));
e.rethrow();
}

// If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to
// ignore the step of calling `ingestFiles`
Expand Down
46 changes: 25 additions & 21 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,31 +238,26 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
return data_list_read;
}

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true)
std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region)
{
auto scanner = region->createCommittedScanner(lock_region);

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return std::nullopt;
}
if (region->isPendingRemove())
return std::nullopt;

/// Read raw KVs from region cache.
{
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;

RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());

do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}
RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}

void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true)
Expand Down Expand Up @@ -401,7 +396,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
try
{
data_list_read = ReadRegionCommitCache(region);
data_list_read = ReadRegionCommitCache(region, true);
if (!data_list_read)
return nullptr;
}
Expand Down Expand Up @@ -567,28 +562,37 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch
/// The return value is a block that stores the committed data scanned and removed from `region`.
/// The columns of the returned block are sorted by `schema_snap`.
Block GenRegionBlockDataWithSchema(const RegionPtr & region, //
<<<<<<< HEAD
const DecodingStorageSchemaSnapshot & schema_snap,
Timestamp gc_safepoint,
bool force_decode,
TMTContext & tmt)
=======
const DecodingStorageSchemaSnapshotConstPtr & schema_snap,
Timestamp gc_safepoint,
bool force_decode,
TMTContext & /* */)
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
{
// In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
// If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files.
fiu_do_on(FailPoints::force_set_safepoint_when_decode_block,
{ gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter
region->tryCompactionFilter(gc_safepoint);

std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
data_list_read = ReadRegionCommitCache(region);
std::optional<RegionDataReadInfoList> data_list_read = ReadRegionCommitCache(region, true);

Block res_block;
// No committed data, just return
if (!data_list_read)
return res_block;

<<<<<<< HEAD
auto context = tmt.getContext();
auto metrics = context.getTiFlashMetrics();

=======
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
{
Stopwatch watch;
// Compare schema_snap with current schema, throw exception if changed.
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/Transaction/PartitionStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
<<<<<<< HEAD
=======
#include <Storages/TableLockHolder.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/RegionDataRead.h>
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
#include <Storages/Transaction/TiDB.h>

namespace DB
Expand All @@ -11,6 +17,7 @@ class Region;
using RegionPtr = std::shared_ptr<Region>;
class StorageDeltaMerge;

<<<<<<< HEAD
/**
* A snapshot of the table structure of a DeltaTree storage. We use it to decode Raft snapshot
* data with a consistent table structure.
Expand Down Expand Up @@ -38,6 +45,11 @@ struct DecodingStorageSchemaSnapshot
};

std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshot> //
=======
std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region);

std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);

Block GenRegionBlockDataWithSchema(const RegionPtr & region, //
Expand Down
Loading