Skip to content

Commit

Permalink
Fix decode error when "NULL" value in the column with "primary key" f…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and JaySon-Huang committed Sep 27, 2022
1 parent 04c303a commit 242d324
Show file tree
Hide file tree
Showing 20 changed files with 513 additions and 159 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
if (start + length > from_concrete.getData().size())
throw Exception(
fmt::format(
"Parameters start = {}, length = {} are out of bound in ColumnAggregateFunction::insertRangeFrom method (data.size() = {}).",
"Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}",
start,
length,
from_concrete.getData().size()),
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,13 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
const ColumnArray & src_concrete = static_cast<const ColumnArray &>(src);

if (start + length > src_concrete.getOffsets().size())
throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.getOffsets().size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.getOffsets()[start + length - 1] - nested_offset;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,13 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
const ColumnDecimal & src_vec = static_cast<const ColumnDecimal &>(src);

if (start + length > src_vec.data.size())
throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnDecimal<T>::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnDecimal<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = data.size();
data.resize(old_size + length);
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,13 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
const ColumnFixedString & src_concrete = static_cast<const ColumnFixedString &>(src);

if (start + length > src_concrete.size())
throw Exception("Parameters start = "
+ toString(start) + ", length = "
+ toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method"
" (size() = "
+ toString(src_concrete.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = chars.size();
chars.resize(old_size + length * n);
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len
const ColumnString & src_concrete = static_cast<const ColumnString &>(src);

if (start + length > src_concrete.offsets.size())
throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.offsets[start + length - 1] - nested_offset;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
if (start + length > src_vec.data.size())
throw Exception(
fmt::format(
"Parameters start = {}, length = {} are out of bound in ColumnVector<T>::insertRangeFrom method (data.size() = {}).",
"Parameters are out of bound in ColumnVector<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ void MockRaftCommand::dbgFuncRegionSnapshot(Context & context, const ASTs & args
}
region_info.set_start_key(start_key.toString());
region_info.set_end_key(end_key.toString());
*region_info.add_peers() = createPeer(1, true);
*region_info.add_peers() = createPeer(2, true);
*region_info.add_peers() = tests::createPeer(1, true);
*region_info.add_peers() = tests::createPeer(2, true);
auto peer_id = 1;
auto start_decoded_key = RecordKVFormat::decodeTiKVKey(start_key);
auto end_decoded_key = RecordKVFormat::decodeTiKVKey(end_key);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
~SSTFilesToBlockInputStream();
~SSTFilesToBlockInputStream() override;

String getName() const override { return "SSTFilesToBlockInputStream"; }

Expand Down
33 changes: 28 additions & 5 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>

#include <ext/scope_guard.h>

Expand Down Expand Up @@ -304,7 +305,17 @@ std::vector<UInt64> KVStore::preHandleSnapshotToFiles(
uint64_t term,
TMTContext & tmt)
{
return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
std::vector<UInt64> ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term));
e.rethrow();
}
return ingest_ids;
}

/// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data
Expand All @@ -324,7 +335,8 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
// Use failpoint to change the expected_block_size for some test cases
fiu_do_on(FailPoints::force_set_sst_to_dtfile_block_size, { expected_block_size = 3; });

PageIds ids;
PageIds generated_ingest_ids;
TableID physical_table_id = InvalidTableID;
while (true)
{
// If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with
Expand All @@ -349,6 +361,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
physical_table_id = storage->getTableInfo().id;

// Read from SSTs and refine the boundary of blocks output to DTFiles
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
Expand All @@ -372,7 +385,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
stream->writePrefix();
stream->write();
stream->writeSuffix();
ids = stream->ingestIds();
generated_ingest_ids = stream->ingestIds();

(void)table_drop_lock; // the table should not be dropped during ingesting file
break;
Expand Down Expand Up @@ -416,12 +429,13 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
else
{
// Other unrecoverable error, throw
e.addMessage(fmt::format("physical_table_id={}", physical_table_id));
throw;
}
}
}

return ids;
return generated_ingest_ids;
}

template <typename RegionPtrWrap>
Expand Down Expand Up @@ -589,7 +603,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi
}

// Decode the KV pairs in ingesting SST into DTFiles
PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
PageIds ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term));
e.rethrow();
}

// If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to
// ignore the step of calling `ingestFiles`
Expand Down
40 changes: 16 additions & 24 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,31 +241,26 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
return data_list_read;
}

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true)
std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region)
{
auto scanner = region->createCommittedScanner(lock_region);

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return std::nullopt;
}
if (region->isPendingRemove())
return std::nullopt;

