diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index 195e3777a9ebc..a7c44967a33ea 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -363,14 +363,10 @@ class SegmentExpr : public Expr { if (segment_->type() == SegmentType::Sealed) { // first is the raw data, second is valid_data // use valid_data to see if raw data is null - auto data_vec = segment_ - ->get_batch_views( - field_id_, i, data_pos, size) - .first; - auto valid_data = segment_ - ->get_batch_views( - field_id_, i, data_pos, size) - .second; + auto fetched_data = segment_->get_batch_views( + field_id_, i, data_pos, size); + auto data_vec = fetched_data.first; + auto valid_data = fetched_data.second; func(data_vec.data(), valid_data.data(), size, diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 4cf6555cb9d85..0aa6406fceff9 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -17,6 +17,7 @@ #include "TermExpr.h" #include #include +#include "log/Log.h" #include "query/Utils.h" namespace milvus { namespace exec { @@ -137,9 +138,18 @@ PhyTermFilterExpr::CanSkipSegment() { max = i == 0 ? val : std::max(val, max); min = i == 0 ? val : std::min(val, min); } + auto can_skip = [&]() -> bool { + for (int i = 0; i < num_data_chunk_; ++i) { + if (!skip_index.CanSkipBinaryRange( + field_id_, i, min, max, true, true)) { + return false; + } + } + return true; + }; + // using skip index to help skipping this segment - if (segment_->type() == SegmentType::Sealed && - skip_index.CanSkipBinaryRange(field_id_, 0, min, max, true, true)) { + if (segment_->type() == SegmentType::Sealed && can_skip()) { cached_bits_.resize(active_count_, false); cached_bits_inited_ = true; return true; diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h index 31255b60b53a8..93d43bafa6b17 100644 --- a/internal/core/src/mmap/ChunkedColumn.h +++ b/internal/core/src/mmap/ChunkedColumn.h @@ -95,6 +95,14 @@ class ChunkedColumnBase : public ColumnBase { return true; } + bool + IsValid(int64_t chunk_id, int64_t offset) const { + if (nullable_) { + return chunks_[chunk_id]->isValid(offset); + } + return true; + } + bool IsNullable() const { return nullable_; @@ -136,7 +144,7 @@ class ChunkedColumnBase : public ColumnBase { // used for sequential access for search virtual BufferView - GetBatchBuffer(int64_t start_offset, int64_t length) { + GetBatchBuffer(int64_t chunk_id, int64_t start_offset, int64_t length) { PanicInfo(ErrorCode::Unsupported, "GetBatchBuffer only supported for VariableColumn"); } @@ -323,33 +331,17 @@ class ChunkedVariableColumn : public ChunkedColumnBase { } BufferView - GetBatchBuffer(int64_t start_offset, int64_t length) override { - if (start_offset < 0 || start_offset > num_rows_ || - start_offset + length > num_rows_) { - PanicInfo(ErrorCode::OutOfRange, "index out of range"); - } - - int chunk_num = chunks_.size(); - - auto [start_chunk_id, start_offset_in_chunk] = - GetChunkIDByOffset(start_offset); + GetBatchBuffer(int64_t chunk_id, + int64_t start_offset, + int64_t length) override { BufferView buffer_view; - std::vector elements; - for (; start_chunk_id < chunk_num && length > 0; ++start_chunk_id) { - int chunk_size = chunks_[start_chunk_id]->RowNums(); - int len = - std::min(int64_t(chunk_size - start_offset_in_chunk), length); - elements.push_back( - {chunks_[start_chunk_id]->Data(), - std::dynamic_pointer_cast(chunks_[start_chunk_id]) - ->Offsets(), - static_cast(start_offset_in_chunk), - static_cast(start_offset_in_chunk + len)}); - - start_offset_in_chunk = 0; - length -= len; - } + elements.push_back( + {chunks_[chunk_id]->Data(), + std::dynamic_pointer_cast(chunks_[chunk_id]) + ->Offsets(), + static_cast(start_offset), + static_cast(start_offset + length)}); buffer_view.data_ = elements; return buffer_view; diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index b3719ed38df2b..71e5e4cc69097 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -776,11 +776,13 @@ ChunkedSegmentSealedImpl::get_chunk_buffer(FieldId field_id, if (field_data->IsNullable()) { valid_data.reserve(length); for (int i = 0; i < length; i++) { - valid_data.push_back(field_data->IsValid(start_offset + i)); + valid_data.push_back( + field_data->IsValid(chunk_id, start_offset + i)); } } - return std::make_pair(field_data->GetBatchBuffer(start_offset, length), - valid_data); + return std::make_pair( + field_data->GetBatchBuffer(chunk_id, start_offset, length), + valid_data); } PanicInfo(ErrorCode::UnexpectedError, "get_chunk_buffer only used for variable column field"); @@ -1227,9 +1229,10 @@ ChunkedSegmentSealedImpl::search_sorted_pk(const PkType& pk, [](const int64_t& elem, const int64_t& value) { return elem < value; }); + auto num_rows_until_chunk = pk_column->GetNumRowsUntilChunk(i); for (; it != src + pk_column->NumRows() && *it == target; ++it) { - auto offset = it - src; + auto offset = it - src + num_rows_until_chunk; if (condition(offset)) { pk_offsets.emplace_back(offset); } @@ -1248,14 +1251,16 @@ ChunkedSegmentSealedImpl::search_sorted_pk(const PkType& pk, auto num_chunk = var_column->num_chunks(); for (int i = 0; i < num_chunk; ++i) { // TODO @xiaocai2333, @sunby: chunk need to record the min/max. + auto num_rows_until_chunk = pk_column->GetNumRowsUntilChunk(i); auto string_chunk = std::dynamic_pointer_cast( var_column->GetChunk(i)); auto offset = string_chunk->binary_search_string(target); for (; offset != -1 && offset < var_column->NumRows() && var_column->RawAt(offset) == target; ++offset) { - if (condition(offset)) { - pk_offsets.emplace_back(offset); + auto segment_offset = offset + num_rows_until_chunk; + if (condition(segment_offset)) { + pk_offsets.emplace_back(segment_offset); } } } diff --git a/internal/core/unittest/test_chunked_segment.cpp b/internal/core/unittest/test_chunked_segment.cpp index d9b34218bdc7b..97cf3fc03be75 100644 --- a/internal/core/unittest/test_chunked_segment.cpp +++ b/internal/core/unittest/test_chunked_segment.cpp @@ -12,13 +12,25 @@ #include #include #include +#include "arrow/table_builder.h" +#include "arrow/type_fwd.h" #include "common/BitsetView.h" +#include "common/Consts.h" +#include "common/FieldDataInterface.h" #include "common/QueryInfo.h" #include "common/Schema.h" +#include "common/Types.h" +#include "expr/ITypeExpr.h" #include "knowhere/comp/index_param.h" #include "mmap/ChunkedColumn.h" +#include "mmap/Types.h" +#include "query/ExecPlanNodeVisitor.h" #include "query/SearchOnSealed.h" +#include "segcore/SegcoreConfig.h" +#include "segcore/SegmentSealedImpl.h" #include "test_utils/DataGen.h" +#include +#include #include struct DeferRelease { @@ -135,3 +147,115 @@ TEST(test_chunk_segment, TestSearchOnSealed) { ASSERT_TRUE(offsets.find(i) != offsets.end()); } } + +TEST(test_chunk_segment, TestTermExpr) { + auto schema = std::make_shared(); + auto int64_fid = schema->AddDebugField("int64", DataType::INT64, true); + auto pk_fid = schema->AddDebugField("pk", DataType::INT64, true); + schema->AddField(FieldName("ts"), TimestampFieldID, DataType::INT64, true); + schema->set_primary_field_id(pk_fid); + auto segment = + segcore::CreateSealedSegment(schema, + nullptr, + -1, + segcore::SegcoreConfig::default_config(), + false, + false, + true); + size_t test_data_count = 1000; + + auto arrow_i64_field = arrow::field("int64", arrow::int64()); + auto arrow_pk_field = arrow::field("pk", arrow::int64()); + auto arrow_ts_field = arrow::field("ts", arrow::int64()); + std::vector> arrow_fields = { + arrow_i64_field, arrow_pk_field, arrow_ts_field}; + + std::vector field_ids = {int64_fid, pk_fid, TimestampFieldID}; + + int start_id = 1; + int chunk_num = 2; + + std::vector field_infos; + for (auto fid : field_ids) { + FieldDataInfo field_info; + field_info.field_id = fid.get(); + field_info.row_count = test_data_count * chunk_num; + field_infos.push_back(field_info); + } + + // generate data + for (int chunk_id = 0; chunk_id < chunk_num; + chunk_id++, start_id += test_data_count) { + std::vector test_data(test_data_count); + std::iota(test_data.begin(), test_data.end(), start_id); + + auto builder = std::make_shared(); + auto status = builder->AppendValues(test_data.begin(), test_data.end()); + ASSERT_TRUE(status.ok()); + auto res = builder->Finish(); + ASSERT_TRUE(res.ok()); + std::shared_ptr arrow_int64; + arrow_int64 = res.ValueOrDie(); + + for (int i = 0; i < arrow_fields.size(); i++) { + auto f = arrow_fields[i]; + auto fid = field_ids[i]; + auto arrow_schema = + std::make_shared(arrow::FieldVector(1, f)); + auto record_batch = arrow::RecordBatch::Make( + arrow_schema, arrow_int64->length(), {arrow_int64}); + + auto res2 = arrow::RecordBatchReader::Make({record_batch}); + ASSERT_TRUE(res2.ok()); + auto arrow_reader = res2.ValueOrDie(); + + field_infos[i].arrow_reader_channel->push( + std::make_shared( + arrow_reader, nullptr, nullptr)); + } + } + + // load + for (int i = 0; i < field_infos.size(); i++) { + field_infos[i].arrow_reader_channel->close(); + segment->LoadFieldData(field_ids[i], field_infos[i]); + } + + // query int64 expr + std::vector filter_data; + for (int i = 1; i <= 10; ++i) { + proto::plan::GenericValue v; + v.set_int64_val(i); + filter_data.push_back(v); + } + auto term_filter_expr = std::make_shared( + expr::ColumnInfo(int64_fid, DataType::INT64), filter_data); + BitsetType final; + auto plan = std::make_shared(DEFAULT_PLANNODE_ID, + term_filter_expr); + final = query::ExecuteQueryExpr( + plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP); + ASSERT_EQ(10, final.count()); + + // query pk expr + auto pk_term_filter_expr = std::make_shared( + expr::ColumnInfo(pk_fid, DataType::INT64), filter_data); + plan = std::make_shared(DEFAULT_PLANNODE_ID, + pk_term_filter_expr); + final = query::ExecuteQueryExpr( + plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP); + ASSERT_EQ(10, final.count()); + + // query pk in second chunk + std::vector filter_data2; + proto::plan::GenericValue v; + v.set_int64_val(test_data_count + 1); + filter_data2.push_back(v); + pk_term_filter_expr = std::make_shared( + expr::ColumnInfo(pk_fid, DataType::INT64), filter_data2); + plan = std::make_shared(DEFAULT_PLANNODE_ID, + pk_term_filter_expr); + final = query::ExecuteQueryExpr( + plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP); + ASSERT_EQ(1, final.count()); +}