Skip to content

Commit

Permalink
storage: Support adding vector index in background (#203)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
Signed-off-by: Lloyd-Pottiger <[email protected]>
Co-authored-by: Wish <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
  • Loading branch information
3 people committed Sep 6, 2024
1 parent 4fac01c commit 707e91c
Show file tree
Hide file tree
Showing 43 changed files with 1,713 additions and 236 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
M(DT_SnapshotOfReadRaw) \
M(DT_SnapshotOfSegmentSplit) \
M(DT_SnapshotOfSegmentMerge) \
M(DT_SnapshotOfSegmentIngestIndex) \
M(DT_SnapshotOfSegmentIngest) \
M(DT_SnapshotOfDeltaMerge) \
M(DT_SnapshotOfDeltaCompact) \
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -171,6 +172,7 @@ struct ContextShared
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalPageIdAllocatorPtr global_page_id_allocator;
DM::GlobalStoragePoolPtr global_storage_pool;
DM::LocalIndexerSchedulerPtr global_local_indexer_scheduler;

/// The PS instance available on Write Node.
UniversalPageStorageServicePtr ps_write;
Expand Down Expand Up @@ -1750,6 +1752,27 @@ DM::GlobalPageIdAllocatorPtr Context::getGlobalPageIdAllocator() const
return shared->global_page_id_allocator;
}

bool Context::initializeGlobalLocalIndexerScheduler(size_t pool_size, size_t memory_limit)
{
auto lock = getLock();
if (!shared->global_local_indexer_scheduler)
{
shared->global_local_indexer_scheduler
= std::make_shared<DM::LocalIndexerScheduler>(DM::LocalIndexerScheduler::Options{
.pool_size = pool_size,
.memory_limit = memory_limit,
.auto_start = true,
});
}
return true;
}

DM::LocalIndexerSchedulerPtr Context::getGlobalLocalIndexerScheduler() const
{
auto lock = getLock();
return shared->global_local_indexer_scheduler;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
{
auto lock = getLock();
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Interpreters/SharedContexts/Disagg_fwd.h>
#include <Interpreters/TimezoneInfo.h>
#include <Server/ServerInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler_fwd.h>
#include <common/MultiVersion.h>

#include <chrono>
Expand Down Expand Up @@ -456,6 +457,9 @@ class Context
bool initializeGlobalPageIdAllocator();
DM::GlobalPageIdAllocatorPtr getGlobalPageIdAllocator() const;

bool initializeGlobalLocalIndexerScheduler(size_t pool_size, size_t memory_limit);
DM::LocalIndexerSchedulerPtr getGlobalLocalIndexerScheduler() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Expand Down
29 changes: 24 additions & 5 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -987,12 +987,13 @@ int Server::main(const std::vector<std::string> & /*args*/)

if (storage_config.format_version != 0)
{
if (storage_config.s3_config.isS3Enabled() && storage_config.format_version != STORAGE_FORMAT_V100.identifier)
if (storage_config.s3_config.isS3Enabled() && storage_config.format_version != STORAGE_FORMAT_V100.identifier
&& storage_config.format_version != STORAGE_FORMAT_V101.identifier)
{
LOG_WARNING(log, "'storage.format_version' must be set to 100 when S3 is enabled!");
LOG_WARNING(log, "'storage.format_version' must be set to 100 or 101 when S3 is enabled!");
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"'storage.format_version' must be set to 100 when S3 is enabled!");
"'storage.format_version' must be set to 100 or 101 when S3 is enabled!");
}
setStorageFormat(storage_config.format_version);
LOG_INFO(log, "Using format_version={} (explicit storage format detected).", storage_config.format_version);
Expand All @@ -1003,8 +1004,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
// If the user does not explicitly set format_version in the config file but
// enables S3, then we set up a proper format version to support S3.
setStorageFormat(STORAGE_FORMAT_V100.identifier);
LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_V100.identifier);
setStorageFormat(STORAGE_FORMAT_V101.identifier);
LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_V101.identifier);
}
else
{
Expand Down Expand Up @@ -1325,6 +1326,24 @@ int Server::main(const std::vector<std::string> & /*args*/)
settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity),
settings.bytes_that_rss_larger_than_limit);

if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
{
// No need to have local index scheduler.
}
else if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode())
{
global_context->initializeGlobalLocalIndexerScheduler(
server_info.cpu_info.logical_cores * 8 / 10,
server_info.memory_info.capacity * 6 / 10);
}
else
{
// There could be compute tasks, reserve more memory for computes.
global_context->initializeGlobalLocalIndexerScheduler(
server_info.cpu_info.logical_cores * 4 / 10,
server_info.memory_info.capacity * 4 / 10);
}

/// PageStorage run mode has been determined above
global_context->initializeGlobalPageIdAllocator();
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
Expand Down
17 changes: 16 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
Expand All @@ -62,6 +63,7 @@
#include <magic_enum.hpp>
#include <memory>


