Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#5879
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
JaySon-Huang authored and ti-chi-bot committed Sep 16, 2022
1 parent 25545c4 commit 8bbf2ca
Show file tree
Hide file tree
Showing 18 changed files with 526 additions and 141 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
if (start + length > from_concrete.getData().size())
throw Exception(
fmt::format(
"Parameters start = {}, length = {} are out of bound in ColumnAggregateFunction::insertRangeFrom method (data.size() = {}).",
"Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}",
start,
length,
from_concrete.getData().size()),
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,13 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
const ColumnArray & src_concrete = static_cast<const ColumnArray &>(src);

if (start + length > src_concrete.getOffsets().size())
throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.getOffsets().size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.getOffsets()[start + length - 1] - nested_offset;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,13 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
const ColumnDecimal & src_vec = static_cast<const ColumnDecimal &>(src);

if (start + length > src_vec.data.size())
throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnDecimal<T>::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnDecimal<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = data.size();
data.resize(old_size + length);
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,13 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
const ColumnFixedString & src_concrete = static_cast<const ColumnFixedString &>(src);

if (start + length > src_concrete.size())
throw Exception("Parameters start = "
+ toString(start) + ", length = "
+ toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method"
" (size() = "
+ toString(src_concrete.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t old_size = chars.size();
chars.resize(old_size + length * n);
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Columns/ColumnString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,13 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len
const ColumnString & src_concrete = static_cast<const ColumnString &>(src);

if (start + length > src_concrete.offsets.size())
throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception(
fmt::format(
"Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_concrete.size()),
ErrorCodes::PARAMETER_OUT_OF_BOUND);

size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.offsets[start + length - 1] - nested_offset;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
if (start + length > src_vec.data.size())
throw Exception(
fmt::format(
"Parameters start = {}, length = {} are out of bound in ColumnVector<T>::insertRangeFrom method (data.size() = {}).",
"Parameters are out of bound in ColumnVector<T>::insertRangeFrom method, start={}, length={}, src.size()={}",
start,
length,
src_vec.data.size()),
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
~SSTFilesToBlockInputStream();
~SSTFilesToBlockInputStream() override;

String getName() const override { return "SSTFilesToBlockInputStream"; }

Expand Down
45 changes: 41 additions & 4 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
<<<<<<< HEAD
=======
#include <Storages/Transaction/Types.h>
#include <TiDB/Schema/SchemaSyncer.h>
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))

#include <ext/scope_guard.h>

Expand Down Expand Up @@ -246,6 +251,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
}
}

<<<<<<< HEAD
extern RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr &, Context &);

/// `preHandleSnapshotToBlock` read data from SSTFiles and predoced the data as a block
Expand Down Expand Up @@ -317,14 +323,26 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshotToBlock(
return cache;
}

=======
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
std::vector<UInt64> KVStore::preHandleSnapshotToFiles(
RegionPtr new_region,
const SSTViewVec snaps,
uint64_t index,
uint64_t term,
TMTContext & tmt)
{
return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
std::vector<UInt64> ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term));
e.rethrow();
}
return ingest_ids;
}

/// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data
Expand All @@ -344,7 +362,15 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
// Use failpoint to change the expected_block_size for some test cases
fiu_do_on(FailPoints::force_set_sst_to_dtfile_block_size, { expected_block_size = 3; });

<<<<<<< HEAD
PageIds ids;
=======
Stopwatch watch;
SCOPE_EXIT({ GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode).Observe(watch.elapsedSeconds()); });

PageIds generated_ingest_ids;
TableID physical_table_id = InvalidTableID;
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
while (true)
{
// If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with
Expand All @@ -369,6 +395,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
physical_table_id = storage->getTableInfo().id;

// Read from SSTs and refine the boundary of blocks output to DTFiles
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
Expand All @@ -392,7 +419,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
stream->writePrefix();
stream->write();
stream->writeSuffix();
ids = stream->ingestIds();
generated_ingest_ids = stream->ingestIds();

(void)table_drop_lock; // the table should not be dropped during ingesting file
break;
Expand Down Expand Up @@ -436,12 +463,13 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
else
{
// Other unrecoverable error, throw
e.addMessage(fmt::format("physical_table_id={}", physical_table_id));
throw;
}
}
}

return ids;
return generated_ingest_ids;
}

template <typename RegionPtrWrap>
Expand Down Expand Up @@ -611,7 +639,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi
}

// Decode the KV pairs in ingesting SST into DTFiles
PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
PageIds ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term));
e.rethrow();
}

// If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to
// ignore the step of calling `ingestFiles`
Expand Down
40 changes: 16 additions & 24 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,31 +253,26 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
return data_list_read;
}

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true)
std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region)
{
auto scanner = region->createCommittedScanner(lock_region);

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return std::nullopt;
}
if (region->isPendingRemove())
return std::nullopt;

