enhance: Skip loading bf in datanode (#36367)
Skip loading bf in datanode:
1. When watching vchannels, skip loading bloom filters for segments.
2. Bypass bloom filter checks for delete messages, writing them directly to L0 segments (see the sketch below).
3. Proactively remove flushed segments from the datanode meta cache after flush.

issue: #34585

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Sep 26, 2024
1 parent a8c80ab commit 9e8cafc
Showing 7 changed files with 48 additions and 54 deletions.
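At a high level, all three changes hang off the same switch. A minimal sketch of where each behavior hooks in (hypothetical function names, standing in for the real call sites in the per-file diffs below):

// 1. Channel watch: build meta cache entries without downloading statslogs.
func onWatchVChannel(skipBF bool) {
	if skipBF {
		// segments get an empty bloom filter set (data_sync_service.go)
	}
}

// 2. Delete path: write delete records straight to the channel's L0 segment.
func onDeleteMsg(skipBF bool) {
	if skipBF {
		// no per-segment bloom filter probing (l0_write_buffer.go)
	}
}

// 3. Flush: drop the flushed segment from the datanode meta cache.
func onFlushDone(skipBF bool) {
	if skipBF {
		// RemoveSegments on the meta cache (write_buffer.go)
	}
}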
2 changes: 1 addition & 1 deletion internal/datacoord/server.go
@@ -732,7 +732,7 @@ func (s *Server) startServerLoop() {
 	go s.importChecker.Start()
 	s.garbageCollector.start()
 
-	if !streamingutil.IsStreamingServiceEnabled() {
+	if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
 		s.syncSegmentsScheduler.Start()
 	}
 }
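Both streaming mode and SkipBFStatsLoad leave datanodes without per-segment bloom filter state for DataCoord to reconcile, so the sync scheduler never starts. A sketch of the combined gate as a hypothetical helper (the commit keeps the condition inline):

func shouldStartSyncSegments() bool {
	// De Morgan's form of !(streaming || skipBF): start the scheduler only
	// when neither mode is active.
	return !streamingutil.IsStreamingServiceEnabled() &&
		!paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()
}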
4 changes: 4 additions & 0 deletions internal/datanode/data_node.go
@@ -288,6 +288,10 @@ func (node *DataNode) tryToReleaseFlowgraph(channel string) {
 
 // Start will update DataNode state to HEALTHY
 func (node *DataNode) Start() error {
+	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
+		panic("In non-L0 mode, skip loading of bloom filter stats is not allowed.")
+	}
+
 	var startErr error
 	node.startOnce.Do(func() {
 		if err := node.allocator.Start(); err != nil {
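The new panic encodes an invariant, not a preference: with bloom filter stats never loaded, deletes cannot be attributed to individual sealed segments, so the L0 delta path must be available to absorb them. A self-contained sketch of the same check as a testable helper (hypothetical name; the commit inlines it in Start()):

package sketch

import "errors"

// validateSkipBFConfig fails fast at startup instead of letting delete
// messages lose their only remaining destination at runtime.
func validateSkipBFConfig(skipBFLoad, l0Enabled bool) error {
	if skipBFLoad && !l0Enabled {
		return errors.New("dataNode.skip.BFStats.Load requires L0 segments to be enabled")
	}
	return nil
}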
79 changes: 31 additions & 48 deletions internal/flushcommon/pipeline/data_sync_service.go
@@ -179,46 +179,9 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
 			futures = append(futures, future)
 		}
 	}
-	lazyLoadSegmentStats := func(segType string, segments []*datapb.SegmentInfo) {
-		for _, item := range segments {
-			log.Info("lazy load pk stats for segment",
-				zap.String("vChannelName", item.GetInsertChannel()),
-				zap.Int64("segmentID", item.GetID()),
-				zap.Int64("numRows", item.GetNumOfRows()),
-				zap.String("segmentType", segType),
-			)
-			segment := item
-
-			lazy := pkoracle.NewLazyPkstats()
-
-			// ignore lazy load future
-			_ = io.GetOrCreateStatsPool().Submit(func() (any, error) {
-				var stats []*storage.PkStatistics
-				var err error
-				stats, err = compaction.LoadStats(context.Background(), chunkManager, info.GetSchema(), segment.GetID(), segment.GetStatslogs())
-				if err != nil {
-					return nil, err
-				}
-				pkStats := pkoracle.NewBloomFilterSet(stats...)
-				lazy.SetPkStats(pkStats)
-				return struct{}{}, nil
-			})
-			segmentPks.Insert(segment.GetID(), lazy)
-			if tickler != nil {
-				tickler.Inc()
-			}
-		}
-	}
 
-	// growing segment cannot use lazy mode
 	loadSegmentStats("growing", unflushed)
-	lazy := paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()
-	// check paramtable to decide whether skip load BF stage when initializing
-	if lazy {
-		lazyLoadSegmentStats("sealed", flushed)
-	} else {
-		loadSegmentStats("sealed", flushed)
-	}
+	loadSegmentStats("sealed", flushed)
 
 	// use fetched segment info
 	info.Vchan.FlushedSegments = flushed
@@ -362,19 +325,39 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
 // NewDataSyncService stops and returns the initCtx.Err()
 func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler) (*DataSyncService, error) {
 	// recover segment checkpoints
-	unflushedSegmentInfos, err := pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
-	if err != nil {
-		return nil, err
+	var (
+		err                   error
+		metaCache             metacache.MetaCache
+		unflushedSegmentInfos []*datapb.SegmentInfo
+		flushedSegmentInfos   []*datapb.SegmentInfo
+	)
+	if len(info.GetVchan().GetUnflushedSegmentIds()) > 0 {
+		unflushedSegmentInfos, err = pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
+		if err != nil {
+			return nil, err
+		}
 	}
-	flushedSegmentInfos, err := pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
-	if err != nil {
-		return nil, err
+	if len(info.GetVchan().GetFlushedSegmentIds()) > 0 {
+		flushedSegmentInfos, err = pipelineParams.Broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	// init metaCache meta
-	metaCache, err := getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos)
-	if err != nil {
-		return nil, err
+	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
+		// In SkipBFStatsLoad mode, flushed segments no longer maintain a bloom filter.
+		// So, here we skip loading the bloom filter for flushed segments.
+		info.Vchan.FlushedSegments = flushedSegmentInfos
+		info.Vchan.UnflushedSegments = unflushedSegmentInfos
+		metaCache = metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
+			return pkoracle.NewBloomFilterSet()
+		}, metacache.NoneBm25StatsFactory)
+	} else {
+		// init metaCache meta
+		metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil)
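Net effect of the two hunks above: the background lazy-loading path for sealed-segment stats is deleted outright, and in skip mode the meta cache is built directly with a factory that hands every segment an empty pkoracle.BloomFilterSet, so initMetaCache and its eager statslog loading are never reached. A simplified, self-contained sketch of the skip-mode strategy (stand-in types, not the Milvus API):

// PkStat stands in for pkoracle.PkStat.
type PkStat interface {
	MayContain(pk int64) bool
}

type emptyStats struct{}

// Never consulted in skip mode, because deletes bypass bloom filter checks.
func (emptyStats) MayContain(int64) bool { return false }

// skipLoadFactory is O(1) per segment: no statslog download and no bloom
// filter reconstruction, so watching a vchannel no longer scales with the
// number of flushed segments.
func skipLoadFactory(segmentID int64) (PkStat, error) {
	return emptyStats{}, nil
}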
2 changes: 1 addition & 1 deletion internal/flushcommon/writebuffer/l0_write_buffer.go
@@ -163,7 +163,7 @@ func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgs
 		}
 	}
 
-	if streamingutil.IsStreamingServiceEnabled() {
+	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() || streamingutil.IsStreamingServiceEnabled() {
 		// In streaming service mode, flushed segments no longer maintain a bloom filter.
 		// So, here we skip filtering delete entries by bf.
 		wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
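Skipping the filter is safe because bloom filters admit false positives but never false negatives: the filtered delete set is always a subset of the unfiltered one, so writing every delete record to L0 can only add redundant entries (resolved later when the deltas are applied), never lose a delete. A toy illustration of the two paths:

// filterPks mirrors the old BF-filtered path; the new path returns pks
// unchanged. Every pk the old path kept, the new path keeps too.
func filterPks(pks []int64, mayContain func(int64) bool) []int64 {
	kept := make([]int64, 0, len(pks))
	for _, pk := range pks {
		if mayContain(pk) {
			kept = append(kept, pk)
		}
	}
	return kept
}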
8 changes: 5 additions & 3 deletions internal/flushcommon/writebuffer/write_buffer.go
@@ -350,9 +350,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
 			wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
 		}
 
-		if streamingutil.IsStreamingServiceEnabled() && syncTask.IsFlush() {
-			wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID()))
-			log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName()))
+		if syncTask.IsFlush() {
+			if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() || streamingutil.IsStreamingServiceEnabled() {
+				wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID()))
+				log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName()))
+			}
 		}
 		return nil
 	}))
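Streaming mode already dropped flushed segments from the write buffer's meta cache; skip mode now does the same, since a flushed segment's entry existed largely to serve bloom filter lookups that no longer happen. A self-contained sketch of the drop-on-flush pattern (hypothetical types, not the Milvus API):

package sketch

import "sync"

type segmentCache struct {
	mu       sync.Mutex
	segments map[int64]struct{}
}

// onSyncDone drops the entry as soon as a flush sync completes, so resident
// cache size tracks the growing segments on the channel rather than every
// segment ever flushed.
func (c *segmentCache) onSyncDone(segmentID int64, isFlush, skipBF bool) {
	if !isFlush || !skipBF {
		return // entry may still be needed for BF-based delete filtering
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.segments, segmentID)
}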
4 changes: 4 additions & 0 deletions internal/streamingnode/server/server.go
@@ -13,6 +13,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
 	_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
 	_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 // Server is the streamingnode server.
@@ -43,6 +44,9 @@ func (s *Server) Init(ctx context.Context) (err error) {
 
 // Start starts the streamingnode server.
 func (s *Server) Start() {
+	if !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
+		panic("In streaming service mode, disable L0 is not allowed.")
+	}
 	resource.Resource().Flusher().Start()
 	log.Info("flusher started")
 }
3 changes: 2 additions & 1 deletion pkg/util/paramtable/component_param.go
@@ -4389,7 +4389,8 @@ Setting this parameter too small causes the system to store a small amount of da
 		Key:          "dataNode.skip.BFStats.Load",
 		Version:      "2.2.5",
 		PanicIfEmpty: false,
-		DefaultValue: "false",
+		DefaultValue: "true",
+		Forbidden:    true, // SkipBFStatsLoad is a static config that does not allow dynamic refresh.
 	}
 	p.SkipBFStatsLoad.Init(base.mgr)
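Two changes land here: the default flips to true, making skip-BF the default behavior from this commit on, and Forbidden marks the item as static, meaning it is not picked up by dynamic config refresh. A minimal consumption sketch under that assumption (matching the call sites in the diffs above):

// Safe to read once at startup and cache: the value cannot change at runtime.
var skipBFLoad = paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()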
