diff --git a/compactor/compactor.go b/compactor/compactor.go index 5a83d13f8333..c057225174cc 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -29,8 +29,7 @@ var ( ) const ( - checkCompactionInterval = 5 * time.Minute - executeCompactionInterval = time.Hour + checkCompactionInterval = 5 * time.Minute ModePeriodic = "periodic" ModeRevision = "revision" @@ -57,7 +56,7 @@ type RevGetter interface { Rev() int64 } -func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) { +func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) { switch mode { case ModePeriodic: return NewPeriodic(retention, rg, c), nil diff --git a/compactor/periodic.go b/compactor/periodic.go index 784cef7c1663..8f265cd3291f 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -26,72 +26,73 @@ import ( ) // Periodic compacts the log by purging revisions older than -// the configured retention time. Compaction happens hourly. +// the configured retention time. type Periodic struct { - clock clockwork.Clock - periodInHour int + clock clockwork.Clock + period time.Duration rg RevGetter c Compactable - revs []int64 ctx context.Context cancel context.CancelFunc - mu sync.Mutex + // mu protects paused + mu sync.RWMutex paused bool } // NewPeriodic creates a new instance of Periodic compactor that purges -// the log older than h hours. -func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { +// the log older than h Duration. +func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { return &Periodic{ - clock: clockwork.NewRealClock(), - periodInHour: h, - rg: rg, - c: c, + clock: clockwork.NewRealClock(), + period: h, + rg: rg, + c: c, } } func (t *Periodic) Run() { t.ctx, t.cancel = context.WithCancel(context.Background()) - t.revs = make([]int64, 0) clock := t.clock - + wait := t.period + retryTimeout := time.Duration(5) * time.Minute + if wait < retryTimeout { + retryTimeout = wait + } go func() { - last := clock.Now() + revTimer := clock.After(t.period) + lastRev := t.rg.Rev() for { - t.revs = append(t.revs, t.rg.Rev()) + select { + case <-revTimer: + revTimer = clock.After(t.period) + lastRev = t.rg.Rev() + default: + } + timer := clock.After(wait) select { case <-t.ctx.Done(): return - case <-clock.After(checkCompactionInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() - if p { + case <-timer: + t.mu.RLock() + paused := t.paused + t.mu.RUnlock() + if paused { + wait = retryTimeout continue } - } - - if clock.Now().Sub(last) < executeCompactionInterval { - continue - } - - rev, remaining := t.getRev(t.periodInHour) - if rev < 0 { - continue - } - - plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) - if err == nil || err == mvcc.ErrCompacted { - t.revs = remaining - last = clock.Now() - plog.Noticef("Finished auto-compaction at revision %d", rev) - } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) - plog.Noticef("Retry after %v", checkCompactionInterval) + plog.Noticef("Starting auto-compaction at revision %d (retention: %v )", lastRev, wait) + _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: lastRev}) + if err == nil || err == mvcc.ErrCompacted { + wait = t.period + plog.Noticef("Finished auto-compaction at revision %d", lastRev) + } else { + wait = retryTimeout + plog.Noticef("Failed auto-compaction at revision %d (%v)", lastRev, err) + plog.Noticef("Retry after %v", wait) + } } } }() @@ -112,11 +113,3 @@ func (t *Periodic) Resume() { defer t.mu.Unlock() t.paused = false } - -func (t *Periodic) getRev(h int) (int64, []int64) { - i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) - if i < 0 { - return -1, t.revs - } - return t.revs[i], t.revs[i+1:] -} diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index d0bb7f6eef3c..36bd90987f1b 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -25,40 +25,32 @@ import ( ) func TestPeriodic(t *testing.T) { - retentionHours := 2 + retentionHours := time.Hour fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} tb := &Periodic{ - clock: fc, - periodInHour: retentionHours, - rg: rg, - c: compactable, + clock: fc, + period: retentionHours, + rg: rg, + c: compactable, } tb.Run() defer tb.Stop() - n := int(time.Hour / checkCompactionInterval) - // collect 5 hours of revisions + // simulates 5 hours time elapse for i := 0; i < 5; i++ { // advance one hour, one revision for each interval - for j := 0; j < n; j++ { - rg.Wait(1) - fc.Advance(checkCompactionInterval) - } - - // compaction doesn't happen til 2 hours elapses - if i+1 < retentionHours { - continue - } + rg.Wait(1) + fc.Advance(time.Hour) a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(1 + (i+1)*n - retentionHours*n) + expectedRevision := int64(i + 1) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } @@ -76,23 +68,22 @@ func TestPeriodicPause(t *testing.T) { compactable := &fakeCompactable{testutil.NewRecorderStream()} rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} tb := &Periodic{ - clock: fc, - periodInHour: 1, - rg: rg, - c: compactable, + clock: fc, + period: time.Hour, + rg: rg, + c: compactable, } tb.Run() tb.Pause() - // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / checkCompactionInterval) - for i := 0; i < 3*n; i++ { + // simulate 3 hours time elapse + for i := 0; i < 3; i++ { rg.Wait(1) - fc.Advance(checkCompactionInterval) + fc.Advance(time.Hour) } - // tb ends up waiting for the clock + // Since tb is paused, expect no compaction event. select { case a := <-compactable.Chan(): t.Fatalf("unexpected action %v", a) @@ -102,15 +93,15 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - // unblock clock, will kick off a compaction at hour 3:05 + // unblock clock, will kick off a compaction at hour 4:00 rg.Wait(1) - fc.Advance(checkCompactionInterval) + fc.Advance(time.Hour) a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - // compact the revision from hour 2:05 - wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} + // compact the revision from hour 3:00 + wreq := &pb.CompactionRequest{Revision: int64(4)} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) }