diff --git a/configs/milvus.yaml b/configs/milvus.yaml index cd9db3944bc12..808c1c5a3af7c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -456,6 +456,7 @@ queryNode: enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other) maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"] + streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"] dataSync: flowGraph: maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node. diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index e2a04e3792e74..3d6db44629c4f 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -203,85 +203,10 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { Data: cacheItems, }) - start := time.Now() - retMap := sd.applyBFInParallel(deleteData, segments.GetBFApplyPool()) - // segment => delete data - delRecords := make(map[int64]DeleteData) - retMap.Range(func(key int, value *BatchApplyRet) bool { - startIdx := value.StartIdx - pk2SegmentIDs := value.Segment2Hits - - pks := deleteData[value.DeleteDataIdx].PrimaryKeys - tss := deleteData[value.DeleteDataIdx].Timestamps - - for segmentID, hits := range pk2SegmentIDs { - for i, hit := range hits { - if hit { - delRecord := delRecords[segmentID] - delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[startIdx+i]) - delRecord.Timestamps = append(delRecord.Timestamps, tss[startIdx+i]) - delRecord.RowCount++ - delRecords[segmentID] = delRecord - } - } - } - return true - }) - bfCost := time.Since(start) - - offlineSegments := typeutil.NewConcurrentSet[int64]() - - sealed, growing, version := sd.distribution.PinOnlineSegments() - - start = time.Now() - eg, ctx := errgroup.WithContext(context.Background()) - for _, entry := range sealed { - entry := entry - eg.Go(func() error { - worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID) - if err != nil { - log.Warn("failed to get worker", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.Error(err), - ) - // skip if node down - // delete will be processed after loaded again - return nil - } - offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...) - return nil - }) - } - if len(growing) > 0 { - eg.Go(func() error { - worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID()) - if err != nil { - log.Error("failed to get worker(local)", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.Error(err), - ) - // panic here, local worker shall not have error - panic(err) - } - offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...) - return nil - }) - } - // not error return in apply delete - _ = eg.Wait() - forwardDeleteCost := time.Since(start) - - sd.distribution.Unpin(version) - offlineSegIDs := offlineSegments.Collect() - if len(offlineSegIDs) > 0 { - log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs)) - sd.markSegmentOffline(offlineSegIDs...) - } + sd.forwardStreamingDeletion(context.Background(), deleteData) metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel). Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.QueryNodeApplyBFCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(bfCost.Milliseconds())) - metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds())) } type BatchApplyRet = struct { @@ -327,7 +252,13 @@ func (sd *shardDelegator) applyBFInParallel(deleteDatas []*DeleteData, pool *con } // applyDelete handles delete record and apply them to corresponding workers. -func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 { +func (sd *shardDelegator) applyDelete(ctx context.Context, + nodeID int64, + worker cluster.Worker, + delRecords func(segmentID int64) (DeleteData, bool), + entries []SegmentEntry, + scope querypb.DataScope, +) []int64 { offlineSegments := typeutil.NewConcurrentSet[int64]() log := sd.getLogger(ctx) @@ -340,7 +271,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker var futures []*conc.Future[struct{}] for _, segmentEntry := range entries { segmentEntry := segmentEntry - delRecord, ok := delRecords[segmentEntry.SegmentID] + delRecord, ok := delRecords(segmentEntry.SegmentID) log := log.With( zap.Int64("segmentID", segmentEntry.SegmentID), zap.Int64("workerID", nodeID), diff --git a/internal/querynodev2/delegator/delegator_delta_forward.go b/internal/querynodev2/delegator/delegator_delta_forward.go deleted file mode 100644 index 00d4b6dce96c7..0000000000000 --- a/internal/querynodev2/delegator/delegator_delta_forward.go +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package delegator - -import ( - "context" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querynodev2/cluster" - "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" - "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -const ( - L0ForwardPolicyDefault = `` - L0ForwardPolicyBF = `FilterByBF` - L0ForwardPolicyRemoteLoad = `RemoteLoad` -) - -func (sd *shardDelegator) forwardL0Deletion(ctx context.Context, - info *querypb.SegmentLoadInfo, - req *querypb.LoadSegmentsRequest, - candidate *pkoracle.BloomFilterSet, - targetNodeID int64, - worker cluster.Worker, -) error { - switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy { - case L0ForwardPolicyDefault, L0ForwardPolicyBF: - return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker) - case L0ForwardPolicyRemoteLoad: - return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker) - default: - return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy) - } -} - -func (sd *shardDelegator) forwardL0ByBF(ctx context.Context, - info *querypb.SegmentLoadInfo, - candidate *pkoracle.BloomFilterSet, - targetNodeID int64, - worker cluster.Worker, -) error { - // after L0 segment feature - // growing segemnts should have load stream delete as well - deleteScope := querypb.DataScope_All - switch candidate.Type() { - case commonpb.SegmentState_Sealed: - deleteScope = querypb.DataScope_Historical - case commonpb.SegmentState_Growing: - deleteScope = querypb.DataScope_Streaming - } - - deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate) - deleteData := &storage.DeleteData{} - deleteData.AppendBatch(deletedPks, deletedTss) - if deleteData.RowCount > 0 { - log.Info("forward L0 delete to worker...", - zap.Int64("deleteRowNum", deleteData.RowCount), - ) - err := worker.Delete(ctx, &querypb.DeleteRequest{ - Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)), - CollectionId: info.GetCollectionID(), - PartitionId: info.GetPartitionID(), - SegmentId: info.GetSegmentID(), - PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks), - Timestamps: deleteData.Tss, - Scope: deleteScope, - }) - if err != nil { - log.Warn("failed to apply delete when LoadSegment", zap.Error(err)) - return err - } - } - return nil -} - -func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context, - info *querypb.SegmentLoadInfo, - req *querypb.LoadSegmentsRequest, - targetNodeID int64, - worker cluster.Worker, -) error { - info = typeutil.Clone(info) - // load l0 segment deltalogs - info.Deltalogs = sd.getLevel0Deltalogs(info.GetPartitionID()) - - return worker.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ - Base: &commonpb.MsgBase{ - TargetID: targetNodeID, - }, - DstNodeID: targetNodeID, - Infos: []*querypb.SegmentLoadInfo{ - info, - }, - CollectionID: info.GetCollectionID(), - LoadScope: querypb.LoadScope_Delta, - Schema: req.GetSchema(), - IndexInfoList: req.GetIndexInfoList(), - }) -} - -func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog { - sd.level0Mut.Lock() - defer sd.level0Mut.Unlock() - - level0Segments := sd.segmentManager.GetBy( - segments.WithLevel(datapb.SegmentLevel_L0), - segments.WithChannel(sd.vchannelName)) - - var deltalogs []*datapb.FieldBinlog - - for _, segment := range level0Segments { - if segment.Partition() != common.AllPartitionsID && segment.Partition() != partitionID { - continue - } - segment := segment.(*segments.L0Segment) - deltalogs = append(deltalogs, segment.LoadInfo().GetDeltalogs()...) - } - - return deltalogs -} diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go new file mode 100644 index 0000000000000..06ede0c82fa36 --- /dev/null +++ b/internal/querynodev2/delegator/delta_forward.go @@ -0,0 +1,334 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delegator + +import ( + "context" + "fmt" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querynodev2/cluster" + "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" + "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +const ( + ForwardPolicyDefault = `` + L0ForwardPolicyBF = `FilterByBF` + L0ForwardPolicyRemoteLoad = `RemoteLoad` + StreamingForwardPolicyBF = `FilterByBF` + StreamingForwardPolicyDirect = `Direct` +) + +func (sd *shardDelegator) forwardL0Deletion(ctx context.Context, + info *querypb.SegmentLoadInfo, + req *querypb.LoadSegmentsRequest, + candidate *pkoracle.BloomFilterSet, + targetNodeID int64, + worker cluster.Worker, +) error { + switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy { + case ForwardPolicyDefault, L0ForwardPolicyBF: + return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker) + case L0ForwardPolicyRemoteLoad: + return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker) + default: + return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy) + } +} + +func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteData []*DeleteData) { + // TODO add `auto` policy + // using direct when streaming size is too large + // need some experimental data to support this policy + switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy { + case ForwardPolicyDefault, StreamingForwardPolicyBF: + sd.forwardStreamingByBF(ctx, deleteData) + case StreamingForwardPolicyDirect: + // forward streaming deletion without bf filtering + sd.forwardStreamingDirect(ctx, deleteData) + default: + log.Fatal("unsupported streaming forward policy", zap.String("policy", policy)) + } +} + +func (sd *shardDelegator) forwardL0ByBF(ctx context.Context, + info *querypb.SegmentLoadInfo, + candidate *pkoracle.BloomFilterSet, + targetNodeID int64, + worker cluster.Worker, +) error { + // after L0 segment feature + // growing segemnts should have load stream delete as well + deleteScope := querypb.DataScope_All + switch candidate.Type() { + case commonpb.SegmentState_Sealed: + deleteScope = querypb.DataScope_Historical + case commonpb.SegmentState_Growing: + deleteScope = querypb.DataScope_Streaming + } + + deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate) + deleteData := &storage.DeleteData{} + deleteData.AppendBatch(deletedPks, deletedTss) + if deleteData.RowCount > 0 { + log.Info("forward L0 delete to worker...", + zap.Int64("deleteRowNum", deleteData.RowCount), + ) + err := worker.Delete(ctx, &querypb.DeleteRequest{ + Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)), + CollectionId: info.GetCollectionID(), + PartitionId: info.GetPartitionID(), + SegmentId: info.GetSegmentID(), + PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks), + Timestamps: deleteData.Tss, + Scope: deleteScope, + }) + if err != nil { + log.Warn("failed to apply delete when LoadSegment", zap.Error(err)) + return err + } + } + return nil +} + +func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context, + info *querypb.SegmentLoadInfo, + req *querypb.LoadSegmentsRequest, + targetNodeID int64, + worker cluster.Worker, +) error { + info = typeutil.Clone(info) + // load l0 segment deltalogs + info.Deltalogs = sd.getLevel0Deltalogs(info.GetPartitionID()) + + return worker.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: targetNodeID, + }, + DstNodeID: targetNodeID, + Infos: []*querypb.SegmentLoadInfo{ + info, + }, + CollectionID: info.GetCollectionID(), + LoadScope: querypb.LoadScope_Delta, + Schema: req.GetSchema(), + IndexInfoList: req.GetIndexInfoList(), + }) +} + +func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog { + sd.level0Mut.Lock() + defer sd.level0Mut.Unlock() + + level0Segments := sd.segmentManager.GetBy( + segments.WithLevel(datapb.SegmentLevel_L0), + segments.WithChannel(sd.vchannelName)) + + var deltalogs []*datapb.FieldBinlog + + for _, segment := range level0Segments { + if segment.Partition() != common.AllPartitionsID && segment.Partition() != partitionID { + continue + } + segment := segment.(*segments.L0Segment) + deltalogs = append(deltalogs, segment.LoadInfo().GetDeltalogs()...) + } + + return deltalogs +} + +func (sd *shardDelegator) forwardStreamingByBF(ctx context.Context, deleteData []*DeleteData) { + start := time.Now() + retMap := sd.applyBFInParallel(deleteData, segments.GetBFApplyPool()) + // segment => delete data + delRecords := make(map[int64]DeleteData) + retMap.Range(func(key int, value *BatchApplyRet) bool { + startIdx := value.StartIdx + pk2SegmentIDs := value.Segment2Hits + + pks := deleteData[value.DeleteDataIdx].PrimaryKeys + tss := deleteData[value.DeleteDataIdx].Timestamps + + for segmentID, hits := range pk2SegmentIDs { + for i, hit := range hits { + if hit { + delRecord := delRecords[segmentID] + delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[startIdx+i]) + delRecord.Timestamps = append(delRecord.Timestamps, tss[startIdx+i]) + delRecord.RowCount++ + delRecords[segmentID] = delRecord + } + } + } + return true + }) + bfCost := time.Since(start) + + offlineSegments := typeutil.NewConcurrentSet[int64]() + + sealed, growing, version := sd.distribution.PinOnlineSegments() + + start = time.Now() + eg, ctx := errgroup.WithContext(context.Background()) + for _, entry := range sealed { + entry := entry + eg.Go(func() error { + worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID) + if err != nil { + log.Warn("failed to get worker", + zap.Int64("nodeID", paramtable.GetNodeID()), + zap.Error(err), + ) + // skip if node down + // delete will be processed after loaded again + return nil + } + offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) { + data, ok := delRecords[segmentID] + return data, ok + }, entry.Segments, querypb.DataScope_Historical)...) + return nil + }) + } + if len(growing) > 0 { + eg.Go(func() error { + worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID()) + if err != nil { + log.Error("failed to get worker(local)", + zap.Int64("nodeID", paramtable.GetNodeID()), + zap.Error(err), + ) + // panic here, local worker shall not have error + panic(err) + } + offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) { + data, ok := delRecords[segmentID] + return data, ok + }, growing, querypb.DataScope_Streaming)...) + return nil + }) + } + // not error return in apply delete + _ = eg.Wait() + forwardDeleteCost := time.Since(start) + + sd.distribution.Unpin(version) + offlineSegIDs := offlineSegments.Collect() + if len(offlineSegIDs) > 0 { + log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs)) + sd.markSegmentOffline(offlineSegIDs...) + } + + metrics.QueryNodeApplyBFCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(bfCost.Milliseconds())) + metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds())) +} + +func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData []*DeleteData) { + start := time.Now() + + // group by partition id + groups := lo.GroupBy(deleteData, func(delData *DeleteData) int64 { + return delData.PartitionID + }) + + offlineSegments := typeutil.NewConcurrentSet[int64]() + eg, ctx := errgroup.WithContext(ctx) + + for partitionID, group := range groups { + partitionID := partitionID + group := group + eg.Go(func() error { + partitions := []int64{partitionID} + // check if all partitions + if partitionID == common.AllPartitionsID { + partitions = []int64{} + } + sealed, growing, version := sd.distribution.PinOnlineSegments(partitions...) + defer sd.distribution.Unpin(version) + + for _, item := range group { + deleteData := *item + for _, entry := range sealed { + entry := entry + worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID) + if err != nil { + log.Warn("failed to get worker", + zap.Int64("nodeID", paramtable.GetNodeID()), + zap.Error(err), + ) + // skip if node down + // delete will be processed after loaded again + continue + } + + eg.Go(func() error { + offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) { + return deleteData, true + }, entry.Segments, querypb.DataScope_Historical)...) + return nil + }) + } + + if len(growing) > 0 { + worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID()) + if err != nil { + log.Error("failed to get worker(local)", + zap.Int64("nodeID", paramtable.GetNodeID()), + zap.Error(err), + ) + // panic here, local worker shall not have error + panic(err) + } + eg.Go(func() error { + offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) { + return deleteData, true + }, growing, querypb.DataScope_Streaming)...) + return nil + }) + } + } + return nil + }) + } + // not error return in apply delete + _ = eg.Wait() + forwardDeleteCost := time.Since(start) + + offlineSegIDs := offlineSegments.Collect() + if len(offlineSegIDs) > 0 { + log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs)) + sd.markSegmentOffline(offlineSegIDs...) + } + + metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds())) +} diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go new file mode 100644 index 0000000000000..9a53fbb7c250c --- /dev/null +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -0,0 +1,267 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delegator + +import ( + "context" + "testing" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/querynodev2/cluster" + "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" + "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/querynodev2/tsafe" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type StreamingForwardSuite struct { + suite.Suite + + collectionID int64 + partitionIDs []int64 + replicaID int64 + vchannelName string + version int64 + workerManager *cluster.MockManager + manager *segments.Manager + tsafeManager tsafe.Manager + loader *segments.MockLoader + mq *msgstream.MockMsgStream + + delegator *shardDelegator + chunkManager storage.ChunkManager + rootPath string +} + +func (s *StreamingForwardSuite) SetupSuite() { + paramtable.Init() + paramtable.SetNodeID(1) +} + +func (s *StreamingForwardSuite) SetupTest() { + s.collectionID = 1000 + s.partitionIDs = []int64{500, 501} + s.replicaID = 65535 + s.vchannelName = "rootcoord-dml_1000_v0" + s.version = 2000 + s.workerManager = &cluster.MockManager{} + s.manager = segments.NewManager() + s.tsafeManager = tsafe.NewTSafeReplica() + s.loader = &segments.MockLoader{} + s.loader.EXPECT(). + Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything). + Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment { + return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment { + ms := &segments.MockSegment{} + ms.EXPECT().ID().Return(info.GetSegmentID()) + ms.EXPECT().Type().Return(segments.SegmentTypeGrowing) + ms.EXPECT().Partition().Return(info.GetPartitionID()) + ms.EXPECT().Collection().Return(info.GetCollectionID()) + ms.EXPECT().Indexes().Return(nil) + ms.EXPECT().RowNum().Return(info.GetNumOfRows()) + ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) + return ms + }) + }, nil) + + // init schema + s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{ + Name: "TestCollection", + Fields: []*schemapb.FieldSchema{ + { + Name: "id", + FieldID: 100, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + AutoID: true, + }, + { + Name: "vector", + FieldID: 101, + IsPrimaryKey: false, + DataType: schemapb.DataType_BinaryVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, &segcorepb.CollectionIndexMeta{ + MaxIndexRowCount: 100, + IndexMetas: []*segcorepb.FieldIndexMeta{ + { + FieldID: 101, + CollectionID: s.collectionID, + IndexName: "binary_index", + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "BIN_IVF_FLAT", + }, + { + Key: common.MetricTypeKey, + Value: metric.JACCARD, + }, + }, + }, + }, + }, &querypb.LoadMetaInfo{ + PartitionIDs: s.partitionIDs, + }) + + s.mq = &msgstream.MockMsgStream{} + s.rootPath = "delegator_test" + + // init chunkManager + chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath) + s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) + + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { + return s.mq, nil + }, + }, 10000, nil, s.chunkManager) + s.Require().NoError(err) + + sd, ok := delegator.(*shardDelegator) + s.Require().True(ok) + s.delegator = sd +} + +func (s *StreamingForwardSuite) TestBFStreamingForward() { + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key, StreamingForwardPolicyBF) + defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key) + + delegator := s.delegator + + // Setup distribution + delegator.distribution.AddGrowing(SegmentEntry{ + NodeID: 1, + SegmentID: 100, + }) + delegator.distribution.AddDistributions(SegmentEntry{ + NodeID: 1, + SegmentID: 101, + }) + delegator.distribution.AddDistributions(SegmentEntry{ + NodeID: 1, + SegmentID: 102, + }) + delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil) + + // Setup pk oracle + // empty bfs will not match + delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(100, 10, commonpb.SegmentState_Growing), 1) + delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(102, 10, commonpb.SegmentState_Sealed), 1) + // candidate key alway match + delegator.pkOracle.Register(pkoracle.NewCandidateKey(101, 10, commonpb.SegmentState_Sealed), 1) + + deletedSegment := typeutil.NewConcurrentSet[int64]() + mockWorker := cluster.NewMockWorker(s.T()) + s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil) + mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error { + s.T().Log(dr.GetSegmentId()) + deletedSegment.Insert(dr.SegmentId) + s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData()) + s.ElementsMatch([]uint64{10}, dr.GetTimestamps()) + return nil + }).Maybe() + + delegator.ProcessDelete([]*DeleteData{ + { + PartitionID: -1, + PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)}, + Timestamps: []uint64{10}, + RowCount: 1, + }, + }, 100) + s.ElementsMatch([]int64{101}, deletedSegment.Collect()) +} + +func (s *StreamingForwardSuite) TestDirectStreamingForward() { + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key, StreamingForwardPolicyDirect) + defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key) + + delegator := s.delegator + + // Setup distribution + delegator.distribution.AddGrowing(SegmentEntry{ + NodeID: 1, + SegmentID: 100, + }) + delegator.distribution.AddDistributions(SegmentEntry{ + NodeID: 1, + SegmentID: 101, + }) + delegator.distribution.AddDistributions(SegmentEntry{ + NodeID: 1, + SegmentID: 102, + }) + delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil) + + // Setup pk oracle + // empty bfs will not match + delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(100, 10, commonpb.SegmentState_Growing), 1) + delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(102, 10, commonpb.SegmentState_Sealed), 1) + // candidate key alway match + delegator.pkOracle.Register(pkoracle.NewCandidateKey(101, 10, commonpb.SegmentState_Sealed), 1) + + deletedSegment := typeutil.NewConcurrentSet[int64]() + mockWorker := cluster.NewMockWorker(s.T()) + s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil) + mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error { + s.T().Log(dr.GetSegmentId()) + deletedSegment.Insert(dr.SegmentId) + s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData()) + s.ElementsMatch([]uint64{10}, dr.GetTimestamps()) + return nil + }) + + delegator.ProcessDelete([]*DeleteData{ + { + PartitionID: -1, + PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)}, + Timestamps: []uint64{10}, + RowCount: 1, + }, + }, 100) + s.ElementsMatch([]int64{100, 101, 102}, deletedSegment.Collect()) +} + +func TestStreamingForward(t *testing.T) { + suite.Run(t, new(StreamingForwardSuite)) +} diff --git a/internal/querynodev2/delegator/ScalarPruner.go b/internal/querynodev2/delegator/scalar_pruner.go similarity index 100% rename from internal/querynodev2/delegator/ScalarPruner.go rename to internal/querynodev2/delegator/scalar_pruner.go diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 80695e460c9fb..0ab67fd9f5671 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2394,8 +2394,9 @@ type queryNodeConfig struct { MaxSegmentDeleteBuffer ParamItem `refreshable:"false"` DeleteBufferBlockSize ParamItem `refreshable:"false"` - // level zero - LevelZeroForwardPolicy ParamItem `refreshable:"true"` + // delta forward + LevelZeroForwardPolicy ParamItem `refreshable:"true"` + StreamingDeltaForwardPolicy ParamItem `refreshable:"true"` // loader IoPoolSize ParamItem `refreshable:"false"` @@ -3014,6 +3015,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to } p.LevelZeroForwardPolicy.Init(base.mgr) + p.StreamingDeltaForwardPolicy = ParamItem{ + Key: "queryNode.streamingDeltaForwardPolicy", + Version: "2.4.12", + Doc: "delegator streaming deletion forward policy, possible option[\"FilterByBF\", \"Direct\"]", + DefaultValue: "FilterByBF", + Export: true, + } + p.StreamingDeltaForwardPolicy.Init(base.mgr) + p.IoPoolSize = ParamItem{ Key: "queryNode.ioPoolSize", Version: "2.3.0",