Skip to content

Commit

Permalink
enhance: Use Arrow cdata to avoid proto serde for delete
Browse files Browse the repository at this point in the history
  • Loading branch information
congqixia committed Dec 23, 2024
1 parent 90de37e commit 9a549d5
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 16 deletions.
6 changes: 6 additions & 0 deletions internal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ add_custom_target( Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean )

# **************************** Install ****************************

# Install the Arrow headers: every "*.h" found under
# ${CONAN_INCLUDE_DIRS_ARROW}/arrow/ is copied to include/arrow.
# NOTE(review): the original intent was the Arrow C ABI header only; the
# pattern below installs all Arrow headers — confirm this is desired.
install(DIRECTORY ${CONAN_INCLUDE_DIRS_ARROW}/arrow/
DESTINATION include/arrow
FILES_MATCHING PATTERN "*.h"
)

# Install storage
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/storage/
DESTINATION include/storage
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ typedef struct CNewSegmentResult {
CStatus status;
CSegmentInterface segmentPtr;
} CNewSegmentResult;

#ifdef __cplusplus
}

Expand Down
46 changes: 46 additions & 0 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1929,6 +1929,52 @@ ChunkedSegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
return SegcoreError::success();
}

// Apply a batch of deletions whose primary keys are supplied as an Arrow array.
//
// arr_pks:        Arrow array of primary keys; decoded by ParsePksFromArray
//                 according to the data type of the schema's primary key field.
// timestamps_raw: delete timestamps; timestamps_raw[i] is paired with the i-th
//                 primary key, so it is read for indices [0, arr_pks->length()).
//
// Entries whose primary key is not contained in insert_record_ are dropped
// (unless insert_record_ holds no pks at all). The remaining
// (timestamp, pk) pairs are sorted and pushed into deleted_record_.
// Always returns SegcoreError::success().
SegcoreError
ChunkedSegmentSealedImpl::Delete(arrow::Array* arr_pks,
                                 const Timestamp* timestamps_raw) {
    auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(field_id.get() != -1, "Primary key not found in schema");
    auto& field_meta = schema_->operator[](field_id);

    // Arrow lengths are 64-bit; keep every index 64-bit to avoid narrowing.
    const int64_t num_rows = arr_pks->length();
    std::vector<PkType> pks(num_rows);
    ParsePksFromArray(pks, field_meta.get_data_type(), arr_pks);

    // Pair each timestamp with its pk. `pks` is not used afterwards, so the
    // keys are moved instead of copied.
    std::vector<std::tuple<Timestamp, PkType>> ordering(num_rows);
    for (int64_t i = 0; i < num_rows; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], std::move(pks[i]));
    }

    // Filter out deletions whose primary key does not exist in this segment.
    // If insert_record_ is empty (maybe only meta was loaded but not data, for
    // the lru-cache at go side), filtering could lose deletions, so skip it.
    if (!insert_record_.empty_pks()) {
        auto end = std::remove_if(
            ordering.begin(),
            ordering.end(),
            [&](const std::tuple<Timestamp, PkType>& record) {
                return !insert_record_.contain(std::get<1>(record));
            });
        ordering.erase(end, ordering.end());
    }
    if (ordering.empty()) {
        return SegcoreError::success();
    }

    // Sort by (timestamp, pk), then split back into two parallel vectors.
    std::sort(ordering.begin(), ordering.end());
    const size_t remaining = ordering.size();
    std::vector<PkType> sort_pks(remaining);
    std::vector<Timestamp> sort_timestamps(remaining);

    for (size_t i = 0; i < remaining; ++i) {
        auto& [ts, pk] = ordering[i];
        sort_timestamps[i] = ts;
        sort_pks[i] = std::move(pk);
    }

    deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
    return SegcoreError::success();
}

std::string
ChunkedSegmentSealedImpl::debug() const {
std::string log_str;
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
const IdArray* pks,
const Timestamp* timestamps) override;

