enhance: Use PrimaryKeys to replace interface slice for segment delete (milvus-io#37880)

Related to milvus-io#35303

Reduce the temporary memory used by per-key PrimaryKey interface values during segment delete.

Signed-off-by: Congqi Xia <[email protected]>
congqixia authored and JsDove committed Nov 26, 2024
1 parent b0c4938 commit be7b8ad
Showing 10 changed files with 65 additions and 64 deletions.
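Why this reduces memory: a `[]storage.PrimaryKey` stores one interface value per key, that is a two-word header plus a separately heap-allocated box for every row, while a typed `PrimaryKeys` container keeps the raw values in one contiguous slice and only materializes a `PrimaryKey` on access. A minimal self-contained sketch of the difference; the types below are illustrative stand-ins, not the actual `internal/storage` implementation:

```go
package main

import "fmt"

// PrimaryKey mirrors the per-key interface shape: each element of a
// []PrimaryKey is an interface value pointing at a separately allocated box.
type PrimaryKey interface{ EQ(other PrimaryKey) bool }

type Int64PrimaryKey struct{ Value int64 }

func (pk *Int64PrimaryKey) EQ(other PrimaryKey) bool {
	o, ok := other.(*Int64PrimaryKey)
	return ok && pk.Value == o.Value
}

// Int64PrimaryKeys mirrors the batched shape: one backing []int64,
// no per-key allocation and no interface header per row.
type Int64PrimaryKeys struct{ values []int64 }

func (pks *Int64PrimaryKeys) AppendRaw(vs ...int64) { pks.values = append(pks.values, vs...) }
func (pks *Int64PrimaryKeys) Len() int              { return len(pks.values) }
func (pks *Int64PrimaryKeys) Get(i int) PrimaryKey  { return &Int64PrimaryKey{Value: pks.values[i]} }

func main() {
	// Interface slice: n boxes plus n interface headers.
	boxed := make([]PrimaryKey, 0, 3)
	for _, v := range []int64{1, 2, 3} {
		boxed = append(boxed, &Int64PrimaryKey{Value: v})
	}

	// Typed container: one contiguous slice; a PrimaryKey is only
	// materialized on access.
	batched := &Int64PrimaryKeys{}
	batched.AppendRaw(1, 2, 3)

	fmt.Println(boxed[0].EQ(batched.Get(0))) // true
}
```

As a rough estimate, a million int64 keys cost on the order of 8 MB of contiguous data in the batched form, versus tens of MB and a million small allocations (with the matching GC pressure) in the boxed form.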
10 changes: 4 additions & 6 deletions internal/querynodev2/delegator/delegator_data.go
@@ -522,14 +522,13 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 	return nil
 }
 
-func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
+func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
 	sd.level0Mut.Lock()
 	defer sd.level0Mut.Unlock()
 
 	// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
 	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
-	pks := make([]storage.PrimaryKey, 0)
-	tss := make([]storage.Timestamp, 0)
+	deltaData := storage.NewDeltaData(0)
 
 	for _, segment := range level0Segments {
 		segment := segment.(*segments.L0Segment)
@@ -546,15 +545,14 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
 			hits := candidate.BatchPkExist(lc)
 			for i, hit := range hits {
 				if hit {
-					pks = append(pks, segmentPks[idx+i])
-					tss = append(tss, segmentTss[idx+i])
+					deltaData.Append(segmentPks[idx+i], segmentTss[idx+i])
 				}
 			}
 		}
 	}
 
-	return pks, tss
+	return deltaData.DeletePks(), deltaData.DeleteTimestamps()
 }
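The rewritten loop above appends matched rows into a single `DeltaData` instead of growing two parallel `[]storage.PrimaryKey` and `[]storage.Timestamp` slices. A runnable sketch of the accumulation pattern with stand-in types (the real `storage.DeltaData` holds typed primary keys and also tracks row count and memory size):

```go
package main

import "fmt"

// DeltaData is an illustrative stand-in for storage.DeltaData.
type DeltaData struct {
	pks []int64
	tss []uint64
}

func (d *DeltaData) Append(pk int64, ts uint64) {
	d.pks = append(d.pks, pk)
	d.tss = append(d.tss, ts)
}

func main() {
	segmentPks := []int64{10, 11, 12, 13}
	segmentTss := []uint64{100, 101, 102, 103}
	hits := []bool{true, false, true, false} // result of a bloom-filter batch check

	// Same shape as the rewritten loop: hits land in one container
	// instead of two parallel slices of boxed keys and timestamps.
	delta := &DeltaData{}
	for i, hit := range hits {
		if hit {
			delta.Append(segmentPks[i], segmentTss[i])
		}
	}
	fmt.Println(delta.pks, delta.tss) // [10 12] [100 102]
}
```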
16 changes: 10 additions & 6 deletions internal/querynodev2/delegator/delegator_data_test.go
@@ -1546,29 +1546,33 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
 	l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
 
 	pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.True(pks[0].EQ(partitionDeleteData.DeletePks().Get(0)))
+	s.True(pks.Get(0).EQ(partitionDeleteData.DeletePks().Get(0)))
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
 
 	delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
 	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)})
+	rawPks := make([]storage.PrimaryKey, 0, pks.Len())
+	for i := 0; i < pks.Len(); i++ {
+		rawPks = append(rawPks, pks.Get(i))
+	}
+	s.ElementsMatch(rawPks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)})
 
 	bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
 	bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks().Get(0)})
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
 	// bf filtered segment
-	s.Equal(len(pks), 1)
-	s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))
+	s.Equal(pks.Len(), 1)
+	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
 	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))
+	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))
+	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
20 changes: 11 additions & 9 deletions internal/querynodev2/delegator/delta_forward.go
@@ -100,11 +100,11 @@ func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.
 
 func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error {
 	deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
-	if len(deletedPks) == 0 {
+	if deletedPks == nil || deletedPks.Len() == 0 {
 		return nil
 	}
 
-	log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", len(deletedPks)))
+	log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", deletedPks.Len()))
 	return segment.Delete(ctx, deletedPks, deletedTss)
 }
 
@@ -131,19 +131,21 @@ func (sd *shardDelegator) forwardL0ByBF(ctx context.Context,
 	}
 
 	deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
-	deleteData := &storage.DeleteData{}
-	deleteData.AppendBatch(deletedPks, deletedTss)
-	if deleteData.RowCount > 0 {
+	if deletedPks != nil && deletedPks.Len() > 0 {
 		log.Info("forward L0 delete to worker...",
-			zap.Int64("deleteRowNum", deleteData.RowCount),
+			zap.Int("deleteRowNum", deletedPks.Len()),
 		)
-		err := worker.Delete(ctx, &querypb.DeleteRequest{
+		pks, err := storage.ParsePrimaryKeysBatch2IDs(deletedPks)
+		if err != nil {
+			return err
+		}
+		err = worker.Delete(ctx, &querypb.DeleteRequest{
 			Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
 			CollectionId: info.GetCollectionID(),
 			PartitionId:  info.GetPartitionID(),
 			SegmentId:    info.GetSegmentID(),
-			PrimaryKeys:  storage.ParsePrimaryKeys2IDs(deleteData.Pks),
-			Timestamps:   deleteData.Tss,
+			PrimaryKeys:  pks,
+			Timestamps:   deletedTss,
 			Scope:        deleteScope,
 		})
 		if err != nil {
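One detail worth noting in the new guards above: `len()` of a nil slice is safely zero, but calling a method such as `Len()` on a nil interface value panics, so the switch from a slice to the `PrimaryKeys` interface forces an explicit nil check. A tiny sketch of the failure mode it avoids (names illustrative):

```go
package main

import "fmt"

// PrimaryKeys stands in for the storage.PrimaryKeys interface.
type PrimaryKeys interface{ Len() int }

// get mimics a lookup that found no L0 deletions and returned nil.
func get() PrimaryKeys { return nil }

func main() {
	pks := get()
	// Calling pks.Len() unconditionally here would panic on the nil
	// interface value; check nil first, as the new guards do.
	if pks == nil || pks.Len() == 0 {
		fmt.Println("nothing to forward")
	}
}
```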
12 changes: 6 additions & 6 deletions internal/querynodev2/segments/mock_segment.go

Some generated files are not rendered by default.

34 changes: 6 additions & 28 deletions internal/querynodev2/segments/segment.go
@@ -776,7 +776,7 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
 	return nil
 }
 
-func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
+func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error {
 	/*
 		CStatus
 		Delete(CSegmentInterface c_segment,
@@ -786,7 +786,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
 			const unsigned long* timestamps);
 	*/
 
-	if len(primaryKeys) == 0 {
+	if primaryKeys.Len() == 0 {
 		return nil
 	}
 	if !s.ptrLock.RLockIf(state.IsNotReleased) {
@@ -795,34 +795,12 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
 	defer s.ptrLock.RUnlock()
 
 	cOffset := C.int64_t(0) // depre
-	cSize := C.int64_t(len(primaryKeys))
+	cSize := C.int64_t(primaryKeys.Len())
 	cTimestampsPtr := (*C.uint64_t)(&(timestamps)[0])
 
-	ids := &schemapb.IDs{}
-	pkType := primaryKeys[0].Type()
-	switch pkType {
-	case schemapb.DataType_Int64:
-		int64Pks := make([]int64, len(primaryKeys))
-		for index, pk := range primaryKeys {
-			int64Pks[index] = pk.(*storage.Int64PrimaryKey).Value
-		}
-		ids.IdField = &schemapb.IDs_IntId{
-			IntId: &schemapb.LongArray{
-				Data: int64Pks,
-			},
-		}
-	case schemapb.DataType_VarChar:
-		varCharPks := make([]string, len(primaryKeys))
-		for index, entity := range primaryKeys {
-			varCharPks[index] = entity.(*storage.VarCharPrimaryKey).Value
-		}
-		ids.IdField = &schemapb.IDs_StrId{
-			StrId: &schemapb.StringArray{
-				Data: varCharPks,
-			},
-		}
-	default:
-		return fmt.Errorf("invalid data type of primary keys")
+	ids, err := storage.ParsePrimaryKeysBatch2IDs(primaryKeys)
+	if err != nil {
+		return err
 	}
 
 	dataBlob, err := proto.Marshal(ids)
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/segment_interface.go
@@ -77,7 +77,7 @@ type Segment interface {
 
 	// Modification related
 	Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
-	Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
+	Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error
 	LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error
 	LastDeltaTimestamp() uint64
 	Release(ctx context.Context, opts ...releaseOption)
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/segment_l0.go
@@ -147,7 +147,7 @@ func (s *L0Segment) Insert(ctx context.Context, rowIDs []int64, timestamps []typ
 	return merr.WrapErrIoFailedReason("insert not supported for L0 segment")
 }
 
-func (s *L0Segment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
+func (s *L0Segment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error {
 	return merr.WrapErrIoFailedReason("delete not supported for L0 segment")
 }
 
10 changes: 5 additions & 5 deletions internal/querynodev2/segments/segment_test.go
@@ -166,23 +166,23 @@ func (suite *SegmentSuite) TestResourceUsageEstimate() {
 func (suite *SegmentSuite) TestDelete() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	pks, err := storage.GenInt64PrimaryKeys(0, 1)
-	suite.NoError(err)
+	pks := storage.NewInt64PrimaryKeys(2)
+	pks.AppendRaw(0, 1)
 
 	// Test for sealed
 	rowNum := suite.sealed.RowNum()
-	err = suite.sealed.Delete(ctx, pks, []uint64{1000, 1000})
+	err := suite.sealed.Delete(ctx, pks, []uint64{1000, 1000})
 	suite.NoError(err)
 
-	suite.Equal(rowNum-int64(len(pks)), suite.sealed.RowNum())
+	suite.Equal(rowNum-int64(pks.Len()), suite.sealed.RowNum())
 	suite.Equal(rowNum, suite.sealed.InsertCount())
 
 	// Test for growing
 	rowNum = suite.growing.RowNum()
 	err = suite.growing.Delete(ctx, pks, []uint64{1000, 1000})
 	suite.NoError(err)
 
-	suite.Equal(rowNum-int64(len(pks)), suite.growing.RowNum())
+	suite.Equal(rowNum-int64(pks.Len()), suite.growing.RowNum())
 	suite.Equal(rowNum, suite.growing.InsertCount())
 }
 
4 changes: 2 additions & 2 deletions internal/querynodev2/services.go
@@ -1372,7 +1372,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
 		return merr.Status(err), nil
 	}
 
-	pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
+	pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys())
 	for _, segment := range segments {
 		err := segment.Delete(ctx, pks, req.GetTimestamps())
 		if err != nil {
@@ -1427,7 +1427,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch
 		log.Warn("Delete batch find missing ids", zap.Int64s("missing_ids", missingIDs.Collect()))
 	}
 
-	pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
+	pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys())
 
 	// control the execution batch parallel with P number
 	// maybe it shall be lower in case of heavy CPU usage may impacting search/query
19 changes: 19 additions & 0 deletions internal/storage/primary_key.go
@@ -350,6 +350,25 @@ func ParseIDs2PrimaryKeys(ids *schemapb.IDs) []PrimaryKey {
 	return ret
 }
 
+func ParseIDs2PrimaryKeysBatch(ids *schemapb.IDs) PrimaryKeys {
+	var result PrimaryKeys
+	switch ids.IdField.(type) {
+	case *schemapb.IDs_IntId:
+		int64Pks := ids.GetIntId().GetData()
+		pks := NewInt64PrimaryKeys(int64(len(int64Pks)))
+		pks.AppendRaw(int64Pks...)
+		result = pks
+	case *schemapb.IDs_StrId:
+		stringPks := ids.GetStrId().GetData()
+		pks := NewVarcharPrimaryKeys(int64(len(stringPks)))
+		pks.AppendRaw(stringPks...)
+		result = pks
+	default:
+		panic(fmt.Sprintf("unexpected schema id field type %T", ids.IdField))
+	}
+	return result
+}
+
 func ParsePrimaryKeysBatch2IDs(pks PrimaryKeys) (*schemapb.IDs, error) {
 	ret := &schemapb.IDs{}
 	if pks.Len() == 0 {
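Together with the pre-existing `ParsePrimaryKeysBatch2IDs`, the new `ParseIDs2PrimaryKeysBatch` gives a round trip between wire-format `schemapb.IDs` and the batched container. A usage sketch as it would look from inside the repo (`internal/storage` is not importable externally, so treat this as illustration only):

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

func main() {
	// Wire-format IDs, as carried in a querypb.DeleteRequest.
	ids := &schemapb.IDs{
		IdField: &schemapb.IDs_IntId{
			IntId: &schemapb.LongArray{Data: []int64{1, 2, 3}},
		},
	}

	// One typed container instead of one boxed PrimaryKey per row.
	pks := storage.ParseIDs2PrimaryKeysBatch(ids)
	fmt.Println(pks.Len()) // 3

	// Round-trip back to wire format, e.g. for fan-out to a worker.
	back, err := storage.ParsePrimaryKeysBatch2IDs(pks)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.GetIntId().GetData()) // [1 2 3]
}
```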
