Skip to content

Commit

Permalink
Binary compatibility version for DeltaMerge's chunk; Fix disappear of…
Browse files Browse the repository at this point in the history
… PageStorage del meta (#257)

* Chunk binary version

* Add name to PageStorage to identify different storages

* Remove getMaxId of PageStorage

* Migrate DelPage marks while doing GC

* enable fullstack-test

* Revert "Remove getMaxId of PageStorage"

This reverts commit 34d50eb6e9fb2f229f32e2d6b219b74c340d0d0a.
  • Loading branch information
JaySon-Huang committed Oct 22, 2019
1 parent f26846c commit 10e775e
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 63 deletions.
10 changes: 10 additions & 0 deletions dbms/src/Storages/DeltaMerge/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ namespace DB
namespace DM
{

// Binary format version of a serialized Chunk.
// `Chunk::serialize` writes this value (as a VarUInt) before any other field, and
// `Chunk::deserialize` throws if the version it reads back differs from it.
const Chunk::Version Chunk::CURRENT_VERSION = 1;

void Chunk::serialize(WriteBuffer & buf) const
{
writeVarUInt(Chunk::CURRENT_VERSION, buf); // Add binary version

writeIntBinary(handle_start, buf);
writeIntBinary(handle_end, buf);
writePODBinary(is_delete_range, buf);
Expand All @@ -37,6 +41,12 @@ void Chunk::serialize(WriteBuffer & buf) const

Chunk Chunk::deserialize(ReadBuffer & buf)
{
// Check binary version
Chunk::Version chunk_batch_version;
readVarUInt(chunk_batch_version, buf);
if (chunk_batch_version != Chunk::CURRENT_VERSION)
throw Exception("Chunk binary version not match: " + DB::toString(chunk_batch_version), ErrorCodes::LOGICAL_ERROR);

Handle start, end;
readIntBinary(start, buf);
readIntBinary(end, buf);
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ using ColumnMetas = std::vector<ColumnMeta>;

class Chunk
{
public:
// Binary version of chunk
using Version = UInt32;
static const Version CURRENT_VERSION;
public:
using ColumnMetaMap = std::unordered_map<ColId, ColumnMeta>;

Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ namespace DM

DeltaMergeStore::DeltaMergeStore(Context & db_context,
const String & path_,
const String & name,
const String & db_name,
const String & tbl_name,
const ColumnDefines & columns,
const ColumnDefine & handle,
const Settings & settings_)
: path(path_),
storage_pool(path),
table_name(name),
storage_pool(db_name + "." + tbl_name, path),
table_name(tbl_name),
table_handle_define(handle),
background_pool(db_context.getBackgroundPool()),
settings(settings_),
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class DeltaMergeStore

DeltaMergeStore(Context & db_context, //
const String & path_,
const String & name,
const String & db_name,
const String & tbl_name,
const ColumnDefines & columns,
const ColumnDefine & handle,
const Settings & settings_);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace DM
{

// TODO: Load configs from settings.
StoragePool::StoragePool(const String & path)
: log_storage(path + "/log", {}),
data_storage(path + "/data", {}),
meta_storage(path + "/meta", {}),
StoragePool::StoragePool(const String &name, const String & path)
: log_storage(name + ".log", path + "/log", {}),
data_storage(name + ".data", path + "/data", {}),
meta_storage(name + ".meta", path + "/meta", {}),
max_log_page_id(log_storage.getMaxId()),
max_data_page_id(data_storage.getMaxId()),
max_meta_page_id(meta_storage.getMaxId())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class StoragePool : private boost::noncopyable
using Duration = Clock::duration;
using Seconds = std::chrono::seconds;

explicit StoragePool(const String & path);
StoragePool(const String & name, const String & path);

PageId maxLogPageId() { return max_log_page_id; }
PageId maxDataPageId() { return max_data_page_id; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class DeltaMergeStore_test : public ::testing::Test
ColumnDefine handle_column_define = cols[0];

DeltaMergeStorePtr s
= std::make_shared<DeltaMergeStore>(*context, path, name, cols, handle_column_define, DeltaMergeStore::Settings());
= std::make_shared<DeltaMergeStore>(*context, path, "test", name, cols, handle_column_define, DeltaMergeStore::Settings());
return s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DiskValueSpace_test : public ::testing::Test
{
dropDataInDisk();

storage_pool = std::make_unique<StoragePool>(path);
storage_pool = std::make_unique<StoragePool>("test.t1", path);
Context & context = DMTestEnv::getContext();
table_handle_define = ColumnDefine(1, "pk", std::make_shared<DataTypeInt64>());
table_columns.clear();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Segment_test : public ::testing::Test

SegmentPtr reload(ColumnDefines && pre_define_columns = {}, DB::Settings && db_settings = DB::Settings())
{
storage_pool = std::make_unique<StoragePool>(path);
storage_pool = std::make_unique<StoragePool>("test.t1", path);
*db_context = DMTestEnv::getContext(db_settings);
ColumnDefines cols = pre_define_columns.empty() ? DMTestEnv::getDefaultColumns() : pre_define_columns;
setColumns(cols);
Expand Down
97 changes: 62 additions & 35 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Storages/Page/PageStorage.h>

#include <set>
#include <utility>

#include <Storages/Page/PageStorage.h>

#include <IO/ReadBufferFromMemory.h>
#include <Poco/File.h>
Expand Down Expand Up @@ -51,8 +52,9 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file,
return page_files;
}

PageStorage::PageStorage(const String & storage_path_, const Config & config_)
: storage_path(storage_path_),
PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_)
: storage_name(std::move(name)),
storage_path(storage_path_),
config(config_),
versioned_page_entries(),
page_file_log(&Poco::Logger::get("PageFile")),
Expand Down Expand Up @@ -120,7 +122,7 @@ PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
}
catch (DB::Exception & e)
{
LOG_WARNING(log, e.message());
LOG_WARNING(log, storage_name << " " << e.message());
return {}; // return invalid PageEntry
}
}
Expand Down Expand Up @@ -336,8 +338,6 @@ bool PageStorage::gc()
return false;
}

LOG_DEBUG(log, "PageStorage GC start");

PageFileIdAndLevel writing_file_id_level;
{
std::lock_guard<std::mutex> lock(write_mutex);
Expand Down Expand Up @@ -387,12 +387,14 @@ bool PageStorage::gc()
|| (merge_files.size() >= 2 && candidate_total_size >= config.merge_hint_low_used_file_total_size);
if (!should_merge)
{
LOG_DEBUG(log,
"GC exit without merging. merge file size: " << merge_files.size() << ", candidate size: " << candidate_total_size);
LOG_TRACE(log,
storage_name << " GC exit without merging. merge file size: " << merge_files.size()
<< ", candidate size: " << candidate_total_size);
return false;
}

LOG_INFO(log, "GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions");
LOG_INFO(log,
storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions");

// There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate.
gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files);
Expand Down Expand Up @@ -421,20 +423,7 @@ bool PageStorage::gc()
}

// Delete obsolete files that are not used by any version, without lock
for (const auto & page_file : page_files)
{
const auto page_id_and_lvl = page_file.fileIdLevel();
if (page_id_and_lvl >= writing_file_id_level)
{
continue;
}

if (live_files.count(page_id_and_lvl) == 0)
{
// the page file is not used by any version, remove reader cache
page_file.destroy();
}
}
gcRemoveObsoleteFiles(page_files, writing_file_id_level, live_files);
return true;
}

Expand Down Expand Up @@ -491,6 +480,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f

size_t num_successful_migrate_pages = 0;
size_t num_valid_ref_pages = 0;
size_t num_del_page_meta = 0;
auto * current = snapshot->version();
{
PageEntriesEdit legacy_edit; // All page entries in `merge_files`
Expand All @@ -510,8 +500,8 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
continue;
}

PageIdAndEntries page_id_and_entries; // The valid pages that we need to migrate to `gc_file`
auto to_merge_file_reader = to_merge_file.createReader();
PageIdAndEntries page_id_and_entries;
{
const auto & page_ids = it->second.second;
for (auto page_id : page_ids)
Expand All @@ -530,7 +520,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
catch (DB::Exception & e)
{
// ignore if it2 is a ref to non-exist page
LOG_WARNING(log, "Ignore invalid RefPage while gcMigratePages: " + e.message());
LOG_WARNING(log, storage_name << " Ignore invalid RefPage while gcMigratePages: " << e.message());
}
}
}
Expand All @@ -554,17 +544,26 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
}

{
// Migrate RefPages which are still valid.
// Migrate valid RefPages and DelPage.
WriteBatch batch;
for (const auto & rec : legacy_edit.getRecords())
{
// Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk,
// if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`.
auto [is_ref, normal_page_id] = current->isRefId(rec.page_id);
if (is_ref)
if (rec.type == WriteBatch::WriteType::REF)
{
batch.putRefPage(rec.page_id, normal_page_id);
num_valid_ref_pages += 1;
// Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk,
// if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`.
auto [is_ref, normal_page_id] = current->isRefId(rec.page_id);
if (is_ref)
{
batch.putRefPage(rec.page_id, normal_page_id);
num_valid_ref_pages += 1;
}
}
else if (rec.type == WriteBatch::WriteType::DEL)
{
// DelPage should be migrate to new PageFile
batch.delPage(rec.page_id);
num_del_page_meta += 1;
}
}
gc_file_writer->write(batch, gc_file_edit);
Expand All @@ -580,10 +579,38 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
gc_file.setFormal();
const auto id = gc_file.fileIdLevel();
LOG_INFO(log,
"GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages << " RefPages to PageFile_"
<< id.first << "_" << id.second);
storage_name << " GC have migrated " << num_successful_migrate_pages //
<< " regions and " << num_valid_ref_pages //
<< " RefPages and " << num_del_page_meta //
<< " DelPage to PageFile_" << id.first << "_" << id.second);
}
return gc_file_edit;
}

