Skip to content

Commit

Permalink
Simplify compact loop error handling
Browse files Browse the repository at this point in the history
Rather than restarting the outer loop on unhandled error, simply break out of the inner loop on any error. We always want to update the compact and target rev, and do post-compact operations - so the only thing we really need to check the error for is logging and metrics.

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Nov 11, 2024
1 parent c92e3e1 commit 810f408
Showing 1 changed file with 54 additions and 26 deletions.
80 changes: 54 additions & 26 deletions pkg/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func (s *SQLLog) compactor(interval time.Duration) {
targetCompactRev, _ := s.CurrentRevision(s.ctx)
logrus.Tracef("COMPACT starting compactRev=%d targetCompactRev=%d", compactRev, targetCompactRev)

outer:
for {
select {
case <-s.ctx.Done():
Expand All @@ -127,14 +126,20 @@ outer:
// (several hundred ms) just for the database to execute the subquery to select the revisions to delete.

var (
resultLabel string
iterCompactRev int64
iterStart time.Time
iterCount int64
compactedRev int64
currentRev int64
err error
)

resultLabel = metrics.ResultSuccess
iterCompactRev = compactRev
compactedRev = compactRev
iterStart = time.Now()
iterCount = 0

for iterCompactRev < targetCompactRev {
// Set move iteration target compactBatchSize revisions forward, or
Expand All @@ -145,54 +150,74 @@ outer:
iterCompactRev = targetCompactRev
}

compactedRev, currentRev, err = s.compact(compactedRev, iterCompactRev)
if err != nil {
// ErrCompacted indicates that no further work is necessary - either compactRev changed since the
// last iteration because another client has compacted, or the requested revision has already been compacted.
if err == server.ErrCompacted {
break
}
logrus.Errorf("Compact failed: %v", err)
metrics.CompactTotal.WithLabelValues(metrics.ResultError).Inc()
continue outer
// only update the compacted and current revisions if they are valid,
// but break out of the inner loop on any error.
compacted, current, cerr := s.compact(compactedRev, iterCompactRev)
if compacted != 0 && current != 0 {
compactedRev = compacted
currentRev = current
}
if cerr != nil {
err = cerr
break
}
iterCount++
}

if err := s.postCompact(); err != nil {
logrus.Errorf("Post-compact operations failed: %v", err)
if iterCount > 0 {
logrus.Infof("COMPACT compacted from %d to %d in %d transactions over %s", compactRev, compactedRev, iterCount, time.Now().Sub(iterStart).Round(time.Millisecond))

// post-compact operation errors are not critical, but should be reported
if perr := s.postCompact(); perr != nil {
logrus.Errorf("Post-compact operations failed: %v", perr)
}
}

// Record the final results for the outer loop
// Store the final results for this compact interval.
// Note that one or more of the small-batch compact transactions
// may have succeeded and moved the compact revision forward, even if err is non-nil.
compactRev = compactedRev
targetCompactRev = currentRev

metrics.CompactTotal.WithLabelValues(metrics.ResultSuccess).Inc()
// ErrCompacted indicates that no further work is necessary - either compactRev changed since the
// last iteration because another client has compacted, or the requested revision has already been compacted.
if err != nil && err != server.ErrCompacted {
logrus.Errorf("Compact failed: %v", err)
resultLabel = metrics.ResultError
}
metrics.CompactTotal.WithLabelValues(resultLabel).Inc()
}
}

// compact removes deleted or replaced rows from the database. compactRev is the revision that was last compacted to.
// If this changes between compactions, we know that someone else has compacted and we don't need to do it.
// targetCompactRev is the revision that we should try to compact to. Upon success, the function returns the revision
// compacted to, and the revision that we should try to compact to next time (the current revision).
// This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
// compact removes deleted or replaced rows from the database, and updates the compact rev key.
// compactRev is the current compact revision; targetCompactRev is the revision to compact to.
// If compactRev does not match what's in the database, we know that someone else has compacted and we don't need to do it.
// Deletion of rows and update of the compact rev key is done within a single transaction. The transaction is rolled back on any error.
//
// On success, the function returns the revision compacted to, and the revision that we should try to compact to next time (the current revision).
// ErrCompacted is returned if the current revision is stale, or the target revision has already been compacted.
// In this case the compact and current revisions from the database are returned.
// On any other error, the returned compact and current revisions should not be used.
//
// This logic is cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64, error) {
ctx, cancel := context.WithTimeout(s.ctx, compactTimeout)
defer cancel()

t, err := s.d.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to begin transaction")
return 0, 0, errors.Wrap(err, "failed to begin transaction")
}
defer t.MustRollback()

currentRev, err := t.CurrentRevision(s.ctx)
if err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to get current revision")
return 0, 0, errors.Wrap(err, "failed to get current revision")
}

dbCompactRev, err := t.GetCompactRevision(s.ctx)
if err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to get compact revision")
return 0, 0, errors.Wrap(err, "failed to get compact revision")
}

// Check to see if another node already compacted. This is normal on a multi-server cluster.
Expand All @@ -206,7 +231,7 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64

// Don't bother compacting to a revision that has already been compacted
if targetCompactRev <= compactRev {
logrus.Infof("COMPACT revision %d has already been compacted", targetCompactRev)
logrus.Tracef("COMPACT revision %d has already been compacted", targetCompactRev)
return dbCompactRev, currentRev, server.ErrCompacted
}

Expand All @@ -215,13 +240,16 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64
start := time.Now()
deletedRows, err := t.Compact(s.ctx, targetCompactRev)
if err != nil {
return compactRev, targetCompactRev, errors.Wrapf(err, "failed to compact to revision %d", targetCompactRev)
return 0, 0, errors.Wrapf(err, "failed to compact to revision %d", targetCompactRev)
}

if err := t.SetCompactRevision(s.ctx, targetCompactRev); err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to record compact revision")
return 0, 0, errors.Wrap(err, "failed to record compact revision")
}

// only commit the transaction if we make it all the way through deleting and
// updating the compact revision without any errors. The deferred rollback
// becomes a no-op if the transaction is committed.
t.MustCommit()
logrus.Infof("COMPACT deleted %d rows from %d revisions in %s - compacted to %d/%d", deletedRows, (targetCompactRev - compactRev), time.Since(start), targetCompactRev, currentRev)

Expand Down

0 comments on commit 810f408

Please sign in to comment.