Skip to content

Commit

Permalink
storage: Support vector index and ANN hint (#156)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish authored and JaySon-Huang committed Aug 6, 2024
1 parent dc1eb52 commit 1afddb3
Show file tree
Hide file tree
Showing 62 changed files with 5,598 additions and 44 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,6 @@
[submodule "contrib/qpl"]
path = contrib/qpl
url = https://github.com/intel/qpl.git
[submodule "contrib/usearch"]
path = contrib/usearch
url = https://github.com/unum-cloud/usearch.git
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,5 @@ endif ()
add_subdirectory(magic_enum)

add_subdirectory(aws-cmake)

add_subdirectory(usearch-cmake)
11 changes: 11 additions & 0 deletions contrib/usearch-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Locations inside the usearch checkout under contrib/usearch.
set(USEARCH_PROJECT_DIR "${TiFlash_SOURCE_DIR}/contrib/usearch")
set(USEARCH_SOURCE_DIR "${USEARCH_PROJECT_DIR}/include")

# INTERFACE library: no sources are compiled by this target; it only carries
# usage requirements (the include directories below) to whatever links it.
add_library(_usearch INTERFACE)

# Expose usearch's own headers plus the simsimd and fp16 header directories
# located under the usearch tree. SYSTEM asks CMake to present them to
# consumers as system include directories.
target_include_directories(_usearch SYSTEM INTERFACE
${USEARCH_PROJECT_DIR}/simsimd/include
${USEARCH_PROJECT_DIR}/fp16/include
${USEARCH_SOURCE_DIR})

# Namespaced alias that other targets link against.
add_library(tiflash_contrib::usearch ALIAS _usearch)
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ target_link_libraries (dbms
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
tiflash_contrib::usearch
tiflash_contrib::aws_s3

etcdpb
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,16 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>

size_t encodeIntoDatumData(size_t element_idx, WriteBuffer & writer) const;

private:
ColumnPtr data;
ColumnPtr offsets;

size_t ALWAYS_INLINE offsetAt(size_t i) const { return i == 0 ? 0 : getOffsets()[i - 1]; }
size_t ALWAYS_INLINE sizeAt(size_t i) const
{
return i == 0 ? getOffsets()[0] : (getOffsets()[i] - getOffsets()[i - 1]);
}

private:
ColumnPtr data;
ColumnPtr offsets;

size_t ALWAYS_INLINE offsetAt(size_t i) const { return i == 0 ? 0 : getOffsets()[i - 1]; }

/// Multiply values if the nested column is ColumnVector<T>.
template <typename T>
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down Expand Up @@ -230,6 +231,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down Expand Up @@ -261,6 +263,7 @@ void MockStorage::buildExecFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down Expand Up @@ -294,6 +297,7 @@ void MockStorage::buildExecFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Interpreters/TimezoneInfo.h>
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <google/protobuf/repeated_ptr_field.h>
#include <tipb/executor.pb.h>
#include <tipb/expression.pb.h>


Expand All @@ -28,13 +29,15 @@ struct DAGQueryInfo
{
DAGQueryInfo(
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters_,
const tipb::ANNQueryInfo & ann_query_info_,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters_,
const ColumnInfos & source_columns_,
const std::vector<int> & runtime_filter_ids_,
const int rf_max_wait_time_ms_,
const TimezoneInfo & timezone_info_)
: source_columns(source_columns_)
, filters(filters_)
, ann_query_info(ann_query_info_)
, pushed_down_filters(pushed_down_filters_)
, runtime_filter_ids(runtime_filter_ids_)
, rf_max_wait_time_ms(rf_max_wait_time_ms_)
Expand All @@ -44,6 +47,7 @@ struct DAGQueryInfo
const ColumnInfos & source_columns;
// filters in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters;
const tipb::ANNQueryInfo & ann_query_info;
// filters have been push down to storage engine in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
query_info.query = dagContext().dummy_ast;
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions.conditions,
table_scan.getANNQueryInfo(),
table_scan.getPushedDownFilters(),
table_scan.getColumns(),
table_scan.getRuntimeFilterIDs(),
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ TiDBTableScan::TiDBTableScan(
, pushed_down_filters(
is_partition_table_scan ? std::move(table_scan->partition_table_scan().pushed_down_filter_conditions())
: std::move(table_scan->tbl_scan().pushed_down_filter_conditions()))
, ann_query_info(
is_partition_table_scan ? std::move(table_scan->partition_table_scan().ann_query())
: std::move(table_scan->tbl_scan().ann_query()))
// Only non-partitioned tables need to keep order when the table scan executor requires keep order.
// If keep_order is not set, keep order for safety.
, keep_order(
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class TiDBTableScan

const google::protobuf::RepeatedPtrField<tipb::Expr> & getPushedDownFilters() const { return pushed_down_filters; }

/// Returns the ANN query info stored in this table scan.
const tipb::ANNQueryInfo & getANNQueryInfo() const { return ann_query_info; }

private:
const tipb::Executor * table_scan;
String executor_id;
Expand All @@ -65,6 +67,8 @@ class TiDBTableScan
/// They will be executed on Storage layer.
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters;

const tipb::ANNQueryInfo ann_query_info;

bool keep_order;
bool is_fast_scan;
std::vector<Int32> runtime_filter_ids;
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -151,6 +152,7 @@ struct ContextShared
mutable DBGInvoker dbg_invoker; /// Execute inner functions, debug only.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable DM::MinMaxIndexCachePtr minmax_index_cache; /// Cache of minmax index in compressed files.
mutable DM::VectorIndexCachePtr vector_index_cache;
mutable DM::DeltaIndexManagerPtr delta_index_manager; /// Manage the Delta Indies of Segments.
ProcessList process_list; /// Executing queries at the moment.
ViewDependencies view_dependencies; /// Current dependencies
Expand Down Expand Up @@ -1406,6 +1408,28 @@ void Context::dropMinMaxIndexCache() const
shared->minmax_index_cache->reset();
}

/// Creates the shared vector index cache with the given capacity in bytes.
/// The cache must not have been set before; this is verified with RUNTIME_CHECK.
void Context::setVectorIndexCache(size_t cache_size_in_bytes)
{
    auto lock = getLock();

    // Installing the cache twice is treated as a programming error.
    RUNTIME_CHECK(!shared->vector_index_cache);

    auto & cache_slot = shared->vector_index_cache;
    cache_slot = std::make_shared<DM::VectorIndexCache>(cache_size_in_bytes);
}

/// Returns the shared vector index cache.
/// The returned pointer may be null when no cache has been set.
DM::VectorIndexCachePtr Context::getVectorIndexCache() const
{
    auto lock = getLock();
    // Copy the pointer while the lock is held, then hand the copy to the caller.
    DM::VectorIndexCachePtr cache = shared->vector_index_cache;
    return cache;
}

/// Calls reset() on the shared vector index cache if one has been set;
/// does nothing otherwise.
void Context::dropVectorIndexCache() const
{
    auto lock = getLock();

    // Nothing to drop when the cache was never created.
    if (!shared->vector_index_cache)
        return;

    shared->vector_index_cache->reset();
}

bool Context::isDeltaIndexLimited() const
{
// Don't need to use a lock here, as delta_index_manager should be set at starting up.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum class PageStorageRunMode : UInt8;
namespace DM
{
class MinMaxIndexCache;
class VectorIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
Expand Down Expand Up @@ -397,6 +398,10 @@ class Context
std::shared_ptr<DM::MinMaxIndexCache> getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;

void setVectorIndexCache(size_t cache_size_in_bytes);
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

bool isDeltaIndexLimited() const;
void setDeltaIndexManager(size_t cache_size_in_bytes);
std::shared_ptr<DM::DeltaIndexManager> getDeltaIndexManager() const;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (minmax_index_cache_size)
global_context->setMinMaxIndexCache(minmax_index_cache_size);

// 1GiB vector index cache.
size_t vec_index_cache_size = config().getUInt64("vec_index_cache_size", 1ULL * 1024 * 1024 * 1024);
if (vec_index_cache_size)
global_context->setVectorIndexCache(vec_index_cache_size);

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it
/// controls the number of total bytes keep in the memory.
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr & f)
}
}

void BitmapFilter::set(UInt32 start, UInt32 limit)
void BitmapFilter::set(UInt32 start, UInt32 limit, bool value)
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
std::fill(filter.begin() + start, filter.begin() + start + limit, true);
std::fill(filter.begin() + start, filter.begin() + start + limit, value);
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
Expand Down Expand Up @@ -127,4 +127,4 @@ size_t BitmapFilter::count() const
{
return std::count(filter.cbegin(), filter.cend(), true);
}
} // namespace DB::DM
} // namespace DB::DM
10 changes: 8 additions & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,27 @@ class BitmapFilter
void set(BlockInputStreamPtr & stream);
void set(const ColumnPtr & col, const FilterPtr & f);
void set(const UInt32 * data, UInt32 size, const FilterPtr & f);
void set(UInt32 start, UInt32 limit);
void set(UInt32 start, UInt32 limit, bool value = true);
// If it returns true, all data matches and the filter is not filled.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// Returns the filter bit at position `n`.
// `n` must be less than the filter size; this is enforced with RUNTIME_CHECK.
inline bool get(UInt32 n) const
{
RUNTIME_CHECK(n < filter.size(), n, filter.size());
return filter[n];
}
// filter[start, limit] & f -> f
void rangeAnd(IColumn::Filter & f, UInt32 start, UInt32 limit) const;

void runOptimize();

String toDebugString() const;
size_t count() const;
// Number of entries held in the underlying `filter` vector.
inline size_t size() const { return filter.size(); }

private:
std::vector<bool> filter;
bool all_match;
};

