Skip to content

Commit

Permalink
fix: Accidently exit the check loop
Browse files Browse the repository at this point in the history
See also: milvus-io#33460

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Jul 8, 2024
1 parent a60e2a6 commit f088174
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 243 deletions.
19 changes: 13 additions & 6 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datacoord

import (
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -12,13 +13,14 @@ type l0CompactionPolicy struct {
meta *meta
view *FullViews

emptyLoopCount int
emptyLoopCount *atomic.Int64
}

func newL0CompactionPolicy(meta *meta, view *FullViews) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
view: view,
meta: meta,
view: view,
emptyLoopCount: atomic.NewInt64(0),
}
}

Expand All @@ -31,11 +33,16 @@ func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]Compact
events := policy.generateEventForLevelZeroViewChange()
if len(events) != 0 {
// each time when triggers a compaction, the idleTicker would reset
policy.emptyLoopCount = 0
policy.emptyLoopCount.Store(0)
return events, nil
}
if policy.emptyLoopCount >= 3 {
policy.emptyLoopCount.Inc()

if policy.emptyLoopCount.Load() >= 3 {
idleEvents := policy.generateEventForLevelZeroViewIDLE()
if len(idleEvents) > 0 {
policy.emptyLoopCount.Store(0)
}
return idleEvents, nil
}
return make(map[CompactionTriggerType][]CompactionView, 0), nil
Expand Down Expand Up @@ -129,13 +136,13 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
}

func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
events := make(map[CompactionTriggerType][]CompactionView, 0)
for collID := range policy.view.collections {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})
if len(cachedViews) > 0 {
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
grouped := policy.groupL0ViewsByPartChan(collID, cachedViews)
events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped),
func(l0View *LevelZeroSegmentsView, _ int) CompactionView {
Expand Down
Loading

0 comments on commit f088174

Please sign in to comment.