Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UniPS: Catch exception when dumping incr snapshot #9407

Merged
merged 24 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0b425fc
a
CalvinNeo Sep 4, 2024
01dfb29
a
CalvinNeo Sep 4, 2024
cc36c63
g
CalvinNeo Sep 4, 2024
18c1e09
fmt
CalvinNeo Sep 4, 2024
197135d
el
CalvinNeo Sep 4, 2024
efc4fa5
el
CalvinNeo Sep 4, 2024
767c174
el
CalvinNeo Sep 4, 2024
b9218ac
el
CalvinNeo Sep 4, 2024
337374b
e2
CalvinNeo Sep 4, 2024
55c2d51
e2
CalvinNeo Sep 4, 2024
de6635f
Update dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
CalvinNeo Sep 4, 2024
effb7d7
Update dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
CalvinNeo Sep 4, 2024
8807118
Update dbms/src/Storages/S3/S3RandomAccessFile.cpp
CalvinNeo Sep 4, 2024
1fd8107
Update dbms/src/Storages/S3/S3RandomAccessFile.cpp
CalvinNeo Sep 4, 2024
52a715a
Update dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
CalvinNeo Sep 4, 2024
aa8654f
Update dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
CalvinNeo Sep 4, 2024
a2588a6
Update dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
CalvinNeo Sep 4, 2024
aff67c5
Update dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
CalvinNeo Sep 4, 2024
481e021
Update dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
CalvinNeo Sep 4, 2024
8abb59e
Update dbms/src/Storages/S3/S3RandomAccessFile.cpp
CalvinNeo Sep 4, 2024
45bc97b
Update dbms/src/Storages/S3/S3RandomAccessFile.cpp
CalvinNeo Sep 4, 2024
b0ff322
Update dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
CalvinNeo Sep 4, 2024
b5a7b05
Update dbms/src/Storages/Page/V3/Universal/UniversalPageStorageServic…
CalvinNeo Sep 4, 2024
1b2f5d6
Merge branch 'master' into set-incr-snapshot
ti-chi-bot[bot] Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ namespace DB
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge) \
M(force_thread_0_no_agg_spill)
M(force_thread_0_no_agg_spill) \
M(force_checkpoint_dump_throw_datafile)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace FailPoints
extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
extern const char force_not_clean_fap_on_destroy[];
extern const char force_checkpoint_dump_throw_datafile[];
} // namespace FailPoints

namespace tests
Expand Down Expand Up @@ -577,6 +578,35 @@ try
}
CATCH


TEST_F(RegionKVStoreTestFAP, DumpCheckpointError)
try
{
auto & global_context = TiFlashTestEnv::getGlobalContext();
uint64_t region_id = 1;
auto peer_id = 1;
KVStore & kvs = getKVS();
auto page_storage = global_context.getWriteNodePageStorage();

proxy_instance->bootstrapWithRegion(kvs, global_context.getTMTContext(), region_id, std::nullopt);
auto region = proxy_instance->getRegion(region_id);
auto store_id = kvs.getStore().store_id.load();
region->addPeer(store_id, peer_id, metapb::PeerRole::Learner);

// Write some data, and persist meta.
auto [index, term]
= proxy_instance->normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default});
kvs.setRegionCompactLogConfig(0, 0, 0, 0);
persistAfterWrite(global_context, kvs, proxy_instance, page_storage, region_id, index);

auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client));
FailPointHelper::enableFailPoint(FailPoints::force_checkpoint_dump_throw_datafile);
EXPECT_NO_THROW(dumpCheckpoint());
FailPointHelper::disableFailPoint(FailPoints::force_checkpoint_dump_throw_datafile);
}
CATCH

// Test cancel from peer select
TEST_F(RegionKVStoreTestFAP, Cancel1)
try
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <limits>

extern std::shared_ptr<MemoryTracker> root_of_kvstore_mem_trackers;

