diff --git a/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp b/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp index bd3258b6e07..1a87f042a2e 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp @@ -470,7 +470,7 @@ UInt8 LogReader::readMore(size_t * drop_size) void LogReader::reportCorruption(size_t bytes, const String & reason) { - reportDrop(bytes, "Corruption: " + reason); + reportDrop(bytes, fmt::format("Corruption: {} [offset={}] [file={}]", reason, file->getPositionInFile(), file->getFileName())); } void LogReader::reportDrop(size_t bytes, const String & reason) diff --git a/dbms/src/Storages/Page/V3/WAL/WALReader.h b/dbms/src/Storages/Page/V3/WAL/WALReader.h index 3443cc605c0..e61a53da5de 100644 --- a/dbms/src/Storages/Page/V3/WAL/WALReader.h +++ b/dbms/src/Storages/Page/V3/WAL/WALReader.h @@ -21,6 +21,11 @@ namespace DB { +namespace ErrorCodes +{ +extern const int CORRUPTED_DATA; +} + class FileProvider; using FileProviderPtr = std::shared_ptr; @@ -29,10 +34,11 @@ namespace PS::V3 class ReportCollector : public LogReader::Reporter { public: - void corruption(size_t /*bytes*/, const String & /*msg*/) override + void corruption(size_t /*bytes*/, const String & msg) override { error_happened = true; // FIXME: store the reason of corruption + throw Exception(msg, ErrorCodes::CORRUPTED_DATA); } bool hasError() const diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index 6585c6dfdfe..6759e80f416 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -201,9 +201,15 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire // Rename it to be a normal log file. const auto temp_fullname = log_filename.fullname(LogFileStage::Temporary); const auto normal_fullname = log_filename.fullname(LogFileStage::Normal); + LOG_FMT_INFO(logger, "Renaming log file to be normal [fullname={}]", temp_fullname); - auto f = Poco::File{temp_fullname}; - f.renameTo(normal_fullname); + // Use `renameFile` from FileProvider that take good care of encryption path + provider->renameFile( + temp_fullname, + EncryptionPath(temp_fullname, ""), + normal_fullname, + EncryptionPath(normal_fullname, ""), + true); LOG_FMT_INFO(logger, "Rename log file to normal done [fullname={}]", normal_fullname); // #define ARCHIVE_COMPACTED_LOGS // keep for debug diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index 008a311841c..23ee2e93f07 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -220,6 +221,7 @@ class WALStoreTest public: WALStoreTest() : multi_paths(GetParam()) + , log(Logger::get("WALStoreTest")) { } @@ -250,11 +252,11 @@ class WALStoreTest protected: PSDiskDelegatorPtr delegator; WALStore::Config config; + LoggerPtr log; }; TEST_P(WALStoreTest, FindCheckpointFile) { - LoggerPtr log = Logger::get("WALLognameTest"); auto path = getTemporaryPath(); { @@ -547,7 +549,7 @@ try ASSERT_NE(wal, nullptr); std::mt19937 rd; - std::uniform_int_distribution<> d(0, 20); + std::uniform_int_distribution<> d_20(0, 20); // Stage 2. insert many edits constexpr size_t num_edits_test = 100000; @@ -559,7 +561,7 @@ try { PageEntryV3 entry{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntriesEdit edit; - const size_t num_pages_put = d(rd); + const size_t num_pages_put = d_20(rd); for (size_t p = 0; p < num_pages_put; ++p) { page_id += 1; @@ -595,23 +597,45 @@ try LOG_FMT_INFO(&Poco::Logger::get("WALStoreTest"), "Done test for {} persist pages in {} edits", num_pages_read, num_edits_test); - // Stage 3. compact logs and verify - // wal->compactLogs(); - // wal.reset(); - - // // After logs compacted, they should be written as one edit. - // num_edits_read = 0; - // num_pages_read = 0; - // wal = WALStore::create( - // [&](PageEntriesEdit && edit) { - // num_pages_read += edit.size(); - // EXPECT_EQ(page_id, edit.size()) << fmt::format("at idx={}", num_edits_read); - // num_edits_read += 1; - // }, - // provider, - // delegator); - // EXPECT_EQ(num_edits_read, 1); - // EXPECT_EQ(num_pages_read, page_id); + // Test for save snapshot (with encryption) + auto enc_key_manager = std::make_shared(/*encryption_enabled_=*/true); + auto enc_provider = std::make_shared(enc_key_manager, true); + LogFilenameSet persisted_log_files = WALStoreReader::listAllFiles(delegator, log); + WALStore::FilesSnapshot file_snap{.current_writting_log_num = 100, // just a fake value + .persisted_log_files = persisted_log_files}; + + PageEntriesEdit snap_edit; + PageEntryV3 entry{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + std::uniform_int_distribution<> d_10000(0, 10000); + // just fill in some random entry + for (size_t i = 0; i < 70; ++i) + { + snap_edit.varEntry(d_10000(rd), PageVersionType(345, 22), entry, 1); + } + std::tie(wal, reader) = WALStore::create(getCurrentTestName(), enc_provider, delegator, config); + bool done = wal->saveSnapshot(std::move(file_snap), std::move(snap_edit)); + ASSERT_TRUE(done); + wal.reset(); + reader.reset(); + + // After logs compacted, they should be written as one edit. + num_edits_read = 0; + num_pages_read = 0; + std::tie(wal, reader) = WALStore::create(getCurrentTestName(), enc_provider, delegator, config); + while (reader->remained()) + { + auto [ok, edit] = reader->next(); + if (!ok) + { + reader->throwIfError(); + // else it just run to the end of file. + break; + } + num_pages_read += edit.size(); + num_edits_read += 1; + } + EXPECT_EQ(num_edits_read, 1); + EXPECT_EQ(num_pages_read, 70); } CATCH