using BitmapFilterPtr = std::shared_ptr<BitmapFilter>;
} // namespace DB::DM
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,85 @@ BitmapFilterBlockInputStream::BitmapFilterBlockInputStream(

Block BitmapFilterBlockInputStream::readImpl(FilterPtr & res_filter, bool return_filter)
{
auto [block, from_delta] = readBlock(stable, delta);
if (return_filter)
return readImpl(res_filter);

// The caller wants a filtered result, so let's filter by ourselves.

FilterPtr block_filter;
auto block = readImpl(block_filter);
if (!block)
return {};

// all rows in block are not filtered out, simply do nothing.
if (!block_filter)
return block;

// some rows should be filtered according to `block_filter`:
size_t passed_count = countBytesInFilter(*block_filter);
for (auto & col : block)
{
col.column = col.column->filter(*block_filter, passed_count);
}
return block;
}

Block BitmapFilterBlockInputStream::readImpl(FilterPtr & res_filter)
{
FilterPtr block_filter = nullptr;
auto [block, from_delta] = readBlockWithReturnFilter(stable, delta, block_filter);

if (block)
{
if (from_delta)
{
block.setStartOffset(block.startOffset() + stable_rows);
}

String block_filter_value;
if (block_filter)
{
for (size_t i = 0; i < block_filter->size(); ++i)
{
block_filter_value += (*block_filter)[i] ? "1" : "0";
}
}

filter.resize(block.rows());
bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows());
if (!all_match)

if (!block_filter)
{
if (return_filter)
{
if (all_match)
res_filter = nullptr;
else
res_filter = &filter;
}
else
{
RUNTIME_CHECK(filter.size() >= block_filter->size());

if (!all_match)
{
// We have a `block_filter`, and have a bitmap filter in `filter`.
// filter ← filter & block_filter.
std::transform( //
filter.begin(),
filter.end(),
block_filter->begin(),
filter.begin(),
[](UInt8 a, UInt8 b) { return static_cast<UInt8>(a && b); });
}
else
{
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
// We only have a `block_filter`.
// filter ← block_filter.
std::copy( //
block_filter->begin(),
block_filter->end(),
filter.begin());
}
}
else
{
res_filter = nullptr;
res_filter = &filter;
}
}
return block;
Expand Down
Loading

0 comments on commit 1afddb3

Please sign in to comment.