UniPS: Catch exception when dumping incr snapshot (#9407)
close #9406

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: JaySon <[email protected]>
3 people authored Sep 4, 2024
1 parent b30c1f5 commit abb577e
Showing 11 changed files with 160 additions and 41 deletions.
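
For orientation before the per-file diffs, here is a minimal, self-contained C++ sketch of the error-handling pattern this commit introduces: dumping an incremental checkpoint now returns optional stats, and on any exception the writer's abort() removes the partially written files before std::nullopt is returned to the caller. Every name in the sketch (DumpStats, Writer, dumpCheckpoint) is a simplified stand-in, not the real TiFlash API (CPDataDumpStats, CPFilesWriter, UniversalPageStorage::dumpIncrementalCheckpoint).

#include <cstddef>
#include <cstdio>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified stand-in for PS::V3::CPDataDumpStats.
struct DumpStats
{
    bool has_new_data = false;
    std::size_t incremental_data_bytes = 0;
};

// Simplified stand-in for CPFilesWriter: it remembers the files it created so
// that abort() can clean them up after a failure.
struct Writer
{
    std::vector<std::string> data_file_paths;

    void writeData(const std::string & path)
    {
        // Pretend a checkpoint data file was created at `path`.
        data_file_paths.push_back(path);
    }

    // Mirrors the new CPFilesWriter::abort(): remove partially written files.
    void abort()
    {
        for (const auto & p : data_file_paths)
            std::remove(p.c_str());
        data_file_paths.clear();
    }
};

// Mirrors the new control flow of dumpIncrementalCheckpoint: either the dump
// succeeds and returns stats, or it cleans up after itself and returns nullopt.
std::optional<DumpStats> dumpCheckpoint(Writer & writer, bool inject_error)
{
    try
    {
        writer.writeData("/tmp/ckpt_data_0"); // hypothetical path
        if (inject_error) // plays the role of force_checkpoint_dump_throw_datafile
            throw std::runtime_error("fake checkpoint write exception");
        return DumpStats{true, 128};
    }
    catch (...)
    {
        writer.abort(); // drop the incomplete data/manifest files
        return std::nullopt; // soft error: the caller simply skips this round
    }
}

int main()
{
    Writer writer;
    // Callers (uploadCheckpointImpl in this commit) must now check the
    // optional before touching the stats.
    if (auto stats = dumpCheckpoint(writer, /*inject_error=*/true); stats.has_value())
        std::cout << "uploaded " << stats->incremental_data_bytes << " bytes\n";
    else
        std::cout << "checkpoint dump failed softly, nothing uploaded\n";
    return 0;
}

The design choice mirrored here is to treat the dump failure (e.g. #9406) as soft: the current upload round is skipped, and abort() removes whatever data and manifest files were partially written locally.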
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
@@ -115,7 +115,8 @@ namespace DB
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge) \
M(force_thread_0_no_agg_spill)
M(force_thread_0_no_agg_spill) \
M(force_checkpoint_dump_throw_datafile)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
30 changes: 30 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
@@ -44,6 +44,7 @@ namespace FailPoints
extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
extern const char force_not_clean_fap_on_destroy[];
extern const char force_checkpoint_dump_throw_datafile[];
} // namespace FailPoints

namespace tests
@@ -577,6 +578,35 @@
}
CATCH


TEST_F(RegionKVStoreTestFAP, DumpCheckpointError)
try
{
auto & global_context = TiFlashTestEnv::getGlobalContext();
uint64_t region_id = 1;
auto peer_id = 1;
KVStore & kvs = getKVS();
auto page_storage = global_context.getWriteNodePageStorage();

proxy_instance->bootstrapWithRegion(kvs, global_context.getTMTContext(), region_id, std::nullopt);
auto region = proxy_instance->getRegion(region_id);
auto store_id = kvs.getStore().store_id.load();
region->addPeer(store_id, peer_id, metapb::PeerRole::Learner);

// Write some data, and persist meta.
auto [index, term]
= proxy_instance->normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default});
kvs.setRegionCompactLogConfig(0, 0, 0, 0);
persistAfterWrite(global_context, kvs, proxy_instance, page_storage, region_id, index);

auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client));
FailPointHelper::enableFailPoint(FailPoints::force_checkpoint_dump_throw_datafile);
EXPECT_NO_THROW(dumpCheckpoint());
FailPointHelper::disableFailPoint(FailPoints::force_checkpoint_dump_throw_datafile);
}
CATCH

// Test cancel from peer select
TEST_F(RegionKVStoreTestFAP, Cancel1)
try
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
@@ -33,7 +33,6 @@
#include <limits>

extern std::shared_ptr<MemoryTracker> root_of_kvstore_mem_trackers;

namespace DB::tests
{

27 changes: 25 additions & 2 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Stopwatch.h>
#include <Poco/File.h>
#include <Storages/Page/V3/CheckpointFile/CPDumpStat.h>
#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
@@ -180,10 +182,15 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(

// 2. For entry edits without the checkpoint info, or it is stored on an existing data file that needs compact,
// write the entry data to the data file, and assign a new checkpoint info.
Stopwatch sw;
try
{
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
RUNTIME_CHECK_MSG(
page.isValid(),
"failed to read page, record={} elapsed={:.3f}s",
rec_edit,
sw.elapsedSeconds());
auto data_location
= data_writer->write(rec_edit.page_id, rec_edit.version, page.data.begin(), page.data.size());
// the page data size uploaded in this checkpoint
@@ -207,7 +214,7 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
}
catch (...)
{
LOG_ERROR(log, "failed to read page, record={}", rec_edit);
LOG_ERROR(log, "failed to read and write page, record={} elapsed={:.3f}s", rec_edit, sw.elapsedSeconds());
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
@@ -255,6 +262,7 @@ void CPFilesWriter::newDataWriter()
fmt::runtime(data_file_path_pattern),
fmt::arg("seq", sequence),
fmt::arg("index", data_file_index)));

data_writer = CPDataFileWriter::create({
.file_path = data_file_paths.back(),
.file_id = fmt::format(
@@ -268,4 +276,19 @@ void CPFilesWriter::newDataWriter()
++data_file_index;
}

void CPFilesWriter::abort()
{
for (const auto & s : data_file_paths)
{
if (Poco::File f(s); f.exists())
{
f.remove();
}
}
if likely (manifest_writer != nullptr)
{
manifest_writer->abort();
}
}

} // namespace DB::PS::V3
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
@@ -101,6 +101,8 @@ class CPFilesWriter : private boost::noncopyable
*/
[[nodiscard]] std::vector<String> writeSuffix();

void abort();

#ifndef DBMS_PUBLIC_GTEST
private:
#else
11 changes: 11 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.cpp
@@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Logger.h>
#include <Poco/File.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h>
#include <Storages/Page/V3/CheckpointFile/ProtoHelper.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
@@ -147,4 +148,14 @@ void CPManifestFileWriter::writeSuffix()
write_stage = WriteStage::WritingFinished;
}

void CPManifestFileWriter::abort()
{
if (options.file_path.empty())
return;
if (Poco::File f(options.file_path); f.exists())
{
f.remove();
}
}

} // namespace DB::PS::V3
8 changes: 6 additions & 2 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h
@@ -40,8 +40,9 @@ class CPManifestFileWriter : private boost::noncopyable
return std::make_unique<CPManifestFileWriter>(std::move(options));
}

explicit CPManifestFileWriter(Options options)
: file_writer(std::make_unique<WriteBufferFromFile>(options.file_path))
explicit CPManifestFileWriter(Options options_)
: options(std::move(options_))
, file_writer(std::make_unique<WriteBufferFromFile>(options.file_path))
, compressed_writer(std::make_unique<CompressedWriteBuffer<true>>(*file_writer, CompressionSettings()))
, max_edit_records_per_part(options.max_edit_records_per_part)
{
@@ -65,6 +66,8 @@ class CPManifestFileWriter : private boost::noncopyable

void flush();

void abort();

private:
void writeEditsPart(const universal::PageEntriesEdit & edit, UInt64 start, UInt64 limit);

@@ -78,6 +81,7 @@ class CPManifestFileWriter : private boost::noncopyable
WritingFinished,
};

Options options;
// compressed<plain_file>
const std::unique_ptr<WriteBufferFromFile> file_writer;
const WriteBufferPtr compressed_writer;
74 changes: 52 additions & 22 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
@@ -36,9 +36,13 @@

#include <mutex>


namespace DB
{
namespace FailPoints
{
extern const char force_checkpoint_dump_throw_datafile[];
}

UniversalPageStoragePtr UniversalPageStorage::create(
const String & name,
PSDiskDelegatorPtr delegator,
@@ -495,7 +499,7 @@ bool UniversalPageStorage::canSkipCheckpoint() const
return snap->sequence == last_checkpoint_sequence;
}

PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
std::optional<PS::V3::CPDataDumpStats> UniversalPageStorage::dumpIncrementalCheckpoint(
const UniversalPageStorage::DumpCheckpointOptions & options)
{
std::scoped_lock lock(checkpoint_mu);
@@ -504,7 +508,7 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
auto snap = page_directory->createSnapshot(/*tracing_id*/ "dumpIncrementalCheckpoint");

if (snap->sequence == last_checkpoint_sequence && !options.full_compact)
return {.has_new_data = false};
return PS::V3::CPDataDumpStats{.has_new_data = false};

// As a checkpoint, we write both entries (in manifest) and its data.
// Some entries' data may be already written by a previous checkpoint. These data will not be written again.
@@ -542,23 +546,49 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
.max_data_file_size = options.max_data_file_size,
.max_edit_records_per_part = options.max_edit_records_per_part,
});

writer->writePrefix({
.writer = options.writer_info,
.sequence = snap->sequence,
.last_sequence = last_checkpoint_sequence,
});
PS::V3::CPFilesWriter::CompactOptions compact_opts = [&]() {
if (options.full_compact)
return PS::V3::CPFilesWriter::CompactOptions(true);
if (options.compact_getter == nullptr)
return PS::V3::CPFilesWriter::CompactOptions(false);
return PS::V3::CPFilesWriter::CompactOptions(options.compact_getter());
std::vector<String> data_file_paths;
const auto checkpoint_dump_stats = [&]() -> std::optional<PS::V3::CPDataDumpStats> {
try
{
writer->writePrefix({
.writer = options.writer_info,
.sequence = snap->sequence,
.last_sequence = last_checkpoint_sequence,
});
const PS::V3::CPFilesWriter::CompactOptions compact_opts = [&]() {
if (options.full_compact)
return PS::V3::CPFilesWriter::CompactOptions(true);
if (options.compact_getter == nullptr)
return PS::V3::CPFilesWriter::CompactOptions(false);
return PS::V3::CPFilesWriter::CompactOptions(options.compact_getter());
}();
// get the remote file ids that need to be compacted
const auto checkpoint_dump_stats_inner
= writer->writeEditsAndApplyCheckpointInfo(edit_from_mem, compact_opts, options.only_upload_manifest);
data_file_paths = writer->writeSuffix();
fiu_do_on(FailPoints::force_checkpoint_dump_throw_datafile, {
throw Exception(ErrorCodes::LOGICAL_ERROR, "fake checkpoint write exception");
});
return checkpoint_dump_stats_inner;
}
catch (...)
{
// Could be #9406, which is a soft error.
tryLogCurrentException(
__PRETTY_FUNCTION__,
fmt::format(
"Error dumping incremental snapshot sequence={} manifest_file_path={} data_file_path_pattern={}",
sequence,
manifest_file_path,
options.data_file_path_pattern));
writer->abort();
return std::nullopt;
}
}();
// get the remote file ids that need to be compacted
const auto checkpoint_dump_stats
= writer->writeEditsAndApplyCheckpointInfo(edit_from_mem, compact_opts, options.only_upload_manifest);
auto data_file_paths = writer->writeSuffix();

if (!checkpoint_dump_stats.has_value())
return std::nullopt;

writer.reset();
auto dump_data_seconds = sw.elapsedMillisecondsFromLastTime() / 1000.0;

@@ -580,7 +610,7 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
// TODO: Currently, even when has_new_data == false,
// something will be written to DataFile (i.e., the file prefix).
// This can be avoided, as its content is useless.
if (checkpoint_dump_stats.has_new_data)
if (checkpoint_dump_stats->has_new_data)
{
// Copy back the checkpoint info to the current PageStorage.
// New checkpoint infos are attached in `writeEditsAndApplyCheckpointInfo`.
@@ -607,8 +637,8 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
copy_checkpoint_info_seconds,
sw.elapsedSeconds(),
sequence,
checkpoint_dump_stats);
SetMetrics(checkpoint_dump_stats);
*checkpoint_dump_stats);
SetMetrics(*checkpoint_dump_stats);
return checkpoint_dump_stats;
}

2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h
@@ -227,7 +227,7 @@ class UniversalPageStorage final
UInt64 max_edit_records_per_part = 100000;
};

PS::V3::CPDataDumpStats dumpIncrementalCheckpoint(const DumpCheckpointOptions & options);
std::optional<PS::V3::CPDataDumpStats> dumpIncrementalCheckpoint(const DumpCheckpointOptions & options);

PS::V3::CPDataFilesStatCache::CacheMap getRemoteDataFilesStatCache() const
{
24 changes: 14 additions & 10 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp
@@ -282,16 +282,20 @@ bool UniversalPageStorageService::uploadCheckpointImpl(
};

const auto write_stats = uni_page_storage->dumpIncrementalCheckpoint(opts);
GET_METRIC(tiflash_storage_checkpoint_flow, type_incremental).Increment(write_stats.incremental_data_bytes);
GET_METRIC(tiflash_storage_checkpoint_flow, type_compaction).Increment(write_stats.compact_data_bytes);

LOG_INFO(
log,
"Upload checkpoint success,{} upload_sequence={} incremental_bytes={} compact_bytes={}",
force_sync_data ? " sync_all=true" : "",
upload_info.upload_sequence,
write_stats.incremental_data_bytes,
write_stats.compact_data_bytes);
if (write_stats.has_value())
{
GET_METRIC(tiflash_storage_checkpoint_flow, type_incremental)
.Increment(write_stats.value().incremental_data_bytes);
GET_METRIC(tiflash_storage_checkpoint_flow, type_compaction).Increment(write_stats.value().compact_data_bytes);

LOG_INFO(
log,
"Upload checkpoint success,{} upload_sequence={} incremental_bytes={} compact_bytes={}",
force_sync_data ? " sync_all=true" : "",
upload_info.upload_sequence,
write_stats->incremental_data_bytes,
write_stats->compact_data_bytes);
}

return true;
}
19 changes: 17 additions & 2 deletions dbms/src/Storages/S3/S3RandomAccessFile.cpp
@@ -192,7 +192,15 @@ bool S3RandomAccessFile::initialize()
auto outcome = client_ptr->GetObject(req);
if (!outcome.IsSuccess())
{
LOG_ERROR(log, "S3 GetObject failed: {}, cur_retry={}", S3::S3ErrorMessage(outcome.GetError()), cur_retry);
auto el = sw.elapsedSeconds();
LOG_ERROR(
log,
"S3 GetObject failed: {}, cur_retry={}, key={}, elapsed{}={:.3f}s",
S3::S3ErrorMessage(outcome.GetError()),
cur_retry,
req.GetKey(),
el > 60.0 ? "(long)" : "",
el);
continue;
}

@@ -208,7 +216,14 @@ bool S3RandomAccessFile::initialize()
}
if (cur_retry >= max_retry && !request_succ)
{
LOG_INFO(log, "S3 GetObject timeout: {}, max_retry={}", remote_fname, max_retry);
auto el = sw.elapsedSeconds();
LOG_INFO(
log,
"S3 GetObject timeout: max_retry={}, key={}, elapsed{}={:.3f}s",
max_retry,
req.GetKey(),
el > 60.0 ? "(long)" : "",
el);
}
return request_succ;
}
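
As a small aside on the logging change above, here is a standalone sketch of how the new elapsed{}={:.3f}s pattern renders with the fmt library; the retry limit and object key below are made up for illustration.

#include <fmt/core.h>

int main()
{
    // Mirrors the log lines added to S3RandomAccessFile::initialize(): a
    // "(long)" marker is spliced into the field name once the request has
    // taken more than 60 seconds.
    for (double elapsed : {1.234, 75.5})
    {
        fmt::print(
            "S3 GetObject timeout: max_retry={}, key={}, elapsed{}={:.3f}s\n",
            3, // hypothetical retry limit
            "s123/data/example_key", // hypothetical key
            elapsed > 60.0 ? "(long)" : "",
            elapsed);
    }
    // Prints "elapsed=1.234s" for the fast case and "elapsed(long)=75.500s"
    // for the slow one.
    return 0;
}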