/// Read raw KVs from region cache.
{
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;

RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());

do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}
RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}

void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true)
Expand Down Expand Up @@ -447,7 +442,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
try
{
data_list_read = ReadRegionCommitCache(region);
data_list_read = ReadRegionCommitCache(region, true);
if (!data_list_read)
return nullptr;
}
Expand Down Expand Up @@ -603,24 +598,21 @@ Block GenRegionBlockDataWithSchema(const RegionPtr & region, //
const DecodingStorageSchemaSnapshotConstPtr & schema_snap,
Timestamp gc_safepoint,
bool force_decode,
TMTContext & tmt)
TMTContext & /* */)
{
// In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
// If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files.
fiu_do_on(FailPoints::force_set_safepoint_when_decode_block,
{ gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter
region->tryCompactionFilter(gc_safepoint);

std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
data_list_read = ReadRegionCommitCache(region);
std::optional<RegionDataReadInfoList> data_list_read = ReadRegionCommitCache(region, true);

Block res_block;
// No committed data, just return
if (!data_list_read)
return res_block;

auto context = tmt.getContext();

{
Stopwatch watch;
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/PartitionStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/TableLockHolder.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/RegionDataRead.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{
class Region;
using RegionPtr = std::shared_ptr<Region>;
class StorageDeltaMerge;
class TMTContext;

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region);

std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);
Expand Down
37 changes: 19 additions & 18 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Core/Names.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IManageableStorage.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/DatumCodec.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionBlockReader.h>
#include <Storages/Transaction/RowCodec.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>

namespace DB
{
Expand Down Expand Up @@ -95,6 +99,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
raw_column->reserve(expected_rows);
}
}

size_t index = 0;
for (const auto & [pk, write_type, commit_ts, value_ptr] : data_list)
{
Expand Down Expand Up @@ -142,37 +147,33 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
}
else
{
if (schema_snapshot->pk_is_handle)
{
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, schema_snapshot->pk_column_ids[0], force_decode))
return false;
}
else
{
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, InvalidColumnID, force_decode))
return false;
}
// Parse column value from encoded value
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot, force_decode))
return false;
}
}

/// set extra handle column and pk columns if need
/// set extra handle column and pk columns from encoded key if need
if constexpr (pk_type != TMTPKType::STRING)
{
// extra handle column's type is always Int64
// For non-common handle, extra handle column's type is always Int64.
// We need to copy the handle value from encoded key.
const auto handle_value = static_cast<Int64>(pk);
auto * raw_extra_column = const_cast<IColumn *>((block.getByPosition(extra_handle_column_pos)).column.get());
static_cast<ColumnInt64 *>(raw_extra_column)->getData().push_back(Int64(pk));
static_cast<ColumnInt64 *>(raw_extra_column)->getData().push_back(handle_value);
// For pk_is_handle == true, we need to decode the handle value from encoded key, and insert
// to the specify column
if (!pk_column_ids.empty())
{
auto * raw_pk_column = const_cast<IColumn *>((block.getByPosition(pk_pos_map.at(pk_column_ids[0]))).column.get());
if constexpr (pk_type == TMTPKType::INT64)
static_cast<ColumnInt64 *>(raw_pk_column)->getData().push_back(Int64(pk));
static_cast<ColumnInt64 *>(raw_pk_column)->getData().push_back(handle_value);
else if constexpr (pk_type == TMTPKType::UINT64)
static_cast<ColumnUInt64 *>(raw_pk_column)->getData().push_back(UInt64(pk));
static_cast<ColumnUInt64 *>(raw_pk_column)->getData().push_back(UInt64(handle_value));
else
{
// The pk_type must be Int32/Uint32 or more narrow type
// The pk_type must be Int32/UInt32 or more narrow type
// so cannot tell its' exact type here, just use `insert(Field)`
HandleID handle_value(static_cast<Int64>(pk));
raw_pk_column->insert(Field(handle_value));
if (unlikely(raw_pk_column->getInt(index) != handle_value))
{
Expand All @@ -182,7 +183,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
}
else
{
throw Exception("Detected overflow value when decoding pk column of type " + raw_pk_column->getName(),
throw Exception(fmt::format("Detected overflow value when decoding pk column, type={} handle={}", raw_pk_column->getName(), handle_value),
ErrorCodes::LOGICAL_ERROR);
}
}
Expand Down
Loading

0 comments on commit 242d324

Please sign in to comment.