Skip to content

Commit

Permalink
Storages: support building vector index for ColumnFileTiny (Part 2) (#…
Browse files Browse the repository at this point in the history
…9546)

ref #9032

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
Lloyd-Pottiger and ti-chi-bot[bot] authored Oct 25, 2024
1 parent ae92a5e commit a1a3b9b
Show file tree
Hide file tree
Showing 19 changed files with 728 additions and 59 deletions.
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ColumnFileTiny : public ColumnFilePersisted
{
public:
friend class ColumnFileTinyReader;
friend class ColumnFileTinyVectorIndexWriter;
friend struct Remote::Serializer;

struct IndexInfo
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <IO/Compression/CompressedWriteBuffer.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>


namespace DB::ErrorCodes
{
extern const int ABORTED;
} // namespace DB::ErrorCodes

namespace DB::DM
{

ColumnFileTinyVectorIndexWriter::LocalIndexBuildInfo ColumnFileTinyVectorIndexWriter::getLocalIndexBuildInfo(
const LocalIndexInfosSnapshot & index_infos,
const ColumnFilePersistedSetPtr & file_set)
{
assert(index_infos != nullptr);

LocalIndexBuildInfo build;
build.indexes_to_build = std::make_shared<LocalIndexInfos>();
build.file_ids.reserve(file_set->getColumnFileCount());
for (const auto & file : file_set->getFiles())
{
auto * tiny_file = file->tryToTinyFile();
if (!tiny_file)
continue;

bool any_new_index_build = false;
for (const auto & index : *index_infos)
{
auto schema = tiny_file->getSchema();
assert(schema != nullptr);
// The ColumnFileTiny may be built before col_id is added. Skip build indexes for it
if (!schema->getColIdToOffset().contains(index.column_id))
continue;

// Index already built, skip
if (tiny_file->hasIndex(index.index_id))
continue;

any_new_index_build = true;
// FIXME: the memory usage is not accurate, but it's fine for now.
build.estimated_memory_bytes += tiny_file->getBytes();

// Avoid duplicate index build
if (std::find(build.index_ids.begin(), build.index_ids.end(), index.index_id) == build.index_ids.end())
{
build.indexes_to_build->emplace_back(index);
build.index_ids.emplace_back(index.index_id);
}
}

if (any_new_index_build)
{
build.file_ids.emplace_back(LocalIndexerScheduler::ColumnFileTinyID(tiny_file->getDataPageId()));
}
}

build.file_ids.shrink_to_fit();
return build;
}

ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
const ColumnDefines & column_defines,
const ColumnDefine & del_cd,
const ColumnFileTiny * file,
ProceedCheckFn should_proceed) const
{
// read_columns are: DEL_MARK, COL_A, COL_B, ...
// index_builders are: COL_A, COL_B, ...

ColumnDefinesPtr read_columns = std::make_shared<ColumnDefines>();
read_columns->reserve(options.index_infos->size() + 1);
read_columns->push_back(del_cd);

std::unordered_map<ColId, std::vector<VectorIndexBuilderPtr>> index_builders;

std::unordered_map<ColId, std::vector<LocalIndexInfo>> col_indexes;
for (const auto & index_info : *options.index_infos)
{
if (index_info.type != IndexType::Vector)
continue;
col_indexes[index_info.column_id].emplace_back(index_info);
}

for (const auto & [col_id, index_infos] : col_indexes)
{
// Make sure the column_id is in the schema.
const auto cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [&](const auto & cd) {
return cd.id == col_id;
});
RUNTIME_CHECK_MSG(
cd_iter != column_defines.cend(),
"Cannot find column_id={} in file_id={}",
col_id,
file->getDataPageId());

for (const auto & idx_info : index_infos)
{
// Just skip if the index is already built
if (file->hasIndex(idx_info.index_id))
continue;

index_builders[col_id].emplace_back(
VectorIndexBuilder::create(idx_info.index_id, idx_info.index_definition));
}
read_columns->push_back(*cd_iter);
}

// If no index to build, return nullptr
if (read_columns->size() == 1 || index_builders.empty())
return nullptr;

// Read all blocks and build index
// TODO: read one column at a time to reduce peak memory usage.
const size_t num_cols = read_columns->size();
ColumnFileTinyReader reader(*file, options.data_provider, read_columns);
while (true)
{
if (!should_proceed())
throw Exception(ErrorCodes::ABORTED, "Index build is interrupted");

auto block = reader.readNextBlock();
if (!block)
break;

RUNTIME_CHECK(block.columns() == read_columns->size());
RUNTIME_CHECK(block.getByPosition(0).column_id == TAG_COLUMN_ID);

auto del_mark_col = block.safeGetByPosition(0).column;
RUNTIME_CHECK(del_mark_col != nullptr);
const auto * del_mark = static_cast<const ColumnVector<UInt8> *>(del_mark_col.get());
RUNTIME_CHECK(del_mark != nullptr);

for (size_t col_idx = 1; col_idx < num_cols; ++col_idx)
{
const auto & col_with_type_and_name = block.safeGetByPosition(col_idx);
RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns->at(col_idx).id);
const auto & col = col_with_type_and_name.column;
for (const auto & index_builder : index_builders[read_columns->at(col_idx).id])
{
index_builder->addBlock(*col, del_mark, should_proceed);
}
}
}