namespace DB::tests
{

Expand Down
27 changes: 25 additions & 2 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Stopwatch.h>
#include <Poco/File.h>
#include <Storages/Page/V3/CheckpointFile/CPDumpStat.h>
#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
Expand Down Expand Up @@ -180,10 +182,15 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(

// 2. For entry edits without the checkpoint info, or it is stored on an existing data file that needs compact,
// write the entry data to the data file, and assign a new checkpoint info.
Stopwatch sw;
try
{
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
RUNTIME_CHECK_MSG(
page.isValid(),
"failed to read page, record={}, elapsed={}s",
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
rec_edit,
sw.elapsedSeconds());
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
auto data_location
= data_writer->write(rec_edit.page_id, rec_edit.version, page.data.begin(), page.data.size());
// the page data size uploaded in this checkpoint
Expand All @@ -207,7 +214,7 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
}
catch (...)
{
LOG_ERROR(log, "failed to read page, record={}", rec_edit);
LOG_ERROR(log, "failed to read and write page, record={}, elapsed={}s", rec_edit, sw.elapsedSeconds());
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
Expand Down Expand Up @@ -255,6 +262,7 @@ void CPFilesWriter::newDataWriter()
fmt::runtime(data_file_path_pattern),
fmt::arg("seq", sequence),
fmt::arg("index", data_file_index)));

data_writer = CPDataFileWriter::create({
.file_path = data_file_paths.back(),
.file_id = fmt::format(
Expand All @@ -268,4 +276,19 @@ void CPFilesWriter::newDataWriter()
++data_file_index;
}

void CPFilesWriter::abort()
{
for (const auto & s : data_file_paths)
{
if (Poco::File f(s); f.exists())
{
f.remove();
}
}
if likely (manifest_writer != nullptr)
{
manifest_writer->abort();
}
}

} // namespace DB::PS::V3
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class CPFilesWriter : private boost::noncopyable
*/
[[nodiscard]] std::vector<String> writeSuffix();

void abort();

#ifndef DBMS_PUBLIC_GTEST
private:
#else
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Logger.h>
#include <Poco/File.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h>
#include <Storages/Page/V3/CheckpointFile/ProtoHelper.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
Expand Down Expand Up @@ -147,4 +148,14 @@ void CPManifestFileWriter::writeSuffix()
write_stage = WriteStage::WritingFinished;
}

void CPManifestFileWriter::abort()
{
if (options.file_path.empty())
return;
if (Poco::File f(options.file_path); f.exists())
{
f.remove();
}
}

} // namespace DB::PS::V3
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class CPManifestFileWriter : private boost::noncopyable
return std::make_unique<CPManifestFileWriter>(std::move(options));
}

explicit CPManifestFileWriter(Options options)
: file_writer(std::make_unique<WriteBufferFromFile>(options.file_path))
explicit CPManifestFileWriter(Options options_)
: options(std::move(options_))
, file_writer(std::make_unique<WriteBufferFromFile>(options.file_path))
, compressed_writer(std::make_unique<CompressedWriteBuffer<true>>(*file_writer, CompressionSettings()))
, max_edit_records_per_part(options.max_edit_records_per_part)
{
Expand All @@ -65,6 +66,8 @@ class CPManifestFileWriter : private boost::noncopyable

void flush();

void abort();

private:
void writeEditsPart(const universal::PageEntriesEdit & edit, UInt64 start, UInt64 limit);

Expand All @@ -78,6 +81,7 @@ class CPManifestFileWriter : private boost::noncopyable
WritingFinished,
};

Options options;
// compressed<plain_file>
const std::unique_ptr<WriteBufferFromFile> file_writer;
const WriteBufferPtr compressed_writer;
Expand Down
74 changes: 52 additions & 22 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@

#include <mutex>