/// Read raw KVs from region cache.
{
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;
// Shortcut for empty region.
if (!scanner.hasNext())
return std::nullopt;

RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());

do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}
RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}

void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true)
Expand Down Expand Up @@ -459,7 +454,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
try
{
data_list_read = ReadRegionCommitCache(region);
data_list_read = ReadRegionCommitCache(region, true);
if (!data_list_read)
return nullptr;
}
Expand Down Expand Up @@ -617,24 +612,21 @@ Block GenRegionBlockDataWithSchema(const RegionPtr & region, //
const DecodingStorageSchemaSnapshotConstPtr & schema_snap,
Timestamp gc_safepoint,
bool force_decode,
TMTContext & tmt)
TMTContext & /* */)
{
// In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
// If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files.
fiu_do_on(FailPoints::force_set_safepoint_when_decode_block,
{ gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter
region->tryCompactionFilter(gc_safepoint);

std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
data_list_read = ReadRegionCommitCache(region);
std::optional<RegionDataReadInfoList> data_list_read = ReadRegionCommitCache(region, true);

Block res_block;
// No committed data, just return
if (!data_list_read)
return res_block;

auto context = tmt.getContext();

{
Stopwatch watch;
{
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/PartitionStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/TableLockHolder.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/RegionDataRead.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
Expand All @@ -25,6 +27,8 @@ class Region;
using RegionPtr = std::shared_ptr<Region>;
class StorageDeltaMerge;

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region);

std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);

Expand Down
39 changes: 22 additions & 17 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
// limitations under the License.

#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Core/Names.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IManageableStorage.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/DatumCodec.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionBlockReader.h>
#include <Storages/Transaction/RowCodec.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>

namespace DB
{
Expand Down Expand Up @@ -109,6 +113,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
raw_column->reserve(expected_rows);
}
}

size_t index = 0;
for (const auto & [pk, write_type, commit_ts, value_ptr] : data_list)
{
Expand Down Expand Up @@ -156,37 +161,37 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
}
else
{
if (schema_snapshot->pk_is_handle)
{
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, schema_snapshot->pk_column_ids[0], force_decode))
return false;
}
else
{
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, InvalidColumnID, force_decode))
return false;
}
// Parse column value from encoded value
if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot, force_decode))
return false;
}
}

/// set extra handle column and pk columns if need
/// set extra handle column and pk columns from encoded key if need
if constexpr (pk_type != TMTPKType::STRING)
{
// extra handle column's type is always Int64
// For non-common handle, extra handle column's type is always Int64.
// We need to copy the handle value from encoded key.
const auto handle_value = static_cast<Int64>(pk);
auto * raw_extra_column = const_cast<IColumn *>((block.getByPosition(extra_handle_column_pos)).column.get());
static_cast<ColumnInt64 *>(raw_extra_column)->getData().push_back(Int64(pk));
static_cast<ColumnInt64 *>(raw_extra_column)->getData().push_back(handle_value);
// For pk_is_handle == true, we need to decode the handle value from encoded key, and insert
// to the specify column
if (!pk_column_ids.empty())
{
auto * raw_pk_column = const_cast<IColumn *>((block.getByPosition(pk_pos_map.at(pk_column_ids[0]))).column.get());
if constexpr (pk_type == TMTPKType::INT64)
static_cast<ColumnInt64 *>(raw_pk_column)->getData().push_back(Int64(pk));
static_cast<ColumnInt64 *>(raw_pk_column)->getData().push_back(handle_value);
else if constexpr (pk_type == TMTPKType::UINT64)
static_cast<ColumnUInt64 *>(raw_pk_column)->getData().push_back(UInt64(pk));
static_cast<ColumnUInt64 *>(raw_pk_column)->getData().push_back(UInt64(handle_value));
else
{
// The pk_type must be Int32/Uint32 or more narrow type
// The pk_type must be Int32/UInt32 or more narrow type
// so cannot tell its' exact type here, just use `insert(Field)`
<<<<<<< HEAD
HandleID handle_value(static_cast<Int64>(pk));
=======
>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879))
raw_pk_column->insert(Field(handle_value));
if (unlikely(raw_pk_column->getInt(index) != handle_value))
{
Expand All @@ -196,7 +201,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
}
else
{
throw Exception("Detected overflow value when decoding pk column of type " + raw_pk_column->getName(),
throw Exception(fmt::format("Detected overflow value when decoding pk column, type={} handle={}", raw_pk_column->getName(), handle_value),
ErrorCodes::LOGICAL_ERROR);
}
}
Expand Down
Loading

0 comments on commit 8bbf2ca

Please sign in to comment.