Skip to content

Commit

Permalink
Limit LTX compaction during backups
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Sep 12, 2023
1 parent dd46004 commit 8e8da4b
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const (
)

const (
// MaxBackupLTXFileN is the number of LTX files that can be compacted
// together at a time when sending data to the backup service.
MaxBackupLTXFileN = 256

MetricsMonitorInterval = 1 * time.Second
)

Expand Down Expand Up @@ -1113,9 +1117,9 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P

// Check local replication position.
// If we haven't written anything yet then try to send data.
pos := db.Pos()
if pos.IsZero() {
return pos, nil
localPos := db.Pos()
if localPos.IsZero() {
return localPos, nil
}

// If the database doesn't exist remotely, perform a full snapshot.
Expand All @@ -1125,11 +1129,11 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P

// If the position from the backup server is ahead of the primary then we
// need to perform a recovery so that we snapshot from the backup server.
if remotePos.TXID > pos.TXID {
if remotePos.TXID > localPos.TXID {
slog.Warn("restoring from backup",
slog.String("name", name),
slog.Group("pos",
slog.String("local", pos.String()),
slog.String("local", localPos.String()),
slog.String("remote", remotePos.String()),
),
slog.String("reason", "remote-ahead"),
Expand All @@ -1141,12 +1145,12 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
// If the TXID matches the backup server, we need to ensure the checksum
// does as well. If it doesn't, we need to grab a snapshot from the backup
// server. If it does, then we can exit as we're already in sync.
if remotePos.TXID == pos.TXID {
if remotePos.PostApplyChecksum != pos.PostApplyChecksum {
if remotePos.TXID == localPos.TXID {
if remotePos.PostApplyChecksum != localPos.PostApplyChecksum {
slog.Warn("restoring from backup",
slog.String("name", name),
slog.Group("pos",
slog.String("local", pos.String()),
slog.String("local", localPos.String()),
slog.String("remote", remotePos.String()),
),
slog.String("reason", "chksum-mismatch"),
Expand All @@ -1155,10 +1159,10 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
}

slog.Debug("database in sync with backup, skipping", slog.String("name", name))
return pos, nil // already in sync
return localPos, nil // already in sync
}

assert(remotePos.TXID < pos.TXID, "remote/local position must be ordered")
assert(remotePos.TXID < localPos.TXID, "remote/local position must be ordered")

// OPTIMIZE: Check that remote postApplyChecksum equals next TXID's preApplyChecksum

Expand All @@ -1169,13 +1173,14 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
_ = r.(io.Closer).Close()
}
}()
for txID := remotePos.TXID + 1; txID <= pos.TXID; txID++ {

for txID, n := remotePos.TXID+1, 0; txID <= localPos.TXID && n < MaxBackupLTXFileN; txID, n = txID+1, n+1 {
f, err := db.OpenLTXFile(txID)
if os.IsNotExist(err) {
slog.Warn("restoring from backup",
slog.String("name", name),
slog.Group("pos",
slog.String("local", pos.String()),
slog.String("local", localPos.String()),
slog.String("remote", remotePos.String()),
),
slog.String("txid", txID.String()),
Expand All @@ -1190,10 +1195,17 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P

// Compact LTX files through a pipe so we can pass it to the backup client.
pr, pw := io.Pipe()
var pos ltx.Pos
go func() {
compactor := ltx.NewCompactor(pw, rdrs)
compactor.HeaderFlags = s.ltxHeaderFlags()
_ = pw.CloseWithError(compactor.Compact(ctx))
if err := compactor.Compact(ctx); err != nil {
_ = pw.CloseWithError(err)
return
}

pos = ltx.NewPos(compactor.Header().MaxTXID, compactor.Trailer().PostApplyChecksum)
_ = pw.Close()
}()

var pmErr *ltx.PosMismatchError
Expand All @@ -1202,7 +1214,7 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
slog.Warn("restoring from backup",
slog.String("name", name),
slog.Group("pos",
slog.String("local", pos.String()),
slog.String("local", localPos.String()),
slog.String("remote", pmErr.Pos.String()),
),
slog.String("reason", "out-of-sync"),
Expand Down

0 comments on commit 8e8da4b

Please sign in to comment.