// Delete overload that takes the primary keys as an Arrow array.
// `timestamps` is indexed in parallel with `pks` (one timestamp per key).
SegcoreError
Delete(arrow::Array* pks, const Timestamp* timestamps) override;

// Returns a vector of offsets together with a bool flag.
// NOTE(review): the meaning of `limit`, `bitset` and the returned bool is not
// visible from this declaration — confirm against the implementation.
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const override;

Expand Down
46 changes: 46 additions & 0 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,52 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
return SegcoreError::success();
}

// Apply a batch of deletions whose primary keys are supplied as an Arrow array.
//
// arr_pks:        Arrow array of primary keys; decoded by ParsePksFromArray
//                 according to the data type of the schema's primary key field.
// timestamps_raw: delete timestamps; timestamps_raw[i] is paired with the i-th
//                 primary key, so it is read for indices [0, arr_pks->length()).
//
// Entries whose primary key is not contained in insert_record_ are dropped
// (unless insert_record_ holds no pks at all). The remaining
// (timestamp, pk) pairs are sorted and pushed into deleted_record_.
// Always returns SegcoreError::success().
SegcoreError
SegmentGrowingImpl::Delete(arrow::Array* arr_pks,
                           const Timestamp* timestamps_raw) {
    auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(field_id.get() != -1, "Primary key not found in schema");
    auto& field_meta = schema_->operator[](field_id);

    // Arrow lengths are 64-bit; keep every index 64-bit to avoid narrowing.
    const int64_t num_rows = arr_pks->length();
    std::vector<PkType> pks(num_rows);
    ParsePksFromArray(pks, field_meta.get_data_type(), arr_pks);

    // Pair each timestamp with its pk. `pks` is not used afterwards, so the
    // keys are moved instead of copied.
    std::vector<std::tuple<Timestamp, PkType>> ordering(num_rows);
    for (int64_t i = 0; i < num_rows; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], std::move(pks[i]));
    }

    // Filter out deletions whose primary key does not exist in this segment.
    // If insert_record_ is empty (maybe only meta was loaded but not data, for
    // the lru-cache at go side), filtering could lose deletions, so skip it.
    if (!insert_record_.empty_pks()) {
        auto end = std::remove_if(
            ordering.begin(),
            ordering.end(),
            [&](const std::tuple<Timestamp, PkType>& record) {
                return !insert_record_.contain(std::get<1>(record));
            });
        ordering.erase(end, ordering.end());
    }
    if (ordering.empty()) {
        return SegcoreError::success();
    }

    // Sort by (timestamp, pk), then split back into two parallel vectors.
    std::sort(ordering.begin(), ordering.end());
    const size_t remaining = ordering.size();
    std::vector<PkType> sort_pks(remaining);
    std::vector<Timestamp> sort_timestamps(remaining);

    for (size_t i = 0; i < remaining; ++i) {
        auto& [ts, pk] = ordering[i];
        sort_timestamps[i] = ts;
        sort_pks[i] = std::move(pk);
    }

    deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
    return SegcoreError::success();
}

void
SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
const IdArray* pks,
const Timestamp* timestamps) override;

// Delete overload that takes the primary keys as an Arrow array.
// `timestamps` is indexed in parallel with `pks` (one timestamp per key).
SegcoreError
Delete(arrow::Array* pks, const Timestamp* timestamps) override;

// Loads deleted records described by `info`.
// The implementation asserts that info.row_count is greater than 0.
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;

Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <index/ScalarIndex.h>

#include "FieldIndexing.h"
#include "arrow/type_fwd.h"
#include "common/Common.h"
#include "common/Schema.h"
#include "common/Span.h"
Expand Down Expand Up @@ -110,6 +111,9 @@ class SegmentInterface {
const IdArray* pks,
const Timestamp* timestamps) = 0;

// Delete rows by primary key, with the keys supplied as an Arrow array.
// `timestamps` is indexed in parallel with `pks` (one timestamp per key).
virtual SegcoreError
Delete(arrow::Array* pks, const Timestamp* timestamps) = 0;