namespace DB
{
namespace FailPoints
{
extern const char force_checkpoint_dump_throw_datafile[];
}

UniversalPageStoragePtr UniversalPageStorage::create(
const String & name,
PSDiskDelegatorPtr delegator,
Expand Down Expand Up @@ -495,7 +499,7 @@ bool UniversalPageStorage::canSkipCheckpoint() const
return snap->sequence == last_checkpoint_sequence;
}

PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
std::optional<PS::V3::CPDataDumpStats> UniversalPageStorage::dumpIncrementalCheckpoint(
const UniversalPageStorage::DumpCheckpointOptions & options)
{
std::scoped_lock lock(checkpoint_mu);
Expand All @@ -504,7 +508,7 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
auto snap = page_directory->createSnapshot(/*tracing_id*/ "dumpIncrementalCheckpoint");

if (snap->sequence == last_checkpoint_sequence && !options.full_compact)
return {.has_new_data = false};
return PS::V3::CPDataDumpStats{.has_new_data = false};

// As a checkpoint, we write both entries (in manifest) and its data.
// Some entries' data may be already written by a previous checkpoint. These data will not be written again.
Expand Down Expand Up @@ -542,23 +546,49 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
.max_data_file_size = options.max_data_file_size,
.max_edit_records_per_part = options.max_edit_records_per_part,
});

writer->writePrefix({
.writer = options.writer_info,
.sequence = snap->sequence,
.last_sequence = last_checkpoint_sequence,
});
PS::V3::CPFilesWriter::CompactOptions compact_opts = [&]() {
if (options.full_compact)
return PS::V3::CPFilesWriter::CompactOptions(true);
if (options.compact_getter == nullptr)
return PS::V3::CPFilesWriter::CompactOptions(false);
return PS::V3::CPFilesWriter::CompactOptions(options.compact_getter());
std::vector<String> data_file_paths;
const auto checkpoint_dump_stats = [&]() -> std::optional<PS::V3::CPDataDumpStats> {
try
{
writer->writePrefix({
.writer = options.writer_info,
.sequence = snap->sequence,
.last_sequence = last_checkpoint_sequence,
});
PS::V3::CPFilesWriter::CompactOptions compact_opts = [&]() {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
if (options.full_compact)
return PS::V3::CPFilesWriter::CompactOptions(true);
if (options.compact_getter == nullptr)
return PS::V3::CPFilesWriter::CompactOptions(false);
return PS::V3::CPFilesWriter::CompactOptions(options.compact_getter());
}();
// get the remote file ids that need to be compacted
const auto checkpoint_dump_stats_inner
= writer->writeEditsAndApplyCheckpointInfo(edit_from_mem, compact_opts, options.only_upload_manifest);
data_file_paths = writer->writeSuffix();
fiu_do_on(FailPoints::force_checkpoint_dump_throw_datafile, {
throw Exception(ErrorCodes::LOGICAL_ERROR, "fake checkpoint write exception");
Copy link
Contributor

@JaySon-Huang JaySon-Huang Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit change

Suggested change
throw Exception(ErrorCodes::LOGICAL_ERROR, "fake checkpoint write exception");
throw Exception(ErrorCodes::FAIL_POINT_ERROR, "fake checkpoint write exception");

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be LOGICAL_ERROR. Because there is another error ErrorCodes::REGION_DATA_SCHEMA_UPDATED which could make prehandle to retry. So error codes matters here.

});
return checkpoint_dump_stats_inner;
}
catch (...)
{
// Could be #9406, which is a soft error.
tryLogCurrentException(
__PRETTY_FUNCTION__,
fmt::format(
"Error dumping incremental snapshot sequence={} manifest_file_path={} data_file_path_pattern={}",
sequence,
manifest_file_path,
options.data_file_path_pattern));
writer->abort();
return std::nullopt;
}
}();
// get the remote file ids that need to be compacted
const auto checkpoint_dump_stats
= writer->writeEditsAndApplyCheckpointInfo(edit_from_mem, compact_opts, options.only_upload_manifest);
auto data_file_paths = writer->writeSuffix();

if (!checkpoint_dump_stats.has_value())
return checkpoint_dump_stats;
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved

writer.reset();
auto dump_data_seconds = sw.elapsedMillisecondsFromLastTime() / 1000.0;

Expand All @@ -580,7 +610,7 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
// TODO: Currently, even when has_new_data == false,
// something will be written to DataFile (i.e., the file prefix).
// This can be avoided, as its content is useless.
if (checkpoint_dump_stats.has_new_data)
if (checkpoint_dump_stats.value().has_new_data)
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
{
// Copy back the checkpoint info to the current PageStorage.
// New checkpoint infos are attached in `writeEditsAndApplyCheckpointInfo`.
Expand All @@ -607,8 +637,8 @@ PS::V3::CPDataDumpStats UniversalPageStorage::dumpIncrementalCheckpoint(
copy_checkpoint_info_seconds,
sw.elapsedSeconds(),
sequence,
checkpoint_dump_stats);
SetMetrics(checkpoint_dump_stats);
checkpoint_dump_stats.value());
SetMetrics(checkpoint_dump_stats.value());
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
return checkpoint_dump_stats;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class UniversalPageStorage final
UInt64 max_edit_records_per_part = 100000;
};

