From 9e8cafcbe29e154bdd55ed2d29ba9c9f28dfc469 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 26 Sep 2024 10:11:15 +0800 Subject: [PATCH] enhance: Skip loading bf in datanode (#36367) Skip loading bf in datanode: 1. When watching vchannels, skip loading bloom filters for segments. 2. Bypass bloom filter checks for delete messages, directly writing to L0 segments. 3. Remove flushed segments proactively after flush. issue: https://github.com/milvus-io/milvus/issues/34585 --------- Signed-off-by: bigsheeper --- internal/datacoord/server.go | 2 +- internal/datanode/data_node.go | 4 + .../flushcommon/pipeline/data_sync_service.go | 79 ++++++++----------- .../writebuffer/l0_write_buffer.go | 2 +- .../flushcommon/writebuffer/write_buffer.go | 8 +- internal/streamingnode/server/server.go | 4 + pkg/util/paramtable/component_param.go | 3 +- 7 files changed, 48 insertions(+), 54 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 46c72948e002d..0c10169de9e8d 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -732,7 +732,7 @@ func (s *Server) startServerLoop() { go s.importChecker.Start() s.garbageCollector.start() - if !streamingutil.IsStreamingServiceEnabled() { + if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) { s.syncSegmentsScheduler.Start() } } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b374c113aefd8..4618caa90d30a 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -288,6 +288,10 @@ func (node *DataNode) tryToReleaseFlowgraph(channel string) { // Start will update DataNode state to HEALTHY func (node *DataNode) Start() error { + if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() { + panic("In non-L0 mode, skip loading of bloom filter stats is not allowed.") + } + var startErr error 
node.startOnce.Do(func() { if err := node.allocator.Start(); err != nil { diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index 2a38ac2745cb3..649345e0f92ed 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -179,46 +179,9 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i futures = append(futures, future) } } - lazyLoadSegmentStats := func(segType string, segments []*datapb.SegmentInfo) { - for _, item := range segments { - log.Info("lazy load pk stats for segment", - zap.String("vChannelName", item.GetInsertChannel()), - zap.Int64("segmentID", item.GetID()), - zap.Int64("numRows", item.GetNumOfRows()), - zap.String("segmentType", segType), - ) - segment := item - - lazy := pkoracle.NewLazyPkstats() - // ignore lazy load future - _ = io.GetOrCreateStatsPool().Submit(func() (any, error) { - var stats []*storage.PkStatistics - var err error - stats, err = compaction.LoadStats(context.Background(), chunkManager, info.GetSchema(), segment.GetID(), segment.GetStatslogs()) - if err != nil { - return nil, err - } - pkStats := pkoracle.NewBloomFilterSet(stats...) 
- lazy.SetPkStats(pkStats) - return struct{}{}, nil - }) - segmentPks.Insert(segment.GetID(), lazy) - if tickler != nil { - tickler.Inc() - } - } - } - - // growing segment cannot use lazy mode loadSegmentStats("growing", unflushed) - lazy := paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() - // check paramtable to decide whether skip load BF stage when initializing - if lazy { - lazyLoadSegmentStats("sealed", flushed) - } else { - loadSegmentStats("sealed", flushed) - } + loadSegmentStats("sealed", flushed) // use fetched segment info info.Vchan.FlushedSegments = flushed @@ -362,19 +325,39 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, // NewDataSyncService stops and returns the initCtx.Err() func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler) (*DataSyncService, error) { // recover segment checkpoints - unflushedSegmentInfos, err := pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds()) - if err != nil { - return nil, err + var ( + err error + metaCache metacache.MetaCache + unflushedSegmentInfos []*datapb.SegmentInfo + flushedSegmentInfos []*datapb.SegmentInfo + ) + if len(info.GetVchan().GetUnflushedSegmentIds()) > 0 { + unflushedSegmentInfos, err = pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds()) + if err != nil { + return nil, err + } } - flushedSegmentInfos, err := pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds()) - if err != nil { - return nil, err + if len(info.GetVchan().GetFlushedSegmentIds()) > 0 { + flushedSegmentInfos, err = pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds()) + if err != nil { + return nil, err + } } - // init metaCache meta - metaCache, err := getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos) - if err != nil 
{ - return nil, err + if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() { + // In SkipBFStatsLoad mode, flushed segments no longer maintain a bloom filter. + // So, here we skip loading the bloom filter for flushed segments. + info.Vchan.FlushedSegments = flushedSegmentInfos + info.Vchan.UnflushedSegments = unflushedSegmentInfos + metaCache = metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat { + return pkoracle.NewBloomFilterSet() + }, metacache.NoneBm25StatsFactory) + } else { + // init metaCache meta + metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos) + if err != nil { + return nil, err + } } return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil) diff --git a/internal/flushcommon/writebuffer/l0_write_buffer.go b/internal/flushcommon/writebuffer/l0_write_buffer.go index 3f9b458b5ffa9..07237f43f7705 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer.go @@ -163,7 +163,7 @@ func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgs } } - if streamingutil.IsStreamingServiceEnabled() { + if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() || streamingutil.IsStreamingServiceEnabled() { // In streaming service mode, flushed segments no longer maintain a bloom filter. // So, here we skip filtering delete entries by bf. 
wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos) diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 020425e09debb..82afb38932d03 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -350,9 +350,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp()) } - if streamingutil.IsStreamingServiceEnabled() && syncTask.IsFlush() { - wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID())) - log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName())) + if syncTask.IsFlush() { + if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() || streamingutil.IsStreamingServiceEnabled() { + wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID())) + log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName())) + } } return nil })) diff --git a/internal/streamingnode/server/server.go b/internal/streamingnode/server/server.go index 52413071dab22..c72726b9a2830 100644 --- a/internal/streamingnode/server/server.go +++ b/internal/streamingnode/server/server.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar" _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // Server is the streamingnode server. @@ -43,6 +44,9 @@ func (s *Server) Init(ctx context.Context) (err error) { // Start starts the streamingnode server. 
func (s *Server) Start() { + if !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() { + panic("In streaming service mode, disable L0 is not allowed.") + } resource.Resource().Flusher().Start() log.Info("flusher started") } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 0ab67fd9f5671..4a5da5394e369 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4389,7 +4389,8 @@ Setting this parameter too small causes the system to store a small amount of da Key: "dataNode.skip.BFStats.Load", Version: "2.2.5", PanicIfEmpty: false, - DefaultValue: "false", + DefaultValue: "true", + Forbidden: true, // SkipBFStatsLoad is a static config that does not allow dynamic refresh. } p.SkipBFStatsLoad.Init(base.mgr)