// Save index to PageStorage
auto index_infos = std::make_shared<ColumnFileTiny::IndexInfos>();
for (size_t col_idx = 1; col_idx < num_cols; ++col_idx)
{
const auto & cd = read_columns->at(col_idx);
for (const auto & index_builder : index_builders[cd.id])
{
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index_builder->saveToBuffer(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

dtpb::VectorIndexFileProps vector_index;
vector_index.set_index_id(index_builder->index_id);
vector_index.set_index_bytes(data_size);
vector_index.set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind));
vector_index.set_distance_metric(
tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric));
vector_index.set_dimensions(index_builder->definition->dimension);
index_infos->emplace_back(index_page_id, vector_index);
}
}

if (file->index_infos)
file->index_infos->insert(file->index_infos->end(), index_infos->begin(), index_infos->end());

options.wbs.writeLogAndData();
// Note: The id of the file cannot be changed, otherwise minor compaction will fail.
// So we just clone the file with new index info.
return file->cloneWith(file->getDataPageId(), index_infos);
}

ColumnFileTinys ColumnFileTinyVectorIndexWriter::build(ProceedCheckFn should_proceed) const
{
ColumnFileTinys new_files;
new_files.reserve(options.files.size());
ColumnDefines column_defines;
ColumnDefine del_cd;
for (const auto & file : options.files)
{
// Only build index for ColumnFileTiny
const auto * tiny_file = file->tryToTinyFile();
if (!tiny_file)
continue;
if (column_defines.empty())
{
auto schema = tiny_file->getSchema();
column_defines = getColumnDefinesFromBlock(schema->getSchema());
const auto del_cd_iter
= std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) {
return cd.id == TAG_COLUMN_ID;
});
RUNTIME_CHECK_MSG(
del_cd_iter != column_defines.cend(),
"Cannot find del_mark column, file_id={}",
tiny_file->getDataPageId());
del_cd = *del_cd_iter;
}
if (auto new_file = buildIndexForFile(column_defines, del_cd, tiny_file, should_proceed); new_file)
new_files.push_back(new_file);
}
new_files.shrink_to_fit();
return new_files;
}

} // namespace DB::DM
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>


namespace DB::DM
{

using ColumnFileTinys = std::vector<ColumnFileTinyPtr>;

// ColumnFileTinyVectorIndexWriter write vector index store in PageStorage for ColumnFileTiny.
class ColumnFileTinyVectorIndexWriter
{
public:
struct LocalIndexBuildInfo
{
std::vector<LocalIndexerScheduler::FileID> file_ids;
std::vector<IndexID> index_ids;
size_t estimated_memory_bytes = 0;
LocalIndexInfosPtr indexes_to_build;
};

static LocalIndexBuildInfo getLocalIndexBuildInfo(
const LocalIndexInfosSnapshot & index_infos,
const ColumnFilePersistedSetPtr & file_set);

struct Options
{
const StoragePoolPtr storage_pool;
const WriteLimiterPtr write_limiter;
const ColumnFiles & files;
const IColumnFileDataProviderPtr data_provider;
const LocalIndexInfosPtr index_infos;
WriteBatches & wbs; // Write index and modify meta in the same batch.
};

explicit ColumnFileTinyVectorIndexWriter(const Options & options)
: logger(Logger::get())
, options(options)
{}

// Build vector index for all files in `options.files`.
// Only return the files that have been built new indexes.
using ProceedCheckFn = std::function<bool()>; // Return false to stop building index.
ColumnFileTinys build(ProceedCheckFn should_proceed) const;

private:
ColumnFileTinyPtr buildIndexForFile(
const ColumnDefines & column_defines,
const ColumnDefine & del_cd,
const ColumnFileTiny * file,
ProceedCheckFn should_proceed) const;

const LoggerPtr logger;
const Options options;
};

} // namespace DB::DM
16 changes: 16 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ bool ColumnFilePersistedSet::appendPersistedColumnFiles(const ColumnFilePersiste
return true;
}

bool ColumnFilePersistedSet::updatePersistedColumnFiles(
const ColumnFilePersisteds & new_persisted_files,
WriteBatches & wbs)
{
/// Save the new metadata of column files to disk.
serializeColumnFilePersisteds(wbs, metadata_id, new_persisted_files);
wbs.writeMeta();

/// Commit updates in memory.
persisted_files = std::move(new_persisted_files);
updateColumnFileStats();
LOG_DEBUG(log, "{}, after update column files, persisted column files: {}", info(), detailInfo());

return true;
}

MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & context)
{
// Every time we try to compact all column files.
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class ColumnFilePersistedSet

bool appendPersistedColumnFiles(const ColumnFilePersisteds & column_files, WriteBatches & wbs);

bool updatePersistedColumnFiles(const ColumnFilePersisteds & new_persisted_files, WriteBatches & wbs);

/// Choose all small column files that can be compacted to larger column files
MinorCompactionPtr pickUpMinorCompaction(DMContext & context);

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,8 @@ bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa

if (segment->flushCache(*dm_context))
{
// After flush, try to add delta local index.
segmentEnsureDeltaLocalIndexAsync(segment);
break;
}
else if (!try_until_succeed)
Expand Down Expand Up @@ -987,6 +989,8 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra
// compact could fail.
if (segment->compactDelta(*dm_context))
{
// After compact delta, try to create delta local index.
segmentEnsureDeltaLocalIndexAsync(segment);
break;
}
}
Expand Down Expand Up @@ -1716,6 +1720,8 @@ bool DeltaMergeStore::checkSegmentUpdate(
segment->info(),
magic_enum::enum_name(input_type));
segment->flushCache(*dm_context);
// After flush, try to add delta local index.
segmentEnsureDeltaLocalIndexAsync(segment);
if (input_type == InputType::RaftLog)
{
// Only the segment update is from a raft log write, will we notify KVStore to trigger a foreground flush.
Expand Down
Loading

0 comments on commit a1a3b9b

Please sign in to comment.