Skip to content

Commit

Permalink
Save gc safepoint for keyspace
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Oct 25, 2023
1 parent 57c0c57 commit a7a5ec7
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class IDatabaseIterator
virtual const String & name() const = 0;
virtual StoragePtr & table() const = 0;

virtual ~IDatabaseIterator() {}
virtual ~IDatabaseIterator() = default;
};

using DatabaseIteratorPtr = std::unique_ptr<IDatabaseIterator>;
Expand Down Expand Up @@ -149,7 +149,7 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>
/// Delete metadata, the deletion of which differs from the recursive deletion of the directory, if any.
virtual void drop(const Context & context) = 0;

virtual ~IDatabase() {}
virtual ~IDatabase() = default;
};

using DatabasePtr = std::shared_ptr<IDatabase>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printe
if (!tiflash_db)
throw Exception(database_name + " is not DatabaseTiFlash", ErrorCodes::BAD_ARGUMENTS);

fmt_buf.append((tiflash_db->isTombstone() ? "true" : "false"));
fmt_buf.append((tiflash_db->getTombstone() != 0 ? "true" : "false"));
}
else if (args.size() == 2)
{
Expand All @@ -225,7 +225,7 @@ void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printe
if (!managed_storage)
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

fmt_buf.append((managed_storage->isTombstone() ? "true" : "false"));
fmt_buf.append((managed_storage->getTombstone() != 0 ? "true" : "false"));
}
output(fmt_buf.toString());
}
Expand Down
44 changes: 34 additions & 10 deletions dbms/src/TiDB/Schema/SchemaSyncService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/IManageableStorage.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Types.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/SchemaSyncService.h>
#include <TiDB/Schema/SchemaSyncer.h>
Expand Down Expand Up @@ -138,6 +139,8 @@ void SchemaSyncService::removeKeyspaceGCTasks()

context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace);
PDClientHelper::remove_ks_gc_sp(keyspace);

keyspace_gc_context.erase(keyspace);
}
}

Expand All @@ -160,19 +163,36 @@ bool SchemaSyncService::syncSchemas(KeyspaceID keyspace_id)
template <typename DatabaseOrTablePtr>
inline bool isSafeForGC(const DatabaseOrTablePtr & ptr, Timestamp gc_safe_point)
{
return ptr->isTombstone() && ptr->getTombstone() < gc_safe_point;
const auto tombstone_ts = ptr->getTombstone();
return tombstone_ts != 0 && tombstone_ts < gc_safe_point;
}

bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id)
Timestamp SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const
{
auto & tmt_context = context.getTMTContext();
if (gc_safe_point == gc_context.last_gc_safe_point)
std::shared_lock lock(keyspace_map_mutex);
auto iter = keyspace_gc_context.find(keyspace_id);
if (iter == keyspace_gc_context.end())
return 0;
return iter->second.last_gc_safepoint;
}

void SchemaSyncService::updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint)
{
std::unique_lock lock(keyspace_map_mutex);
keyspace_gc_context[keyspace_id].last_gc_safepoint = gc_safepoint;
}

bool SchemaSyncService::gc(Timestamp gc_safepoint, KeyspaceID keyspace_id)
{
const Timestamp last_gc_safepoint = lastGcSafePoint(keyspace_id);
if (gc_safepoint == last_gc_safepoint)
return false;

auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace_id));

LOG_INFO(keyspace_log, "Performing GC using safe point {}", gc_safe_point);
LOG_INFO(keyspace_log, "Schema GC begin, last_safepoint={} safepoint={}", last_gc_safepoint, gc_safepoint);

auto & tmt_context = context.getTMTContext();
// The storages that are ready for gc
std::vector<std::weak_ptr<IManageableStorage>> storages_to_gc;
// Get a snapshot of database
Expand All @@ -190,7 +210,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id)
if (!managed_storage)
continue;

if (isSafeForGC(db, gc_safe_point) || isSafeForGC(managed_storage, gc_safe_point))
if (isSafeForGC(db, gc_safepoint) || isSafeForGC(managed_storage, gc_safepoint))
{
// Only keep a weak_ptr on storage so that the memory can be free as soon as
// it is dropped.
Expand Down Expand Up @@ -260,7 +280,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id)
{
const auto & db = iter.second;
auto ks_db_id = SchemaNameMapper::getMappedNameKeyspaceID(iter.first);
if (!isSafeForGC(db, gc_safe_point) || ks_db_id != keyspace_id)
if (!isSafeForGC(db, gc_safepoint) || ks_db_id != keyspace_id)
continue;

const auto & db_name = iter.first;
Expand Down Expand Up @@ -305,13 +325,17 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id)

if (succeeded)
{
gc_context.last_gc_safe_point = gc_safe_point;
LOG_INFO(keyspace_log, "Performed GC using safe point {}", gc_safe_point);
updateLastGcSafepoint(keyspace_id, gc_safepoint);
LOG_INFO(keyspace_log, "Schema GC done, safepoint={}", gc_safepoint);
}
else
{
// Don't update last_gc_safe_point and retry later
LOG_INFO(keyspace_log, "Performed GC using safe point {} meet error, will try again later", gc_safe_point);
LOG_INFO(
keyspace_log,
"Schema GC meet error, will try again later, last_safepoint={} safepoint={}",
last_gc_safepoint,
gc_safepoint);
}

return true;
Expand Down
18 changes: 11 additions & 7 deletions dbms/src/TiDB/Schema/SchemaSyncService.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Interpreters/Context_fwd.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/KVStore/Types.h>

Expand All @@ -24,7 +25,6 @@

namespace DB
{
class Context;
class BackgroundProcessingPool;
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;
Expand All @@ -46,16 +46,14 @@ class SchemaSyncService
private:
bool syncSchemas(KeyspaceID keyspace_id);

struct GCContext
{
Timestamp last_gc_safe_point = 0;
} gc_context;

bool gc(Timestamp gc_safe_point, KeyspaceID keyspace_id);
bool gc(Timestamp gc_safepoint, KeyspaceID keyspace_id);

void addKeyspaceGCTasks();
void removeKeyspaceGCTasks();

Timestamp lastGcSafePoint(KeyspaceID keyspace_id) const;
void updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint);

private:
Context & context;

Expand All @@ -68,6 +66,12 @@ class SchemaSyncService
// Handles for each keyspace schema sync task.
std::unordered_map<KeyspaceID, BackgroundProcessingPool::TaskHandle> keyspace_handle_map;

struct KeyspaceGCContext
{
Timestamp last_gc_safepoint = 0;
};
std::unordered_map<KeyspaceID, KeyspaceGCContext> keyspace_gc_context;

LoggerPtr log;
};

Expand Down

0 comments on commit a7a5ec7

Please sign in to comment.