Skip to content

Commit

Permalink
Add more region info when exception thrown in preHandleSnapshot/inges…
Browse files Browse the repository at this point in the history
…tSST

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Sep 14, 2022
1 parent 8cb022b commit 015d17f
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>
#include <TiDB/Schema/SchemaSyncer.h>

#include <ext/scope_guard.h>
Expand Down Expand Up @@ -259,16 +260,24 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
}
}

extern RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr &, Context &);

std::vector<UInt64> KVStore::preHandleSnapshotToFiles(
RegionPtr new_region,
const SSTViewVec snaps,
uint64_t index,
uint64_t term,
TMTContext & tmt)
{
return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
std::vector<UInt64> ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term));
e.rethrow();
}
return ingest_ids;
}

/// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data
Expand All @@ -291,7 +300,8 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
Stopwatch watch;
SCOPE_EXIT({ GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode).Observe(watch.elapsedSeconds()); });

PageIds ids;
PageIds generated_ingest_ids;
TableID physical_table_id = InvalidTableID;
while (true)
{
// If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with
Expand All @@ -316,6 +326,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
physical_table_id = storage->getTableInfo().id;

// Read from SSTs and refine the boundary of blocks output to DTFiles
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
Expand All @@ -339,7 +350,7 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
stream->writePrefix();
stream->write();
stream->writeSuffix();
ids = stream->ingestIds();
generated_ingest_ids = stream->ingestIds();

(void)table_drop_lock; // the table should not be dropped during ingesting file
break;
Expand Down Expand Up @@ -381,12 +392,13 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
else
{
// Other unrecoverable error, throw
e.addMessage(fmt::format("physical_table_id={}", physical_table_id));
throw;
}
}
}

return ids;
return generated_ingest_ids;
}

template <typename RegionPtrWrap>
Expand Down Expand Up @@ -537,7 +549,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi
}

// Decode the KV pairs in ingesting SST into DTFiles
PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
PageIds ingest_ids;
try
{
ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term));
e.rethrow();
}

// If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to
// ignore the step of calling `ingestFiles`
Expand Down

0 comments on commit 015d17f

Please sign in to comment.