diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index db3815718b4b6..9438b74de5802 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -529,14 +529,13 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg return nil } -func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) { +func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) { sd.level0Mut.Lock() defer sd.level0Mut.Unlock() // TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName)) - pks := make([]storage.PrimaryKey, 0) - tss := make([]storage.Timestamp, 0) + deltaData := storage.NewDeltaData(0) for _, segment := range level0Segments { segment := segment.(*segments.L0Segment) @@ -553,15 +552,14 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac hits := candidate.BatchPkExist(lc) for i, hit := range hits { if hit { - pks = append(pks, segmentPks[idx+i]) - tss = append(tss, segmentTss[idx+i]) + deltaData.Append(segmentPks[idx+i], segmentTss[idx+i]) } } } } } - return pks, tss + return deltaData.DeletePks(), deltaData.DeleteTimestamps() } func (sd *shardDelegator) RefreshLevel0DeletionStats() { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index c461cc16ae64d..7463c64da11a0 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -1546,29 +1546,33 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData) pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.True(pks[0].EQ(partitionDeleteData.DeletePks().Get(0))) + s.True(pks.Get(0).EQ(partitionDeleteData.DeletePks().Get(0))) pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) s.Empty(pks) delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global) pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)}) + rawPks := make([]storage.PrimaryKey, 0, pks.Len()) + for i := 0; i < pks.Len(); i++ { + rawPks = append(rawPks, pks.Get(i)) + } + s.ElementsMatch(rawPks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)}) bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed) bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks().Get(0)}) pks, _ = delegator.GetLevel0Deletions(partitionID, bfs) // bf filtered segment - s.Equal(len(pks), 1) - s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0))) + s.Equal(pks.Len(), 1) + s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0))) delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All) pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0))) + s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0))) pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0))) + s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0))) delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All) pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index e8b19b8d24957..d3844e650f539 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -100,11 +100,11 @@ func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments. func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error { deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing)) - if len(deletedPks) == 0 { + if deletedPks == nil || deletedPks.Len() == 0 { return nil } - log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", len(deletedPks))) + log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", deletedPks.Len())) return segment.Delete(ctx, deletedPks, deletedTss) } @@ -131,19 +131,21 @@ func (sd *shardDelegator) forwardL0ByBF(ctx context.Context, } deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate) - deleteData := &storage.DeleteData{} - deleteData.AppendBatch(deletedPks, deletedTss) - if deleteData.RowCount > 0 { + if deletedPks != nil && deletedPks.Len() > 0 { log.Info("forward L0 delete to worker...", - zap.Int64("deleteRowNum", deleteData.RowCount), + zap.Int("deleteRowNum", deletedPks.Len()), ) - err := worker.Delete(ctx, &querypb.DeleteRequest{ + pks, err := storage.ParsePrimaryKeysBatch2IDs(deletedPks) + if err != nil { + return err + } + err = worker.Delete(ctx, &querypb.DeleteRequest{ Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)), CollectionId: info.GetCollectionID(), PartitionId: info.GetPartitionID(), SegmentId: info.GetSegmentID(), - PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks), - Timestamps: deleteData.Tss, + PrimaryKeys: pks, + Timestamps: deletedTss, Scope: deleteScope, }) if err != nil { diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index cb99765b7270b..ca19cdb897d80 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -221,7 +221,7 @@ func (_c *MockSegment_DatabaseName_Call) RunAndReturn(run func() string) *MockSe } // Delete provides a mock function with given fields: ctx, primaryKeys, timestamps -func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64) error { +func (_m *MockSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []uint64) error { ret := _m.Called(ctx, primaryKeys, timestamps) if len(ret) == 0 { @@ -229,7 +229,7 @@ func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.Primary } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []storage.PrimaryKey, []uint64) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, storage.PrimaryKeys, []uint64) error); ok { r0 = rf(ctx, primaryKeys, timestamps) } else { r0 = ret.Error(0) @@ -245,15 +245,15 @@ type MockSegment_Delete_Call struct { // Delete is a helper method to define mock.On call // - ctx context.Context -// - primaryKeys []storage.PrimaryKey +// - primaryKeys storage.PrimaryKeys // - timestamps []uint64 func (_e *MockSegment_Expecter) Delete(ctx interface{}, primaryKeys interface{}, timestamps interface{}) *MockSegment_Delete_Call { return &MockSegment_Delete_Call{Call: _e.mock.On("Delete", ctx, primaryKeys, timestamps)} } -func (_c *MockSegment_Delete_Call) Run(run func(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64)) *MockSegment_Delete_Call { +func (_c *MockSegment_Delete_Call) Run(run func(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []uint64)) *MockSegment_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]storage.PrimaryKey), args[2].([]uint64)) + run(args[0].(context.Context), args[1].(storage.PrimaryKeys), args[2].([]uint64)) }) return _c } @@ -263,7 +263,7 @@ func (_c *MockSegment_Delete_Call) Return(_a0 error) *MockSegment_Delete_Call { return _c } -func (_c *MockSegment_Delete_Call) RunAndReturn(run func(context.Context, []storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call { +func (_c *MockSegment_Delete_Call) RunAndReturn(run func(context.Context, storage.PrimaryKeys, []uint64) error) *MockSegment_Delete_Call { _c.Call.Return(run) return _c } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 37544475f09d1..8bac9d20df4f3 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -776,7 +776,7 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps [] return nil } -func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error { +func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error { /* CStatus Delete(CSegmentInterface c_segment, @@ -786,7 +786,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary const unsigned long* timestamps); */ - if len(primaryKeys) == 0 { + if primaryKeys.Len() == 0 { return nil } if !s.ptrLock.RLockIf(state.IsNotReleased) { @@ -795,34 +795,12 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary defer s.ptrLock.RUnlock() cOffset := C.int64_t(0) // depre - cSize := C.int64_t(len(primaryKeys)) + cSize := C.int64_t(primaryKeys.Len()) cTimestampsPtr := (*C.uint64_t)(&(timestamps)[0]) - ids := &schemapb.IDs{} - pkType := primaryKeys[0].Type() - switch pkType { - case schemapb.DataType_Int64: - int64Pks := make([]int64, len(primaryKeys)) - for index, pk := range primaryKeys { - int64Pks[index] = pk.(*storage.Int64PrimaryKey).Value - } - ids.IdField = &schemapb.IDs_IntId{ - IntId: &schemapb.LongArray{ - Data: int64Pks, - }, - } - case schemapb.DataType_VarChar: - varCharPks := make([]string, len(primaryKeys)) - for index, entity := range primaryKeys { - varCharPks[index] = entity.(*storage.VarCharPrimaryKey).Value - } - ids.IdField = &schemapb.IDs_StrId{ - StrId: &schemapb.StringArray{ - Data: varCharPks, - }, - } - default: - return fmt.Errorf("invalid data type of primary keys") + ids, err := storage.ParsePrimaryKeysBatch2IDs(primaryKeys) + if err != nil { + return err } dataBlob, err := proto.Marshal(ids) diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 8212287b82454..7df85a85cf302 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -77,7 +77,7 @@ type Segment interface { // Modification related Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error - Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error + Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error LastDeltaTimestamp() uint64 Release(ctx context.Context, opts ...releaseOption) diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index 0b28f8042fd12..da8af518bf452 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -147,7 +147,7 @@ func (s *L0Segment) Insert(ctx context.Context, rowIDs []int64, timestamps []typ return merr.WrapErrIoFailedReason("insert not supported for L0 segment") } -func (s *L0Segment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error { +func (s *L0Segment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error { return merr.WrapErrIoFailedReason("delete not supported for L0 segment") } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index d538cf8c6a89f..fe489d9bf6416 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -166,15 +166,15 @@ func (suite *SegmentSuite) TestResourceUsageEstimate() { func (suite *SegmentSuite) TestDelete() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pks, err := storage.GenInt64PrimaryKeys(0, 1) - suite.NoError(err) + pks := storage.NewInt64PrimaryKeys(2) + pks.AppendRaw(0, 1) // Test for sealed rowNum := suite.sealed.RowNum() - err = suite.sealed.Delete(ctx, pks, []uint64{1000, 1000}) + err := suite.sealed.Delete(ctx, pks, []uint64{1000, 1000}) suite.NoError(err) - suite.Equal(rowNum-int64(len(pks)), suite.sealed.RowNum()) + suite.Equal(rowNum-int64(pks.Len()), suite.sealed.RowNum()) suite.Equal(rowNum, suite.sealed.InsertCount()) // Test for growing @@ -182,7 +182,7 @@ func (suite *SegmentSuite) TestDelete() { err = suite.growing.Delete(ctx, pks, []uint64{1000, 1000}) suite.NoError(err) - suite.Equal(rowNum-int64(len(pks)), suite.growing.RowNum()) + suite.Equal(rowNum-int64(pks.Len()), suite.growing.RowNum()) suite.Equal(rowNum, suite.growing.InsertCount()) } diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index e3ac0fb7f7ebd..22ac6e3b33f81 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1364,7 +1364,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( return merr.Status(err), nil } - pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys()) + pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys()) for _, segment := range segments { err := segment.Delete(ctx, pks, req.GetTimestamps()) if err != nil { @@ -1419,7 +1419,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch log.Warn("Delete batch find missing ids", zap.Int64s("missing_ids", missingIDs.Collect())) } - pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys()) + pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys()) // control the execution batch parallel with P number // maybe it shall be lower in case of heavy CPU usage may impacting search/query diff --git a/internal/storage/primary_key.go b/internal/storage/primary_key.go index 628edd2e6bcf0..6a6e502769a56 100644 --- a/internal/storage/primary_key.go +++ b/internal/storage/primary_key.go @@ -350,6 +350,25 @@ func ParseIDs2PrimaryKeys(ids *schemapb.IDs) []PrimaryKey { return ret } +func ParseIDs2PrimaryKeysBatch(ids *schemapb.IDs) PrimaryKeys { + var result PrimaryKeys + switch ids.IdField.(type) { + case *schemapb.IDs_IntId: + int64Pks := ids.GetIntId().GetData() + pks := NewInt64PrimaryKeys(int64(len(int64Pks))) + pks.AppendRaw(int64Pks...) + result = pks + case *schemapb.IDs_StrId: + stringPks := ids.GetStrId().GetData() + pks := NewVarcharPrimaryKeys(int64(len(stringPks))) + pks.AppendRaw(stringPks...) + result = pks + default: + panic(fmt.Sprintf("unexpected schema id field type %T", ids.IdField)) + } + return result +} + func ParsePrimaryKeysBatch2IDs(pks PrimaryKeys) (*schemapb.IDs, error) { ret := &schemapb.IDs{} if pks.Len() == 0 {