/**
 * Destroy the PageFiles that GC no longer needs.
 *
 * A file is destroyed only when both conditions hold:
 *   - its (file id, level) does not compare `>=` to `writing_file_id_level`, and
 *   - its (file id, level) is absent from `live_files`.
 *
 * @param page_files            The set of PageFiles to examine
 * @param writing_file_id_level The (file id, level) of the PageFile currently being written;
 *                              files comparing `>=` to it are always kept
 * @param live_files            The (file id, level) keys that must be kept
 */
void PageStorage::gcRemoveObsoleteFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
                                        const PageFileIdAndLevel & writing_file_id_level,
                                        const std::set<PageFileIdAndLevel> & live_files)
{
    for (const auto & candidate : page_files)
    {
        const auto file_key = candidate.fileIdLevel();

        // Never touch the file being written to, nor anything that compares after it.
        const bool precedes_writing_file = !(file_key >= writing_file_id_level);
        // A file missing from `live_files` is not used by any version.
        const bool is_unreferenced = live_files.find(file_key) == live_files.end();

        if (precedes_writing_file && is_unreferenced)
            candidate.destroy();
    }
}

} // namespace DB
7 changes: 6 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class PageStorage
using OpenReadFiles = std::map<PageFileIdAndLevel, ReaderPtr>;

