Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: mark in-flight storage checkpoints #99119

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math/rand"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -373,6 +374,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
for i := 0; i < numStores; i++ {
cps := onDiskCheckpointPaths(i)
require.Len(t, cps, 1)
t.Logf("found a checkpoint at %s", cps[0])
// The checkpoint must have been finalized.
require.False(t, strings.HasSuffix(cps[0], "_pending"))

metric := tc.GetFirstStoreFromServer(t, i).Metrics().RdbCheckpoints
testutils.SucceedsSoon(t, func() error {
if got, want := metric.Value(), int64(1); got != want {
Expand All @@ -387,7 +392,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
require.NoError(t, err)
// Copy the min-version file so we can open the checkpoint as a store.
require.NoError(t, vfs.Copy(fs, storage.MinVersionFilename, fs.PathJoin(cps[0], storage.MinVersionFilename)))
cpEng := storage.InMemFromFS(context.Background(), fs, cps[0], cluster.MakeClusterSettings(), storage.CacheSize(1<<20))
cpEng := storage.InMemFromFS(context.Background(), fs, cps[0], cluster.MakeClusterSettings(),
storage.ForTesting, storage.MustExist, storage.ReadOnly, storage.CacheSize(1<<20))
defer cpEng.Close()

// Find the problematic range in the storage.
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,10 @@ To inspect the checkpoints, one can use the cockroach debug range-data tool, and
command line tools like diff. For example:

$ cockroach debug range-data --replicated data/auxiliary/checkpoints/rN_at_M N

Note that a directory that ends with "_pending" might not represent a valid
checkpoint. Such directories can exist if the node fails during checkpoint
creation. These directories should be deleted, or inspected with caution.
`
attentionArgs := []any{r, desc.Replicas(), redact.Safe(auxDir), redact.Safe(path)}
preventStartupMsg := fmt.Sprintf(attentionFmt, attentionArgs...)
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3070,8 +3070,15 @@ func (s *Store) checkpointSpans(desc *roachpb.RangeDescriptor) []roachpb.Span {
func (s *Store) checkpoint(tag string, spans []roachpb.Span) (string, error) {
checkpointBase := s.checkpointsDir()
_ = s.TODOEngine().MkdirAll(checkpointBase)
// Create the checkpoint in a "pending" directory first. If we fail midway, it
// should be clear that the directory contains an incomplete checkpoint.
pendingDir := filepath.Join(checkpointBase, tag+"_pending")
if err := s.TODOEngine().CreateCheckpoint(pendingDir, spans); err != nil {
return "", err
}
// Atomically rename the directory when it represents a complete checkpoint.
checkpointDir := filepath.Join(checkpointBase, tag)
if err := s.TODOEngine().CreateCheckpoint(checkpointDir, spans); err != nil {
if err := s.TODOEngine().Rename(pendingDir, checkpointDir); err != nil {
return "", err
}
return checkpointDir, nil
Expand Down