Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WALStore support multi path #4085

Merged
merged 11 commits into from
Mar 16, 2022
12 changes: 8 additions & 4 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ LogFilenameSet WALStoreReader::listAllFiles(
// [<parent_path_0, [file0, file1, ...]>, <parent_path_1, [...]>, ...]
std::vector<std::pair<String, Strings>> all_filenames;
Strings filenames;
for (const auto & p : delegator->listPaths())
for (const auto & parent_path : delegator->listPaths())
{
Poco::File directory(p);
String wal_parent_path = parent_path + WALStore::wal_folder_prefix;
Poco::File directory(wal_parent_path);
if (!directory.exists())
{
directory.createDirectories();
continue;
}

filenames.clear();
directory.list(filenames);
all_filenames.emplace_back(std::make_pair(p, std::move(filenames)));
all_filenames.emplace_back(std::make_pair(wal_parent_path, std::move(filenames)));
filenames.clear();
}
assert(all_filenames.size() == 1); // TODO: multi-path

LogFilenameSet log_files;
for (const auto & [parent_path, filenames] : all_filenames)
Expand Down
29 changes: 23 additions & 6 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ WALStore::WALStore(
: delegator(delegator_)
, provider(provider_)
, last_log_num(last_log_num_)
, wal_paths_index(0)
, logger(&Poco::Logger::get("WALStore"))
{
}
Expand All @@ -76,7 +77,7 @@ void WALStore::apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write
if (log_file == nullptr || log_file->writtenBytes() > PAGE_META_ROLL_SIZE)
{
auto log_num = last_log_num++;
auto [new_log_file, filename] = createLogWriter(delegator, provider, {log_num, 0}, logger, false);
auto [new_log_file, filename] = createLogWriter({log_num, 0}, false);
(void)filename;
log_file.swap(new_log_file);
}
Expand All @@ -86,13 +87,29 @@ void WALStore::apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write
}

std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
PSDiskDelegatorPtr delegator,
const FileProviderPtr & provider,
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
Poco::Logger * logger,
bool manual_flush)
{
const auto path = delegator->defaultPath(); // TODO: multi-path
String path;

if (delegator->numPaths() == 1)
{
path = delegator->defaultPath();
}
else
{
const auto & paths = delegator->listPaths();

if (wal_paths_index >= paths.size())
{
wal_paths_index = 0;
}
path = paths[wal_paths_index];
wal_paths_index++;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
}

path += wal_folder_prefix;

LogFilename log_filename = LogFilename{
(manual_flush ? LogFileStage::Temporary : LogFileStage::Normal),
new_log_lvl.first,
Expand Down Expand Up @@ -158,7 +175,7 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire
// Use {largest_log_num + 1, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter(delegator, provider, {log_num, 1}, logger, /*manual_flush*/ true);
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true);
{
const String serialized = ser::serializeTo(directory_snap);
ReadBufferFromString payload(serialized);
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ using WALStoreReaderPtr = std::shared_ptr<WALStoreReader>;
class WALStore
{
public:
using ChecksumClass = Digest::CRC64;
constexpr static const char * wal_folder_prefix = "/wal";
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved

static std::pair<WALStorePtr, WALStoreReaderPtr>
create(
Expand Down Expand Up @@ -103,18 +103,17 @@ class WALStore
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_);

static std::tuple<std::unique_ptr<LogWriter>, LogFilename>
std::tuple<std::unique_ptr<LogWriter>, LogFilename>
createLogWriter(
PSDiskDelegatorPtr delegator,
const FileProviderPtr & provider,
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
Poco::Logger * logger,
bool manual_flush);

PSDiskDelegatorPtr delegator;
FileProviderPtr provider;
mutable std::mutex log_file_mutex;
Format::LogNumberType last_log_num;
// select next path for creating new logfile
UInt32 wal_paths_index;
std::unique_ptr<LogWriter> log_file;

Poco::Logger * logger;
Expand Down
50 changes: 42 additions & 8 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,45 @@ TEST(WALLognameSetTest, ordering)
}


class WALStoreTest : public DB::base::TiFlashStorageTestBasic
class WALStoreTest
: public DB::base::TiFlashStorageTestBasic
, public testing::WithParamInterface<bool>
{
public:
WALStoreTest()
: multi_paths(GetParam())
{
}

void SetUp() override
{
auto path = getTemporaryPath();
dropDataOnDisk(path);

// TODO: multi-path
delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(getTemporaryPath());
if (!multi_paths)
{
delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(getTemporaryPath());
}
else
{
// mock 8 dirs for multi-paths
Strings paths;
for (size_t i = 0; i < 8; ++i)
{
paths.emplace_back(fmt::format("{}/path_{}", path, i));
}
delegator = std::make_shared<DB::tests::MockDiskDelegatorMulti>(paths);
}
}

private:
const bool multi_paths;

protected:
PSDiskDelegatorPtr delegator;
};

TEST_F(WALStoreTest, FindCheckpointFile)
TEST_P(WALStoreTest, FindCheckpointFile)
{
Poco::Logger * log = &Poco::Logger::get("WALStoreTest");
auto path = getTemporaryPath();
Expand Down Expand Up @@ -262,7 +285,7 @@ TEST_F(WALStoreTest, FindCheckpointFile)
}
}

TEST_F(WALStoreTest, Empty)
TEST_P(WALStoreTest, Empty)
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
auto provider = ctx.getFileProvider();
Expand All @@ -285,7 +308,7 @@ TEST_F(WALStoreTest, Empty)
ASSERT_EQ(num_callback_called, 0);
}

TEST_F(WALStoreTest, ReadWriteRestore)
TEST_P(WALStoreTest, ReadWriteRestore)
try
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
Expand Down Expand Up @@ -405,7 +428,7 @@ try
}
CATCH

TEST_F(WALStoreTest, ReadWriteRestore2)
TEST_P(WALStoreTest, ReadWriteRestore2)
try
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
Expand Down Expand Up @@ -495,7 +518,7 @@ try
}
CATCH

TEST_F(WALStoreTest, ManyEdits)
TEST_P(WALStoreTest, ManyEdits)
try
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
Expand Down Expand Up @@ -575,4 +598,15 @@ try
}
CATCH

INSTANTIATE_TEST_CASE_P(
Disks,
WALStoreTest,
::testing::Bool(),
[](const ::testing::TestParamInfo<WALStoreTest::ParamType> & info) -> String {
const auto multi_path = info.param;
if (multi_path)
return "multi_disks";
return "single_disk";
});

} // namespace DB::PS::V3::tests
6 changes: 3 additions & 3 deletions dbms/src/Storages/tests/TiFlashStorageTestBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ class TiFlashStorageTestBasic : public ::testing::Test
}

protected:
void dropDataOnDisk(const String & path)
static void dropDataOnDisk(const String & path)
{
if (Poco::File file(path); file.exists())
{
file.remove(true);
}
}

void createIfNotExist(const String & path)
static void createIfNotExist(const String & path)
{
if (Poco::File file(path); !file.exists())
file.createDirectories();
Expand All @@ -84,4 +84,4 @@ class TiFlashStorageTestBasic : public ::testing::Test
std::unique_ptr<Context> db_context;
};
} // namespace base
} // namespace DB
} // namespace DB