// Loads deleted records described by `info` into the segment.
// NOTE(review): preconditions on `info` are implementation-specific — confirm
// against each override.
virtual void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;

Expand Down
46 changes: 46 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,52 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
return SegcoreError::success();
}

// Apply a batch of deletions whose primary keys are supplied as an Arrow array.
//
// arr_pks:        Arrow array of primary keys; decoded by ParsePksFromArray
//                 according to the data type of the schema's primary key field.
// timestamps_raw: delete timestamps; timestamps_raw[i] is paired with the i-th
//                 primary key, so it is read for indices [0, arr_pks->length()).
//
// Entries whose primary key is not contained in insert_record_ are dropped
// (unless insert_record_ holds no pks at all). The remaining
// (timestamp, pk) pairs are sorted and pushed into deleted_record_.
// Always returns SegcoreError::success().
SegcoreError
SegmentSealedImpl::Delete(arrow::Array* arr_pks,
                          const Timestamp* timestamps_raw) {
    auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(field_id.get() != -1, "Primary key not found in schema");
    auto& field_meta = schema_->operator[](field_id);

    // Arrow lengths are 64-bit; keep every index 64-bit to avoid narrowing.
    const int64_t num_rows = arr_pks->length();
    std::vector<PkType> pks(num_rows);
    ParsePksFromArray(pks, field_meta.get_data_type(), arr_pks);

    // Pair each timestamp with its pk. `pks` is not used afterwards, so the
    // keys are moved instead of copied.
    std::vector<std::tuple<Timestamp, PkType>> ordering(num_rows);
    for (int64_t i = 0; i < num_rows; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], std::move(pks[i]));
    }

    // Filter out deletions whose primary key does not exist in this segment.
    // If insert_record_ is empty (maybe only meta was loaded but not data, for
    // the lru-cache at go side), filtering could lose deletions, so skip it.
    if (!insert_record_.empty_pks()) {
        auto end = std::remove_if(
            ordering.begin(),
            ordering.end(),
            [&](const std::tuple<Timestamp, PkType>& record) {
                return !insert_record_.contain(std::get<1>(record));
            });
        ordering.erase(end, ordering.end());
    }
    if (ordering.empty()) {
        return SegcoreError::success();
    }

    // Sort by (timestamp, pk), then split back into two parallel vectors.
    std::sort(ordering.begin(), ordering.end());
    const size_t remaining = ordering.size();
    std::vector<PkType> sort_pks(remaining);
    std::vector<Timestamp> sort_timestamps(remaining);

    for (size_t i = 0; i < remaining; ++i) {
        auto& [ts, pk] = ordering[i];
        sort_timestamps[i] = ts;
        sort_pks[i] = std::move(pk);
    }

    deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
    return SegcoreError::success();
}

std::string
SegmentSealedImpl::debug() const {
std::string log_str;
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ class SegmentSealedImpl : public SegmentSealed {
const IdArray* pks,
const Timestamp* timestamps) override;

// Delete overload that takes the primary keys as an Arrow array.
// `timestamps` is indexed in parallel with `pks` (one timestamp per key).
SegcoreError
Delete(arrow::Array* pks, const Timestamp* timestamps) override;

// Returns a vector of offsets together with a bool flag.
// NOTE(review): the meaning of `limit`, `bitset` and the returned bool is not
// visible from this declaration — confirm against the implementation.
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const override;

Expand Down
27 changes: 27 additions & 0 deletions internal/core/src/segcore/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <string>
#include <vector>

#include "arrow/type_fwd.h"
#include "common/Common.h"
#include "common/FieldData.h"
#include "common/Types.h"
Expand Down Expand Up @@ -107,6 +108,32 @@ ParsePksFromIDs(std::vector<PkType>& pks,
}
}

