diff --git a/internal/core/CMakeLists.txt b/internal/core/CMakeLists.txt index a7a835f4627d2..fcfdd776f11f1 100644 --- a/internal/core/CMakeLists.txt +++ b/internal/core/CMakeLists.txt @@ -283,6 +283,12 @@ add_custom_target( Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean ) # **************************** Install **************************** +# Install arrow c abi +install(DIRECTORY ${CONAN_INCLUDE_DIRS_ARROW}/arrow/ + DESTINATION include/arrow + FILES_MATCHING PATTERN "*.h" +) + # Install storage install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/storage/ DESTINATION include/storage diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 77bc563698933..9113e8845601d 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -126,6 +126,7 @@ typedef struct CNewSegmentResult { CStatus status; CSegmentInterface segmentPtr; } CNewSegmentResult; + #ifdef __cplusplus } diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 1f6a1aa1e409e..572120b1c0c0e 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -1929,6 +1929,52 @@ ChunkedSegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated return SegcoreError::success(); } +SegcoreError +ChunkedSegmentSealedImpl::Delete(arrow::Array* arr_pks, + const Timestamp* timestamps_raw) { + auto size = arr_pks->length(); + auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); + AssertInfo(field_id.get() != -1, "Primary key not found in schema"); + auto& field_meta = schema_->operator[](field_id); + std::vector pks(arr_pks->length()); + ParsePksFromArray(pks, field_meta.get_data_type(), arr_pks); + + // filter out the deletions that the primary key not exists + std::vector> ordering(size); + for (int i = 0; i < size; i++) { + ordering[i] = std::make_tuple(timestamps_raw[i], pks[i]); + } + // if 
insert_record_ is empty (may be only-load meta but not data for lru-cache at go side), + // filtering may cause the deletion lost, skip the filtering to avoid it. + if (!insert_record_.empty_pks()) { + auto end = std::remove_if( + ordering.begin(), + ordering.end(), + [&](const std::tuple& record) { + return !insert_record_.contain(std::get<1>(record)); + }); + size = end - ordering.begin(); + ordering.resize(size); + } + if (size == 0) { + return SegcoreError::success(); + } + + // step 1: sort timestamp + std::sort(ordering.begin(), ordering.end()); + std::vector sort_pks(size); + std::vector sort_timestamps(size); + + for (int i = 0; i < size; i++) { + auto [t, pk] = ordering[i]; + sort_timestamps[i] = t; + sort_pks[i] = pk; + } + + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); + return SegcoreError::success(); +} + std::string ChunkedSegmentSealedImpl::debug() const { std::string log_str; diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 0808857bde161..0544fe84088de 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -180,6 +180,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { const IdArray* pks, const Timestamp* timestamps) override; + SegcoreError + Delete(arrow::Array* pks, const Timestamp* timestamps) override; + std::pair, bool> find_first(int64_t limit, const BitsetType& bitset) const override; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 0ab4825d0e10e..79e9afc794c17 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -332,6 +332,52 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin, return SegcoreError::success(); } +SegcoreError +SegmentGrowingImpl::Delete(arrow::Array* arr_pks, + const Timestamp* timestamps_raw) { + auto size = 
arr_pks->length(); + auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); + AssertInfo(field_id.get() != -1, "Primary key not found in schema"); + auto& field_meta = schema_->operator[](field_id); + std::vector pks(arr_pks->length()); + ParsePksFromArray(pks, field_meta.get_data_type(), arr_pks); + + // filter out the deletions that the primary key not exists + std::vector> ordering(size); + for (int i = 0; i < size; i++) { + ordering[i] = std::make_tuple(timestamps_raw[i], pks[i]); + } + // if insert_record_ is empty (may be only-load meta but not data for lru-cache at go side), + // filtering may cause the deletion lost, skip the filtering to avoid it. + if (!insert_record_.empty_pks()) { + auto end = std::remove_if( + ordering.begin(), + ordering.end(), + [&](const std::tuple& record) { + return !insert_record_.contain(std::get<1>(record)); + }); + size = end - ordering.begin(); + ordering.resize(size); + } + if (size == 0) { + return SegcoreError::success(); + } + + // step 1: sort timestamp + std::sort(ordering.begin(), ordering.end()); + std::vector sort_pks(size); + std::vector sort_timestamps(size); + + for (int i = 0; i < size; i++) { + auto [t, pk] = ordering[i]; + sort_timestamps[i] = t; + sort_pks[i] = pk; + } + + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); + return SegcoreError::success(); +} + void SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { AssertInfo(info.row_count > 0, "The row count of deleted record is 0"); diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 23fc4ff60f184..e3b8cd23f5390 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -59,6 +59,9 @@ class SegmentGrowingImpl : public SegmentGrowing { const IdArray* pks, const Timestamp* timestamps) override; + SegcoreError + Delete(arrow::Array* pks, const Timestamp* timestamps) override; + void 
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override; diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 90e34ce78a7b2..ffb0c00bb3f9c 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -20,6 +20,7 @@ #include #include "FieldIndexing.h" +#include "arrow/type_fwd.h" #include "common/Common.h" #include "common/Schema.h" #include "common/Span.h" @@ -110,6 +111,9 @@ class SegmentInterface { const IdArray* pks, const Timestamp* timestamps) = 0; + virtual SegcoreError + Delete(arrow::Array* pks, const Timestamp* timestamps) = 0; + virtual void LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 3abede7800536..8f05d1d813243 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -1782,6 +1782,52 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated return SegcoreError::success(); } +SegcoreError +SegmentSealedImpl::Delete(arrow::Array* arr_pks, + const Timestamp* timestamps_raw) { + auto size = arr_pks->length(); + auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); + AssertInfo(field_id.get() != -1, "Primary key not found in schema"); + auto& field_meta = schema_->operator[](field_id); + std::vector pks(arr_pks->length()); + ParsePksFromArray(pks, field_meta.get_data_type(), arr_pks); + + // filter out the deletions that the primary key not exists + std::vector> ordering(size); + for (int i = 0; i < size; i++) { + ordering[i] = std::make_tuple(timestamps_raw[i], pks[i]); + } + // if insert_record_ is empty (may be only-load meta but not data for lru-cache at go side), + // filtering may cause the deletion lost, skip the filtering to avoid it. 
+ if (!insert_record_.empty_pks()) { + auto end = std::remove_if( + ordering.begin(), + ordering.end(), + [&](const std::tuple& record) { + return !insert_record_.contain(std::get<1>(record)); + }); + size = end - ordering.begin(); + ordering.resize(size); + } + if (size == 0) { + return SegcoreError::success(); + } + + // step 1: sort timestamp + std::sort(ordering.begin(), ordering.end()); + std::vector sort_pks(size); + std::vector sort_timestamps(size); + + for (int i = 0; i < size; i++) { + auto [t, pk] = ordering[i]; + sort_timestamps[i] = t; + sort_pks[i] = pk; + } + + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); + return SegcoreError::success(); +} + std::string SegmentSealedImpl::debug() const { std::string log_str; diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index ab445bea56f15..64f7c74c24cb1 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -193,6 +193,9 @@ class SegmentSealedImpl : public SegmentSealed { const IdArray* pks, const Timestamp* timestamps) override; + SegcoreError + Delete(arrow::Array* pks, const Timestamp* timestamps) override; + std::pair, bool> find_first(int64_t limit, const BitsetType& bitset) const override; diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 30b01caa86a4d..cb96c65f688a4 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -17,6 +17,7 @@ #include #include +#include "arrow/type_fwd.h" #include "common/Common.h" #include "common/FieldData.h" #include "common/Types.h" @@ -107,6 +108,32 @@ ParsePksFromIDs(std::vector& pks, } } +void +ParsePksFromArray(std::vector& pks, + DataType data_type, + arrow::Array* arr) { + switch (data_type) { + case DataType::INT64: { + auto int_arr = static_cast(arr); + for (int i = 0; i < int_arr->length(); i++) { + pks.at(i) = int_arr->Value(i); + } + break; + } + case 
DataType::VARCHAR: { + auto str_arr = static_cast(arr); + for (int i = 0; i < str_arr->length(); i++) { + pks.at(i) = str_arr->GetString(i); + } + break; + } + default: { + PanicInfo(DataTypeInvalid, + fmt::format("unsupported PK {}", data_type)); + } + } +} + int64_t GetSizeOfIdArray(const IdArray& data) { if (data.has_int_id()) { diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index a946fae20bec2..aedbb35e955f5 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -44,6 +44,11 @@ ParsePksFromIDs(std::vector& pks, DataType data_type, const IdArray& data); +void +ParsePksFromArray(std::vector& pks, + DataType data_type, + arrow::Array* arr); + int64_t GetSizeOfIdArray(const IdArray& data); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index fd7180d1ef184..33c610f7fa2cc 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -14,6 +14,11 @@ #include #include +#include "arrow/api.h" +#include "arrow/array/array_binary.h" +#include "arrow/c/bridge.h" +#include "arrow/array/array_base.h" +#include "common/EasyAssert.h" #include "pb/cgo_msg.pb.h" #include "pb/index_cgo_msg.pb.h" @@ -332,6 +337,35 @@ Delete(CSegmentInterface c_segment, } }
+// Forwards a batch of primary keys, handed over as an Arrow C data interface
+// array, to the segment's Delete. `timestamps` is read once per array element,
+// so it must hold at least as many entries as the array has rows. Any
+// std::exception raised on the way is reported through the returned CStatus.
+CStatus
+DeleteArray(CSegmentInterface c_segment,
+            CArrowArray pk_array,
+            CArrowSchema pk_schema,
+            const uint64_t* timestamps) {
+    auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
+    // AssertInfo signals a failed check by throwing, so the checks and the
+    // import run inside the try block: no exception may unwind through this
+    // extern "C" entry point into the cgo caller.
+    try {
+        AssertInfo(pk_array, "pk array cannot be null");
+        AssertInfo(pk_schema, "pk schema cannot be null");
+        auto result = arrow::ImportArray(pk_array, pk_schema);
+        AssertInfo(result.ok(),
+                   "failed to convert pk array with arrow bridge");
+        // Borrow the raw pointer only: `result` keeps owning the imported
+        // array until this block ends, i.e. past the Delete call below.
+        auto arr = result->get();
+        auto res = segment->Delete(arr, timestamps);
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
////////////////////////////// interfaces for sealed segment ////////////////////////////// CStatus LoadFieldData(CSegmentInterface c_segment, diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index 80bb099886405..9fc90d3a41a5a 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -20,6 +20,7 @@ extern "C" { #include #include "common/type_c.h" +#include "arrow/c/abi.h" #include "futures/future_c.h" #include "segcore/plan_c.h" #include "segcore/load_index_c.h" @@ -27,6 +28,8 @@ extern "C" { typedef void* CSearchResult; typedef CProto CRetrieveResult; +typedef struct ArrowSchema* CArrowSchema; +typedef struct ArrowArray* CArrowArray; ////////////////////////////// common interfaces ////////////////////////////// CStatus @@ -158,6 +161,12 @@ Delete(CSegmentInterface c_segment, const uint64_t ids_size, const uint64_t* timestamps); +CStatus +DeleteArray(CSegmentInterface c_segment, + CArrowArray pk_array, + CArrowSchema pk_schema, + const uint64_t* timestamps); + void RemoveFieldFile(CSegmentInterface c_segment, int64_t field_id); diff --git a/internal/storage/primary_keys.go b/internal/storage/primary_keys.go index 58666356ed411..3cbc3f7758d90 100644 --- a/internal/storage/primary_keys.go +++ b/internal/storage/primary_keys.go @@ -71,6 +71,10 @@ func (pks *Int64PrimaryKeys) Get(idx int) PrimaryKey { return NewInt64PrimaryKey(pks.values[idx]) } +func (pks *Int64PrimaryKeys) GetValues() []int64 { + return pks.values +} + func (pks *Int64PrimaryKeys) Type() schemapb.DataType { return schemapb.DataType_Int64 } @@ -135,6 +139,10 @@ func (pks *VarcharPrimaryKeys) Get(idx int) PrimaryKey { return NewVarCharPrimaryKey(pks.values[idx]) } +func (pks *VarcharPrimaryKeys) GetValues() []string { + return pks.values +} + func (pks *VarcharPrimaryKeys) Type() schemapb.DataType { return schemapb.DataType_VarChar } diff --git a/internal/util/segcore/segment.go b/internal/util/segcore/segment.go index 
7147aba470b66..a7dba764eac8a 100644 --- a/internal/util/segcore/segment.go +++ b/internal/util/segcore/segment.go @@ -17,10 +17,15 @@ import ( "runtime" "unsafe" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/cdata" + "github.com/apache/arrow/go/v12/arrow/memory" "github.com/cockroachdb/errors" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/cgo" "github.com/milvus-io/milvus/pkg/util/merr"
@@ -233,25 +238,49 @@ func (s *cSegmentImpl) preInsert(numOfRecords int) (int64, error) {
 
 // Delete deletes entities from the segment.
 func (s *cSegmentImpl) Delete(ctx context.Context, request *DeleteRequest) (*DeleteResult, error) {
-	cOffset := C.int64_t(0) // depre
-
-	cSize := C.int64_t(request.PrimaryKeys.Len())
+	// The C side takes the row count from the Arrow array and reads one
+	// timestamp per row, so both inputs must have the same length.
+	if request.PrimaryKeys.Len() != len(request.Timestamps) {
+		return nil, fmt.Errorf("primary keys and timestamps length mismatch: %d vs %d",
+			request.PrimaryKeys.Len(), len(request.Timestamps))
+	}
+	// Nothing to delete; this also keeps &Timestamps[0] below from panicking.
+	if len(request.Timestamps) == 0 {
+		return &DeleteResult{}, nil
+	}
 	cTimestampsPtr := (*C.uint64_t)(&(request.Timestamps)[0])
-
-	ids, err := storage.ParsePrimaryKeysBatch2IDs(request.PrimaryKeys)
-	if err != nil {
-		return nil, err
+	primaryKeys := request.PrimaryKeys
+	var pkArr arrow.Array
+
+	pkType := primaryKeys.Type()
+	switch pkType {
+	case schemapb.DataType_Int64:
+		bldr := array.NewInt64Builder(memory.DefaultAllocator)
+		defer bldr.Release()
+		pks := primaryKeys.(*storage.Int64PrimaryKeys)
+		bldr.AppendValues(pks.GetValues(), nil)
+		pkArr = bldr.NewArray()
+	case schemapb.DataType_VarChar:
+		bldr := array.NewStringBuilder(memory.DefaultAllocator)
+		defer bldr.Release()
+		pks := primaryKeys.(*storage.VarcharPrimaryKeys)
+		bldr.AppendValues(pks.GetValues(), nil)
+		pkArr = bldr.NewArray()
+	default:
+		return nil, fmt.Errorf("invalid data type of primary keys")
 	}
 
-	dataBlob, err := proto.Marshal(ids)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshal ids: %s", err)
-	}
-	status := C.Delete(s.ptr,
-		cOffset,
-		cSize,
-		(*C.uint8_t)(unsafe.Pointer(&dataBlob[0])),
-		(C.uint64_t)(len(dataBlob)),
+	defer pkArr.Release()
+
+	var cschema cdata.CArrowSchema
+	var carr cdata.CArrowArray
+	cdata.ExportArrowArray(pkArr, &carr, &cschema)
+	defer cdata.ReleaseCArrowSchema(&cschema)
+	defer cdata.ReleaseCArrowArray(&carr)
+
+	status := C.DeleteArray(s.ptr,
+		(C.CArrowArray)(unsafe.Pointer(&carr)),
+		(C.CArrowSchema)(unsafe.Pointer(&cschema)),
 		cTimestampsPtr,
 	)
 	return &DeleteResult{}, ConsumeCStatusIntoError(&status)