PS::V3::CPDataDumpStats dumpIncrementalCheckpoint(const DumpCheckpointOptions & options);
std::optional<PS::V3::CPDataDumpStats> dumpIncrementalCheckpoint(const DumpCheckpointOptions & options);

PS::V3::CPDataFilesStatCache::CacheMap getRemoteDataFilesStatCache() const
{
Expand Down
24 changes: 14 additions & 10 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,20 @@ bool UniversalPageStorageService::uploadCheckpointImpl(
};

const auto write_stats = uni_page_storage->dumpIncrementalCheckpoint(opts);
GET_METRIC(tiflash_storage_checkpoint_flow, type_incremental).Increment(write_stats.incremental_data_bytes);
GET_METRIC(tiflash_storage_checkpoint_flow, type_compaction).Increment(write_stats.compact_data_bytes);

LOG_INFO(
log,
"Upload checkpoint success,{} upload_sequence={} incremental_bytes={} compact_bytes={}",
force_sync_data ? " sync_all=true" : "",
upload_info.upload_sequence,
write_stats.incremental_data_bytes,
write_stats.compact_data_bytes);
if (write_stats.has_value())
{
GET_METRIC(tiflash_storage_checkpoint_flow, type_incremental)
.Increment(write_stats.value().incremental_data_bytes);
GET_METRIC(tiflash_storage_checkpoint_flow, type_compaction).Increment(write_stats.value().compact_data_bytes);

LOG_INFO(
log,
"Upload checkpoint success,{} upload_sequence={} incremental_bytes={} compact_bytes={}",
force_sync_data ? " sync_all=true" : "",
upload_info.upload_sequence,
write_stats.value().incremental_data_bytes,
write_stats.value().compact_data_bytes);
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
}

return true;
}
Expand Down
19 changes: 17 additions & 2 deletions dbms/src/Storages/S3/S3RandomAccessFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,15 @@ bool S3RandomAccessFile::initialize()
auto outcome = client_ptr->GetObject(req);
if (!outcome.IsSuccess())
{
LOG_ERROR(log, "S3 GetObject failed: {}, cur_retry={}", S3::S3ErrorMessage(outcome.GetError()), cur_retry);
auto el = sw.elapsedSeconds();
LOG_ERROR(
log,
"S3 GetObject failed: {}, cur_retry={}, key={}, elapsed{}={}s",
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
S3::S3ErrorMessage(outcome.GetError()),
cur_retry,
req.GetKey(),
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
el > 60.0 ? "(long)" : "",
el);
continue;
}

Expand All @@ -208,7 +216,14 @@ bool S3RandomAccessFile::initialize()
}
if (cur_retry >= max_retry && !request_succ)
{
LOG_INFO(log, "S3 GetObject timeout: {}, max_retry={}", remote_fname, max_retry);
auto el = sw.elapsedSeconds();
LOG_INFO(
log,
"S3 GetObject timeout: max_retry={}, key={}, elapsed{}={}s",
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
max_retry,
req.GetKey(),
el > 60.0 ? "(long)" : "",
el);
}
return request_succ;
}
Expand Down