// Decode the primary keys held in an Arrow array into `pks`.
//
// pks:       destination; element i receives the i-th value of `arr`. It must
//            already hold at least arr->length() elements (writes go through
//            std::vector::at, which throws std::out_of_range otherwise).
// data_type: primary key type from the schema; INT64 and VARCHAR are
//            supported, anything else triggers PanicInfo(DataTypeInvalid, ...).
// arr:       Arrow array to read; its runtime Arrow type must be INT64 for
//            DataType::INT64 and STRING for DataType::VARCHAR.
//
// NOTE(review): null slots in `arr` are not inspected; the raw slot value is
// copied as-is — confirm callers never send nulls.
void
ParsePksFromArray(std::vector<PkType>& pks,
                  DataType data_type,
                  arrow::Array* arr) {
    switch (data_type) {
        case DataType::INT64: {
            // static_cast performs no runtime check, so verify the Arrow type
            // first; casting a mismatched array would be undefined behaviour.
            if (arr->type_id() != arrow::Type::INT64) {
                PanicInfo(DataTypeInvalid,
                          fmt::format("arrow array type id {} does not match "
                                      "INT64 primary key",
                                      static_cast<int>(arr->type_id())));
            }
            auto int_arr = static_cast<arrow::Int64Array*>(arr);
            const int64_t length = int_arr->length();
            for (int64_t i = 0; i < length; ++i) {
                pks.at(i) = int_arr->Value(i);
            }
            break;
        }
        case DataType::VARCHAR: {
            if (arr->type_id() != arrow::Type::STRING) {
                PanicInfo(DataTypeInvalid,
                          fmt::format("arrow array type id {} does not match "
                                      "VARCHAR primary key",
                                      static_cast<int>(arr->type_id())));
            }
            auto str_arr = static_cast<arrow::StringArray*>(arr);
            const int64_t length = str_arr->length();
            for (int64_t i = 0; i < length; ++i) {
                pks.at(i) = str_arr->GetString(i);
            }
            break;
        }
        default: {
            PanicInfo(DataTypeInvalid,
                      fmt::format("unsupported PK {}", data_type));
        }
    }
}

