diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 920b02e80dc01..37f9fc050d952 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -452,6 +452,7 @@ queryNode: taskQueueExpire: 60 # Control how long (many seconds) that queue retains since queue is empty enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other) maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler + levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"] dataSync: flowGraph: maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node. diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 242791de0dbcb..d10dfe14d582c 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -541,7 +541,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg } log.Debug("load delete...") - err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries) + err = sd.loadStreamDelete(ctx, candidates, infos, req, targetNodeID, worker) if err != nil { log.Warn("load stream delete failed", zap.Error(err)) return err @@ -615,16 +615,16 @@ func (sd *shardDelegator) RefreshLevel0DeletionStats() { func (sd *shardDelegator) loadStreamDelete(ctx context.Context, candidates []*pkoracle.BloomFilterSet, infos []*querypb.SegmentLoadInfo, - deltaPositions []*msgpb.MsgPosition, + req *querypb.LoadSegmentsRequest, targetNodeID int64, worker cluster.Worker, - entries []SegmentEntry, ) error { log := sd.getLogger(ctx) idCandidates := lo.SliceToMap(candidates, func(candidate *pkoracle.BloomFilterSet) (int64, *pkoracle.BloomFilterSet) { return candidate.ID(), candidate }) + deltaPositions := req.GetDeltaPositions() sd.deleteMut.RLock() defer sd.deleteMut.RUnlock() @@ -655,29 +655,13 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, deleteScope = querypb.DataScope_Streaming } - deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate) - deleteData := &storage.DeleteData{} - deleteData.AppendBatch(deletedPks, deletedTss) - if deleteData.RowCount > 0 { - log.Info("forward L0 delete to worker...", - zap.Int64("deleteRowNum", deleteData.RowCount), - ) - err := worker.Delete(ctx, &querypb.DeleteRequest{ - Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)), - CollectionId: info.GetCollectionID(), - PartitionId: info.GetPartitionID(), - SegmentId: info.GetSegmentID(), - PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks), - Timestamps: deleteData.Tss, - Scope: deleteScope, - }) - if err != nil { - log.Warn("failed to apply delete when LoadSegment", zap.Error(err)) - return err - } + // forward l0 deletion + err := sd.forwardL0Deletion(ctx, info, req, candidate, targetNodeID, worker) + if err != nil { + return err } - deleteData = &storage.DeleteData{} + deleteData := &storage.DeleteData{} // start position is dml position for segment // if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream if position.GetTimestamp() < sd.deleteBuffer.SafeTs() { diff --git a/internal/querynodev2/delegator/delegator_delta_forward.go b/internal/querynodev2/delegator/delegator_delta_forward.go new file mode 100644 index 0000000000000..00d4b6dce96c7 --- /dev/null +++ b/internal/querynodev2/delegator/delegator_delta_forward.go @@ -0,0 +1,146 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delegator + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querynodev2/cluster" + "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" + "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +const ( + L0ForwardPolicyDefault = `` + L0ForwardPolicyBF = `FilterByBF` + L0ForwardPolicyRemoteLoad = `RemoteLoad` +) + +func (sd *shardDelegator) forwardL0Deletion(ctx context.Context, + info *querypb.SegmentLoadInfo, + req *querypb.LoadSegmentsRequest, + candidate *pkoracle.BloomFilterSet, + targetNodeID int64, + worker cluster.Worker, +) error { + switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy { + case L0ForwardPolicyDefault, L0ForwardPolicyBF: + return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker) + case L0ForwardPolicyRemoteLoad: + return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker) + default: + return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy) + } +} + +func (sd *shardDelegator) forwardL0ByBF(ctx context.Context, + info *querypb.SegmentLoadInfo, + candidate *pkoracle.BloomFilterSet, + targetNodeID int64, + worker cluster.Worker, +) error { + // after L0 segment feature + // growing segemnts should have load stream delete as well + deleteScope := querypb.DataScope_All + switch candidate.Type() { + case commonpb.SegmentState_Sealed: + deleteScope = querypb.DataScope_Historical + case commonpb.SegmentState_Growing: + deleteScope = querypb.DataScope_Streaming + } + + deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate) + deleteData := &storage.DeleteData{} + deleteData.AppendBatch(deletedPks, deletedTss) + if deleteData.RowCount > 0 { + log.Info("forward L0 delete to worker...", + zap.Int64("deleteRowNum", deleteData.RowCount), + ) + err := worker.Delete(ctx, &querypb.DeleteRequest{ + Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)), + CollectionId: info.GetCollectionID(), + PartitionId: info.GetPartitionID(), + SegmentId: info.GetSegmentID(), + PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks), + Timestamps: deleteData.Tss, + Scope: deleteScope, + }) + if err != nil { + log.Warn("failed to apply delete when LoadSegment", zap.Error(err)) + return err + } + } + return nil +} + +func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context, + info *querypb.SegmentLoadInfo, + req *querypb.LoadSegmentsRequest, + targetNodeID int64, + worker cluster.Worker, +) error { + info = typeutil.Clone(info) + // load l0 segment deltalogs + info.Deltalogs = sd.getLevel0Deltalogs(info.GetPartitionID()) + + return worker.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: targetNodeID, + }, + DstNodeID: targetNodeID, + Infos: []*querypb.SegmentLoadInfo{ + info, + }, + CollectionID: info.GetCollectionID(), + LoadScope: querypb.LoadScope_Delta, + Schema: req.GetSchema(), + IndexInfoList: req.GetIndexInfoList(), + }) +} + +func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog { + sd.level0Mut.Lock() + defer sd.level0Mut.Unlock() + + level0Segments := sd.segmentManager.GetBy( + segments.WithLevel(datapb.SegmentLevel_L0), + segments.WithChannel(sd.vchannelName)) + + var deltalogs []*datapb.FieldBinlog + + for _, segment := range level0Segments { + if segment.Partition() != common.AllPartitionsID && segment.Partition() != partitionID { + continue + } + segment := segment.(*segments.L0Segment) + deltalogs = append(deltalogs, segment.LoadInfo().GetDeltalogs()...) + } + + return deltalogs +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 1f6fc5cb49a41..f21560ebfa08e 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2363,6 +2363,9 @@ type queryNodeConfig struct { MaxSegmentDeleteBuffer ParamItem `refreshable:"false"` DeleteBufferBlockSize ParamItem `refreshable:"false"` + // level zero + LevelZeroForwardPolicy ParamItem `refreshable:"true"` + // loader IoPoolSize ParamItem `refreshable:"false"` DeltaDataExpansionRate ParamItem `refreshable:"true"` @@ -2952,6 +2955,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to } p.DeleteBufferBlockSize.Init(base.mgr) + p.LevelZeroForwardPolicy = ParamItem{ + Key: "queryNode.levelZeroForwardPolicy", + Version: "2.4.12", + Doc: "delegator level zero deletion forward policy, possible option[\"FilterByBF\", \"RemoteLoad\"]", + DefaultValue: "FilterByBF", + Export: true, + } + p.LevelZeroForwardPolicy.Init(base.mgr) + p.IoPoolSize = ParamItem{ Key: "queryNode.ioPoolSize", Version: "2.3.0",