public:
PageStorage(const String & storage_path, const Config & config_);
PageStorage(String name, const String & storage_path, const Config & config_);

PageId getMaxId();

Expand Down Expand Up @@ -89,7 +89,12 @@ class PageStorage
PageEntriesEdit
gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const;

static void gcRemoveObsoleteFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
const PageFileIdAndLevel & writing_file_id_level,
const std::set<PageFileIdAndLevel> & live_files);

private:
String storage_name; // Identify between different Storage
String storage_path;
Config config;

Expand Down
61 changes: 60 additions & 1 deletion dbms/src/Storages/Page/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class PageStorage_test : public ::testing::Test

std::shared_ptr<PageStorage> reopenWithConfig(const PageStorage::Config & config_)
{
return std::make_shared<PageStorage>(path, config_);
return std::make_shared<PageStorage>("test.t", path, config_);
}

protected:
Expand Down Expand Up @@ -324,6 +324,65 @@ TEST_F(PageStorage_test, GcMoveRefPage)
ASSERT_EQ(normal_page_id, 1UL);
}

// Checks that a DelPage record is carried over by GC migration:
//   1. write pages 1..3, then delete page 1 in a second batch;
//   2. run `gcMigratePages` over PageFile{2, 0} and expect a DEL record for page 1 in the returned edit;
//   3. apply the edit, remove obsolete files, reopen the storage and expect page 1 to stay absent.
TEST_F(PageStorage_test, GcMovePageDelMeta)
{
const size_t buf_sz = 256;
char c_buff[buf_sz];

{
// Pages 1, 2 and 3 are put in a single batch; the test expects it to be written to PageFile{1, 0}
WriteBatch batch;
// Every byte of the payload is set to 0x0f; all three pages share the same content.
memset(c_buff, 0xf, buf_sz);
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1, 0, buff, buf_sz);
// A fresh read buffer over the same memory is created for each page.
buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(2, 0, buff, buf_sz);
buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(3, 0, buff, buf_sz);

storage->write(batch);
}

{
// DelPage1 should be written to PageFile{2, 0}
WriteBatch batch;
batch.delPage(1);

storage->write(batch);
}

PageFileIdAndLevel id_and_lvl = {2, 0}; // PageFile{2, 0} is ready to be migrated by gc
// PageFile{2, 0} is registered with an empty page-id set, i.e. no valid pages to migrate from it.
// NOTE(review): the leading 0 is presumably the file's valid data size -- confirm against GcLivesPages.
PageStorage::GcLivesPages livesPages{{id_and_lvl, {0, {}}}};
PageStorage::GcCandidates candidates{
id_and_lvl,
};
// List the files on disk before migrating (second argument is `remove_tmp_file`);
// this list is handed to `gcRemoveObsoleteFiles` below.
const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log);
auto s0 = storage->getSnapshot();
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates);

// We should see migration of DelPage1
bool exist = false;
for (const auto & rec : edit.getRecords())
{
if (rec.type == WriteBatch::WriteType::DEL && rec.page_id == 1)
{
exist = true;
break;
}
}
EXPECT_TRUE(exist);
// Release the snapshot before applying the GC edit.
// NOTE(review): presumably so the snapshot does not keep the pre-GC version alive -- confirm.
s0.reset();

auto live_files = storage->versioned_page_entries.gcApply(edit);
// After applying the edit, PageFile{2, 0} must not be among the live files.
EXPECT_EQ(live_files.find(id_and_lvl), live_files.end());
// Files whose id/level compares >= {3, 0} are skipped by `gcRemoveObsoleteFiles`.
storage->gcRemoveObsoleteFiles(/* page_files= */ page_files, /* writing_file_id_level= */ {3, 0}, live_files);

// Reopen PageStorage: page 1 must still be absent, i.e. the deletion survived the GC.
storage = reopenWithConfig(config);
auto s1 = storage->getSnapshot();
ASSERT_EQ(s1->version()->find(1), std::nullopt);
}

/**
* PageStorage tests with predefine Page1 && Page2
*/
Expand Down
Loading

0 comments on commit 10e775e

Please sign in to comment.