Skip to content

Commit

Permalink
pick some from tidbcloud/tiflash-cse#234 to fix tidy
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Aug 13, 2024
1 parent 74f2e44 commit e9a6e68
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 174 deletions.
125 changes: 0 additions & 125 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <Storages/S3/FileCache.h>
#include <Storages/S3/FileCachePerf.h>

#include <algorithm>


namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -130,14 +132,38 @@ Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter)
else
res = readByIndexReader();

// Filter the output rows. If no rows need to filter, res_filter is nullptr.
filter.resize(res.rows());
bool all_match = valid_rows_after_search.get(filter, res.startOffset(), res.rows());
if (!res)
return {};

if unlikely (all_match)
res_filter = nullptr;
else
res_filter = &filter;
// Assign output filter according to sorted_results.
//
// For example, if sorted_results is [3, 10], the complete filter array is:
// [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1]
// And we should only return filter array starting from res.startOffset(), like:
// [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1]
// ^startOffset ^startOffset+rows
// filter: [0, 0, 0, 0, 0]
//
// We will use startOffset as lowerBound (inclusive), and startOffset+rows
// as upperBound (exclusive) to find whether this range has a match in sorted_results.

const auto start_offset = res.startOffset();
const auto max_rowid_exclusive = start_offset + res.rows();

filter.clear();
filter.resize_fill(res.rows(), 0);

auto it = std::lower_bound(sorted_results.begin(), sorted_results.end(), start_offset);
while (it != sorted_results.end())
{
auto rowid = *it;
if (rowid >= max_rowid_exclusive)
break;
filter[rowid - start_offset] = 1;
++it;
}

res_filter = &filter;
return res;
}

Expand Down Expand Up @@ -173,12 +199,18 @@ Block DMFileWithVectorIndexBlockInputStream::readByIndexReader()
block.setStartOffset(block_start_row_id);

auto vec_column = vec_cd.type->createColumn();
vec_column->reserve(read_rows);

Stopwatch w;
vec_column_reader->read(vec_column, read_pack_id, read_rows);
duration_read_from_vec_index_seconds += w.elapsedSeconds();

block.insert(ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id});
block.insert(ColumnWithTypeAndName{//
std::move(vec_column),
vec_cd.type,
vec_cd.name,
vec_cd.id});

return block;
}

Expand All @@ -192,16 +224,19 @@ Block DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns()
if (!block_others)
return {};

auto read_rows = block_others.rows();

// Using vec_cd.type to construct a Column directly instead of using
// the type from dmfile, so that we don't need extra transforms
// (e.g. wrap with a Nullable). vec_column_reader is compatible with
// both Nullable and NotNullable.
auto vec_column = vec_cd.type->createColumn();
vec_column->reserve(read_rows);

// Then read from vector index for the same pack.
w.restart();

vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), block_others.rows());
vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), read_rows);
duration_read_from_vec_index_seconds += w.elapsedSeconds();

// Re-assemble block using the same layout as header_layout.
Expand Down Expand Up @@ -298,7 +333,8 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorIndex()
PerfContext::file_cache.fg_wait_download_from_s3 > perf_begin.fg_wait_download_from_s3)
has_s3_download = true;

duration_load_index += watch.elapsedSeconds();
auto download_duration = watch.elapsedSeconds();
duration_load_index += download_duration;
}
else
{
Expand Down Expand Up @@ -354,7 +390,10 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult()
auto perf_begin = PerfContext::vector_search;

RUNTIME_CHECK(valid_rows.size() >= dmfile->getRows(), valid_rows.size(), dmfile->getRows());
auto results_rowid = vec_index->search(ann_query_info, valid_rows);
sorted_results = vec_index->search(ann_query_info, valid_rows);
std::sort(sorted_results.begin(), sorted_results.end());
// results must not contain duplicates. Usually there should be no duplicates.
sorted_results.erase(std::unique(sorted_results.begin(), sorted_results.end()), sorted_results.end());

auto discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes;
auto visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes;
Expand All @@ -364,24 +403,10 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult()
scan_context->total_vector_idx_search_discarded_nodes += discarded_nodes;
scan_context->total_vector_idx_search_visited_nodes += visited_nodes;

size_t rows_after_mvcc = valid_rows.count();
size_t rows_after_vector_search = results_rowid.size();

// After searching with the BitmapFilter, we create a bitmap
// to exclude rows that are not in the search result, because these rows
// are produced as [] or NULL, which is not a valid vector for future use.
// The bitmap will be used when returning the output to the caller.
{
valid_rows_after_search = BitmapFilter(valid_rows.size(), false);
for (auto rowid : results_rowid)
valid_rows_after_search.set(rowid, 1, true);
valid_rows_after_search.runOptimize();
}

vec_column_reader = std::make_shared<VectorColumnFromIndexReader>( //
dmfile,
vec_index,
std::move(results_rowid));
sorted_results);

