Skip to content

Commit

Permalink
WALStore support multi path (pingcap#4085)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaqizho authored and JaySon-Huang committed Mar 17, 2022
1 parent ea05b8c commit 989bbb5
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 26 deletions.
12 changes: 8 additions & 4 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ LogFilenameSet WALStoreReader::listAllFiles(
// [<parent_path_0, [file0, file1, ...]>, <parent_path_1, [...]>, ...]
std::vector<std::pair<String, Strings>> all_filenames;
Strings filenames;
for (const auto & p : delegator->listPaths())
for (const auto & parent_path : delegator->listPaths())
{
Poco::File directory(p);
String wal_parent_path = parent_path + WALStore::wal_folder_prefix;
Poco::File directory(wal_parent_path);
if (!directory.exists())
{
directory.createDirectories();
continue;
}

filenames.clear();
directory.list(filenames);
all_filenames.emplace_back(std::make_pair(p, std::move(filenames)));
all_filenames.emplace_back(std::make_pair(wal_parent_path, std::move(filenames)));
filenames.clear();
}
assert(all_filenames.size() == 1); // TODO: multi-path

LogFilenameSet log_files;
for (const auto & [parent_path, filenames] : all_filenames)
Expand Down
29 changes: 23 additions & 6 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ WALStore::WALStore(
: delegator(delegator_)
, provider(provider_)
, last_log_num(last_log_num_)
, wal_paths_index(0)
, logger(&Poco::Logger::get("WALStore"))
{
}
Expand All @@ -90,7 +91,7 @@ void WALStore::apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write
if (log_file == nullptr || log_file->writtenBytes() > PAGE_META_ROLL_SIZE)
{
auto log_num = last_log_num++;
auto [new_log_file, filename] = createLogWriter(delegator, provider, {log_num, 0}, logger, false);
auto [new_log_file, filename] = createLogWriter({log_num, 0}, false);
(void)filename;
log_file.swap(new_log_file);
}
Expand All @@ -100,13 +101,29 @@ void WALStore::apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write
}

std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
PSDiskDelegatorPtr delegator,
const FileProviderPtr & provider,
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
Poco::Logger * logger,
bool manual_flush)
{
const auto path = delegator->defaultPath(); // TODO: multi-path
String path;

if (delegator->numPaths() == 1)
{
path = delegator->defaultPath();
}
else
{
const auto & paths = delegator->listPaths();

if (wal_paths_index >= paths.size())
{
wal_paths_index = 0;
}
path = paths[wal_paths_index];
wal_paths_index++;
}

path += wal_folder_prefix;

LogFilename log_filename = LogFilename{
(manual_flush ? LogFileStage::Temporary : LogFileStage::Normal),
new_log_lvl.first,
Expand Down Expand Up @@ -172,7 +189,7 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire
// Use {largest_log_num + 1, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter(delegator, provider, {log_num, 1}, logger, /*manual_flush*/ true);
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true);
{
const String serialized = ser::serializeTo(directory_snap);
ReadBufferFromString payload(serialized);
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ using WALStoreReaderPtr = std::shared_ptr<WALStoreReader>;
class WALStore
{
public:
using ChecksumClass = Digest::CRC64;
constexpr static const char * wal_folder_prefix = "/wal";

static std::pair<WALStorePtr, WALStoreReaderPtr>
create(
Expand Down Expand Up @@ -117,18 +117,17 @@ class WALStore
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_);

static std::tuple<std::unique_ptr<LogWriter>, LogFilename>
std::tuple<std::unique_ptr<LogWriter>, LogFilename>
createLogWriter(
PSDiskDelegatorPtr delegator,
const FileProviderPtr & provider,
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
Poco::Logger * logger,
bool manual_flush);

PSDiskDelegatorPtr delegator;
FileProviderPtr provider;
mutable std::mutex log_file_mutex;
Format::LogNumberType last_log_num;
// select next path for creating new logfile
UInt32 wal_paths_index;
std::unique_ptr<LogWriter> log_file;

Poco::Logger * logger;
Expand Down
50 changes: 42 additions & 8 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,22 +211,45 @@ TEST(WALLognameSetTest, ordering)
}


class WALStoreTest : public DB::base::TiFlashStorageTestBasic
class WALStoreTest
: public DB::base::TiFlashStorageTestBasic
, public testing::WithParamInterface<bool>
{
public:
WALStoreTest()
: multi_paths(GetParam())
{
}

void SetUp() override
{
auto path = getTemporaryPath();
dropDataOnDisk(path);

// TODO: multi-path
delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(getTemporaryPath());
if (!multi_paths)
{
delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(getTemporaryPath());
}
else
{
// mock 8 dirs for multi-paths
Strings paths;
for (size_t i = 0; i < 8; ++i)
{
paths.emplace_back(fmt::format("{}/path_{}", path, i));
}
delegator = std::make_shared<DB::tests::MockDiskDelegatorMulti>(paths);
}
}

private:
const bool multi_paths;

protected:
PSDiskDelegatorPtr delegator;
};

TEST_F(WALStoreTest, FindCheckpointFile)
TEST_P(WALStoreTest, FindCheckpointFile)
{
Poco::Logger * log = &Poco::Logger::get("WALStoreTest");
auto path = getTemporaryPath();
Expand Down Expand Up @@ -276,7 +299,7 @@ TEST_F(WALStoreTest, FindCheckpointFile)
}
}

TEST_F(WALStoreTest, Empty)
TEST_P(WALStoreTest, Empty)
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
auto provider = ctx.getFileProvider();
Expand All @@ -299,7 +322,7 @@ TEST_F(WALStoreTest, Empty)
ASSERT_EQ(num_callback_called, 0);
}

TEST_F(WALStoreTest, ReadWriteRestore)
TEST_P(WALStoreTest, ReadWriteRestore)
try
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
Expand Down Expand Up @@ -419,7 +442,7 @@ try
}
CATCH

TEST_F(WALStoreTest, ReadWriteRestore2)
TEST_P(WALStoreTest, ReadWriteRestore2)
try
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
Expand Down Expand Up @@ -509,7 +532,7 @@ try
}
CATCH

TEST_F(WALStoreTest, ManyEdits)
TEST_P(WALStoreTest, ManyEdits)
try
{
auto ctx = DB::tests::TiFlashTestEnv::getContext();
Expand Down Expand Up @@ -589,4 +612,15 @@ try
}
CATCH

INSTANTIATE_TEST_CASE_P(
Disks,
WALStoreTest,
::testing::Bool(),
[](const ::testing::TestParamInfo<WALStoreTest::ParamType> & info) -> String {
const auto multi_path = info.param;
if (multi_path)
return "multi_disks";
return "single_disk";
});

} // namespace DB::PS::V3::tests
6 changes: 3 additions & 3 deletions dbms/src/Storages/tests/TiFlashStorageTestBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ class TiFlashStorageTestBasic : public ::testing::Test
}

protected:
void dropDataOnDisk(const String & path)
static void dropDataOnDisk(const String & path)
{
if (Poco::File file(path); file.exists())
{
file.remove(true);
}
}

void createIfNotExist(const String & path)
static void createIfNotExist(const String & path)
{
if (Poco::File file(path); !file.exists())
file.createDirectories();
Expand All @@ -98,4 +98,4 @@ class TiFlashStorageTestBasic : public ::testing::Test
std::unique_ptr<Context> db_context;
};
} // namespace base
} // namespace DB
} // namespace DB

0 comments on commit 989bbb5

Please sign in to comment.