int64_t
GetSizeOfIdArray(const IdArray& data) {
if (data.has_int_id()) {
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/segcore/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ ParsePksFromIDs(std::vector<PkType>& pks,
DataType data_type,
const IdArray& data);

// Decode the primary keys held in an Arrow array into `pks`.
// `pks` must be pre-sized by the caller: element i is written for every
// index i in [0, arr->length()). `data_type` selects how `arr` is interpreted
// (INT64 or VARCHAR); other types are rejected.
void
ParsePksFromArray(std::vector<PkType>& pks,
DataType data_type,
arrow::Array* arr);

// Returns a size computed from the given IdArray.
// NOTE(review): presumably the number of ids held — confirm against the
// definition in Utils.cpp.
int64_t
GetSizeOfIdArray(const IdArray& data);

Expand Down
25 changes: 25 additions & 0 deletions internal/core/src/segcore/segment_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
#include <memory>
#include <limits>

#include "arrow/api.h"
#include "arrow/array/array_binary.h"
#include "arrow/c/bridge.h"
#include "arrow/array/array_base.h"
#include "common/EasyAssert.h"
#include "pb/cgo_msg.pb.h"
#include "pb/index_cgo_msg.pb.h"

Expand Down Expand Up @@ -332,6 +337,26 @@ Delete(CSegmentInterface c_segment,
}
}

// C entry point: delete rows from a segment, with the primary keys passed
// through the Arrow C data interface.
//
// c_segment:  handle that is cast to milvus::segcore::SegmentInterface*.
// pk_array:   Arrow C array holding the primary keys; must not be null.
//             It is handed to arrow::ImportArray.
// pk_schema:  Arrow C schema describing pk_array; must not be null.
// timestamps: delete timestamps, forwarded unchanged to
//             SegmentInterface::Delete together with the imported array.
//
// Returns SuccessCStatus() when Delete completes, or FailureCStatus(&e) for
// any std::exception raised along the way.
CStatus
DeleteArray(CSegmentInterface c_segment,
            CArrowArray pk_array,
            CArrowSchema pk_schema,
            const uint64_t* timestamps) {
    auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
    // Argument validation and the Arrow import live inside the try block so a
    // failed check is reported through CStatus rather than escaping this
    // extern "C" function.
    // NOTE(review): assumes AssertInfo reports failure by throwing a
    // std::exception-derived type — confirm in common/EasyAssert.h.
    try {
        AssertInfo(pk_array, "pk array cannot be null");
        AssertInfo(pk_schema, "pk schema cannot be null");
        auto result = arrow::ImportArray(pk_array, pk_schema);
        AssertInfo(result.ok(), "failed to convert pk array with arrow bridge");
        // Borrow the raw array pointer; `result` still owns the shared_ptr and
        // keeps the imported array alive until the end of this scope.
        auto arr = result->get();
        // The SegcoreError returned by Delete is not inspected here.
        auto res = segment->Delete(arr, timestamps);
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}

////////////////////////////// interfaces for sealed segment //////////////////////////////
CStatus
LoadFieldData(CSegmentInterface c_segment,
Expand Down
9 changes: 9 additions & 0 deletions internal/core/src/segcore/segment_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ extern "C" {
#include <stdint.h>

#include "common/type_c.h"
#include "arrow/c/abi.h"
#include "futures/future_c.h"
#include "segcore/plan_c.h"
#include "segcore/load_index_c.h"
#include "segcore/load_field_data_c.h"

typedef void* CSearchResult;
typedef CProto CRetrieveResult;
// Pointer aliases for the Arrow C data interface structs
// (struct ArrowSchema / struct ArrowArray; arrow/c/abi.h is included above).
typedef struct ArrowSchema* CArrowSchema;
typedef struct ArrowArray* CArrowArray;

////////////////////////////// common interfaces //////////////////////////////
CStatus
Expand Down Expand Up @@ -158,6 +161,12 @@ Delete(CSegmentInterface c_segment,
const uint64_t ids_size,
const uint64_t* timestamps);

// Delete rows from `c_segment`, with the primary keys passed as an Arrow C
// array (`pk_array`) and its schema (`pk_schema`); both must be non-null.
// `timestamps` carries the delete timestamps that accompany the primary keys.
// The returned CStatus reports success or failure.
CStatus
DeleteArray(CSegmentInterface c_segment,
CArrowArray pk_array,
CArrowSchema pk_schema,
const uint64_t* timestamps);

// NOTE(review): presumably removes the file backing field `field_id` of the
// given segment — the implementation is not visible here; confirm.
void
RemoveFieldFile(CSegmentInterface c_segment, int64_t field_id);

Expand Down
8 changes: 8 additions & 0 deletions internal/storage/primary_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (pks *Int64PrimaryKeys) Get(idx int) PrimaryKey {
return NewInt64PrimaryKey(pks.values[idx])
}

// GetValues returns the int64 slice that backs pks.
// The slice is returned directly, not copied, so modifications made through
// it are visible to pks.
func (pks *Int64PrimaryKeys) GetValues() []int64 {
return pks.values
}

// Type reports the data type of the keys, which is always
// schemapb.DataType_Int64 for Int64PrimaryKeys.
func (pks *Int64PrimaryKeys) Type() schemapb.DataType {
return schemapb.DataType_Int64
}
Expand Down Expand Up @@ -135,6 +139,10 @@ func (pks *VarcharPrimaryKeys) Get(idx int) PrimaryKey {
return NewVarCharPrimaryKey(pks.values[idx])
}

// GetValues returns the string slice that backs pks.
// The slice is returned directly, not copied, so modifications made through
// it are visible to pks.
func (pks *VarcharPrimaryKeys) GetValues() []string {
return pks.values
}

// Type reports the data type of the keys, which is always
// schemapb.DataType_VarChar for VarcharPrimaryKeys.
func (pks *VarcharPrimaryKeys) Type() schemapb.DataType {
return schemapb.DataType_VarChar
}
Expand Down
Loading

0 comments on commit 9a549d5

Please sign in to comment.