// Vector index is very likely to filter out some packs. For example,
// if we query for Top 1, then only 1 pack will be remained. So we
Expand All @@ -393,7 +418,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult()
auto & pack_res = reader.pack_filter.getPackRes();

size_t results_it = 0;
const size_t results_it_max = results_rowid.size();
const size_t results_it_max = sorted_results.size();

UInt32 pack_start = 0;

Expand All @@ -406,8 +431,8 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult()

UInt32 pack_end = pack_start + pack_stats[pack_id].rows;
while (results_it < results_it_max //
&& results_rowid[results_it] >= pack_start //
&& results_rowid[results_it] < pack_end)
&& sorted_results[results_it] >= pack_start //
&& sorted_results[results_it] < pack_end)
{
pack_has_result = true;
results_it++;
Expand All @@ -428,7 +453,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult()
log,
"Finished vector search over column dmf_{}/{}(id={}), cost={:.2f}s "
"top_k_[query/visited/discarded/result]={}/{}/{}/{} "
"rows_[file/after_mvcc/after_search]={}/{}/{} "
"rows_[file/after_search]={}/{} "
"pack_[total/before_search/after_search]={}/{}/{}",

dmfile->fileId(),
Expand All @@ -439,11 +464,10 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult()
ann_query_info->top_k(),
visited_nodes, // Visited nodes will be larger than query_top_k when there are MVCC rows
discarded_nodes, // How many nodes are skipped by MVCC
results_rowid.size(),
sorted_results.size(),

dmfile->getRows(),
rows_after_mvcc,
rows_after_vector_search,
sorted_results.size(),

pack_stats.size(),
valid_packs_before_search,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream
// Set after load().
VectorColumnFromIndexReaderPtr vec_column_reader = nullptr;
// Set after load(). Used to filter the output rows.
BitmapFilter valid_rows_after_search{0, false};
std::vector<UInt32> sorted_results{}; // Key is rowid
IColumn::Filter filter;

bool loaded = false;
Expand Down
27 changes: 15 additions & 12 deletions dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/DeltaMerge/File/VectorColumnFromIndexReader.h>

#include <algorithm>


namespace DB::DM
{
Expand All @@ -31,20 +34,22 @@ std::vector<UInt32> VectorColumnFromIndexReader::calcPackStartRowID(const DMFile
}

MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack(
std::vector<VectorIndexViewer::Key> && results,
const std::vector<VectorIndexViewer::Key> & sorted_results,
const DMFileMeta::PackStats & pack_stats,
const std::vector<UInt32> & pack_start_rowid)
{
auto column = ColumnArray::create(ColumnUInt32::create());

// results must be in ascending order.
std::sort(results.begin(), results.end());
// results must not contain duplicates. Usually there should be no duplicates.
results.erase(std::unique(results.begin(), results.end()), results.end());
#ifndef NDEBUG
{
const auto sorted = std::is_sorted(sorted_results.begin(), sorted_results.end());
RUNTIME_CHECK(sorted);
}
#endif

std::vector<UInt32> offsets_in_pack;
size_t results_it = 0;
const size_t results_it_max = results.size();
const size_t results_it_max = sorted_results.size();
for (size_t pack_id = 0, pack_id_max = pack_start_rowid.size(); pack_id < pack_id_max; pack_id++)
{
offsets_in_pack.clear();
Expand All @@ -53,14 +58,13 @@ MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack(
UInt32 pack_end = pack_start + pack_stats[pack_id].rows;

while (results_it < results_it_max //
&& results[results_it] >= pack_start //
&& results[results_it] < pack_end)
&& sorted_results[results_it] >= pack_start //
&& sorted_results[results_it] < pack_end)
{
offsets_in_pack.push_back(results[results_it] - pack_start);
offsets_in_pack.push_back(sorted_results[results_it] - pack_start);
results_it++;
}

// insert <pack_id, [offset0, offset1, ...]>
column->insertData(
reinterpret_cast<const char *>(offsets_in_pack.data()),
offsets_in_pack.size() * sizeof(UInt32));
Expand All @@ -75,7 +79,6 @@ void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_p
{
std::vector<Float32> value;
const auto * results_by_pack = checkAndGetColumn<ColumnArray>(this->results_by_pack.get());
checkAndGetColumn<ColumnArray>(column.get());

size_t pack_id = start_pack_id;
UInt32 remaining_rows_in_pack = pack_stats[pack_id].rows;
Expand Down Expand Up @@ -111,7 +114,7 @@ void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_p
}
RUNTIME_CHECK(filled_result_rows == offset_in_pack);

// TODO(vector-index): We could fill multiple rows if rowid is continuous.
// TODO: We could fill multiple rows if rowid is continuous.
VectorIndexViewer::Key rowid = pack_start_rowid[pack_id] + offset_in_pack;
index->get(rowid, value);
column->insertData(reinterpret_cast<const char *>(value.data()), value.size() * sizeof(Float32));
Expand Down
Loading

0 comments on commit e9a6e68

Please sign in to comment.