diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index 649345e0f92ed..d46ef69f97e57 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -132,6 +132,10 @@ func (dsService *DataSyncService) GetMetaCache() metacache.MetaCache { return dsService.metacache } +func getMetaCacheForStreaming(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) { + return initMetaCache(initCtx, params.ChunkManager, info, nil, unflushed, flushed) +} + func getMetaCacheWithTickler(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) { tickler.SetTotal(int32(len(unflushed) + len(flushed))) return initMetaCache(initCtx, params.ChunkManager, info, tickler, unflushed, flushed) @@ -161,7 +165,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i return nil, err } segmentPks.Insert(segment.GetID(), pkoracle.NewBloomFilterSet(stats...)) - if !streamingutil.IsStreamingServiceEnabled() { + if tickler != nil { tickler.Inc() } @@ -180,8 +184,11 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i } } + // growing segments' stats should always be loaded, for generating the merged pk bloom filter. 
loadSegmentStats("growing", unflushed) - loadSegmentStats("sealed", flushed) + if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) { + loadSegmentStats("sealed", flushed) + } // use fetched segment info info.Vchan.FlushedSegments = flushed @@ -344,22 +351,10 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa } } - if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() { - // In SkipBFStatsLoad mode, flushed segments no longer maintain a bloom filter. - // So, here we skip loading the bloom filter for flushed segments. - info.Vchan.FlushedSegments = flushedSegmentInfos - info.Vchan.UnflushedSegments = unflushedSegmentInfos - metaCache = metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat { - return pkoracle.NewBloomFilterSet() - }, metacache.NoneBm25StatsFactory) - } else { - // init metaCache meta - metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos) - if err != nil { - return nil, err - } + // init metaCache meta + if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil { + return nil, err } - return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil) } @@ -367,6 +362,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut // recover segment checkpoints var ( err error + metaCache metacache.MetaCache unflushedSegmentInfos []*datapb.SegmentInfo flushedSegmentInfos []*datapb.SegmentInfo ) @@ -383,13 +379,10 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut } } - // In streaming service mode, flushed segments no longer maintain a bloom filter. - // So, here we skip loading the bloom filter for flushed segments. 
- info.Vchan.UnflushedSegments = unflushedSegmentInfos - metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat { - return pkoracle.NewBloomFilterSet() - }, metacache.NoneBm25StatsFactory) - + // init metaCache meta + if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil { + return nil, err + } return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input) }