namespace ProfileEvents
{
extern const Event DMWriteBlock;
Expand Down Expand Up @@ -216,6 +218,7 @@ DeltaMergeStore::DeltaMergeStore(
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
: global_context(db_context.getGlobalContext())
Expand All @@ -230,6 +233,7 @@ DeltaMergeStore::DeltaMergeStore(
, background_pool(db_context.getBackgroundPool())
, blockable_background_pool(db_context.getBlockableBackgroundPool())
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
, local_index_infos(std::move(local_index_infos_))
, log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_)))
{
{
Expand Down Expand Up @@ -332,6 +336,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
{
Expand All @@ -347,9 +352,11 @@ DeltaMergeStorePtr DeltaMergeStore::create(
handle,
is_common_handle_,
rowkey_column_size_,
local_index_infos_,
settings_,
thread_pool);
std::shared_ptr<DeltaMergeStore> store_shared_ptr(store);
store_shared_ptr->checkAllSegmentsLocalIndex();
return store_shared_ptr;
}

Expand All @@ -365,6 +372,7 @@ std::unique_ptr<DeltaMergeStore> DeltaMergeStore::createUnique(
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
{
Expand All @@ -380,9 +388,11 @@ std::unique_ptr<DeltaMergeStore> DeltaMergeStore::createUnique(
handle,
is_common_handle_,
rowkey_column_size_,
local_index_infos_,
settings_,
thread_pool);
std::unique_ptr<DeltaMergeStore> store_unique_ptr(store);
store_unique_ptr->checkAllSegmentsLocalIndex();
return store_unique_ptr;
}

Expand Down Expand Up @@ -504,6 +514,11 @@ void DeltaMergeStore::shutdown()
return;

LOG_TRACE(log, "Shutdown DeltaMerge start");

auto indexer_scheulder = global_context.getGlobalLocalIndexerScheduler();
RUNTIME_CHECK(indexer_scheulder != nullptr);
indexer_scheulder->dropTasks(keyspace_id, physical_table_id);

// Must shutdown storage path pool to make sure the DMFile remove callbacks
// won't remove dmfiles unexpectly.
path_pool->shutdown();
Expand Down Expand Up @@ -2032,10 +2047,10 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// TODO(local index): There could be some local indexes added/dropped after DDL
std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));
}


SortDescription DeltaMergeStore::getPrimarySortDescription() const
{
std::shared_lock lock(read_write_mutex);
Expand Down
69 changes: 66 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,13 @@ struct LocalIndexStats
};
using LocalIndexesStats = std::vector<LocalIndexStats>;


class DeltaMergeStore;
using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;

class DeltaMergeStore : private boost::noncopyable
class DeltaMergeStore
: private boost::noncopyable
, public std::enable_shared_from_this<DeltaMergeStore>
{
public:
friend class ::DB::DM::tests::DeltaMergeStoreTest;
Expand Down Expand Up @@ -292,6 +295,7 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

Expand All @@ -308,6 +312,7 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

Expand All @@ -323,6 +328,7 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

Expand Down Expand Up @@ -726,6 +732,12 @@ class DeltaMergeStore : private boost::noncopyable
MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap = nullptr);

void segmentEnsureStableIndex(
DMContext & dm_context,
const IndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info);

/**
* Ingest a DMFile into the segment, optionally causing a new segment being created.
*
Expand Down Expand Up @@ -854,11 +866,45 @@ class DeltaMergeStore : private boost::noncopyable
const SegmentPtr & segment,
ThreadType thread_type,
InputType input_type);

/**
* Segment update meta with new DMFiles. A lock must be provided, so that it is
* possible to update the meta for multiple segments all at once.
*/
SegmentPtr segmentUpdateMeta(
std::unique_lock<std::shared_mutex> & read_write_lock,
DMContext & dm_context,
const SegmentPtr & segment,
const DMFiles & new_dm_files);

/**
* Check whether there are new local indexes should be built for all segments.
*/
void checkAllSegmentsLocalIndex();

/**
* Ensure the segment has stable index.
* If the segment has no stable index, it will be built in background.
* Note: This function can not be called in constructor, since shared_from_this() is not available.
*
* @returns true if index is missing and a build task is added in background.
*/
bool segmentEnsureStableIndexAsync(const SegmentPtr & segment);

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
/**
* Wait until the segment has stable index.
* If the index is ready or no need to build, it will return immediately.
* Only used for testing.
*
* @returns false if index is still missing after wait timed out.
*/
bool segmentWaitStableIndexReady(const SegmentPtr & segment) const;

void dropAllSegments(bool keep_first_segment);
String getLogTracingId(const DMContext & dm_ctx);
// Returns segment that contains start_key and whether 'segments' is empty.
Expand Down Expand Up @@ -916,13 +962,30 @@ class DeltaMergeStore : private boost::noncopyable
// of resources to build, so they will be built in separated background pool.
IndexInfosPtr local_index_infos;

struct DMFileIDToSegmentIDs
{
public:
using Key = PageIdU64; // dmfile_id
using Value = std::unordered_set<PageIdU64>; // segment_ids

void remove(SegmentPtr segment);

void add(SegmentPtr segment);

const Value & get(PageIdU64 dmfile_id) const;

private:
std::unordered_map<Key, Value> u_map;
};
// dmfile_id -> segment_ids
// This map is not protected by lock, should be accessed under read_write_mutex.
DMFileIDToSegmentIDs dmfile_id_to_segment_ids;

// Synchronize between write threads and read threads.
mutable std::shared_mutex read_write_mutex;

LoggerPtr log;
};

using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;

} // namespace DM
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
// Foreground task don't get GC safe point from remote, but we better make it as up to date as possible.
if (updateGCSafePoint())
{
/// Note that `task.dm_context->db_context` will be free after query is finish. We should not use that in background task.
/// Note that `task.dm_context->global_context` will be free after query is finish. We should not use that in background task.
task.dm_context->min_version = latest_gc_safe_point.load(std::memory_order_relaxed);
LOG_DEBUG(log, "Task {} GC safe point: {}", magic_enum::enum_name(task.type), task.dm_context->min_version);
}
Expand Down
Loading

0 comments on commit 707e91c

Please sign in to comment.