From 55370b01985bfbd4fd5776692450e3403227f65a Mon Sep 17 00:00:00 2001 From: Matt Kocubinski Date: Fri, 1 Mar 2024 08:17:48 -0800 Subject: [PATCH] feat(store): remove RawDB dependency from snapshot store (#19608) --- store/migration/manager_test.go | 2 +- store/snapshots/helpers_test.go | 4 +- store/snapshots/manager.go | 5 +- store/snapshots/manager_test.go | 3 +- store/snapshots/store.go | 218 ++++++++++++++++++++------------ store/snapshots/store_test.go | 7 +- 6 files changed, 145 insertions(+), 94 deletions(-) diff --git a/store/migration/manager_test.go b/store/migration/manager_test.go index b02ac5db94a9..c60a2376d4bd 100644 --- a/store/migration/manager_test.go +++ b/store/migration/manager_test.go @@ -31,7 +31,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger()) require.NoError(t, err) - snapshotsStore, err := snapshots.NewStore(db, t.TempDir()) + snapshotsStore, err := snapshots.NewStore(t.TempDir()) require.NoError(t, err) snapshotsManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, log.NewNopLogger()) diff --git a/store/snapshots/helpers_test.go b/store/snapshots/helpers_test.go index 8ed5d3d594b1..33dcc14fa6f4 100644 --- a/store/snapshots/helpers_test.go +++ b/store/snapshots/helpers_test.go @@ -17,7 +17,6 @@ import ( errorsmod "cosmossdk.io/errors" "cosmossdk.io/log" "cosmossdk.io/store/v2" - dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) @@ -189,7 +188,7 @@ func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 { // The snapshot will complete when the returned closer is called. func setupBusyManager(t *testing.T) *snapshots.Manager { t.Helper() - store, err := snapshots.NewStore(dbm.NewMemDB(), t.TempDir()) + store, err := snapshots.NewStore(t.TempDir()) require.NoError(t, err) hung := newHungCommitSnapshotter() mgr := snapshots.NewManager(store, opts, hung, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) @@ -292,6 +291,7 @@ func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadR // GetTempDir returns a writable temporary director for the test to use. func GetTempDir(tb testing.TB) string { + //return "/tmp/snapshots" tb.Helper() // os.MkDir() is used instead of testing.T.TempDir() // see https://github.com/cosmos/cosmos-sdk/pull/8475 and diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index ad29da179671..b713da7134f4 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -599,7 +599,4 @@ func (m *Manager) snapshot(height int64) { } // Close the snapshot database. -func (m *Manager) Close() error { - m.logger.Info("snapshotManager Close Database") - return m.store.db.Close() -} +func (m *Manager) Close() error { return nil } diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index da598f7a6910..987d0c3b0e81 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "cosmossdk.io/log" - dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" "cosmossdk.io/store/v2/snapshots/types" ) @@ -237,7 +236,7 @@ func TestManager_Restore(t *testing.T) { func TestManager_TakeError(t *testing.T) { snapshotter := &mockErrorCommitSnapshotter{} - store, err := snapshots.NewStore(dbm.NewMemDB(), GetTempDir(t)) + store, err := snapshots.NewStore(GetTempDir(t)) require.NoError(t, err) manager := snapshots.NewManager(store, opts, snapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) diff --git a/store/snapshots/store.go b/store/snapshots/store.go index ad2179ddbac7..c7fef66bb424 100644 --- a/store/snapshots/store.go +++ b/store/snapshots/store.go @@ -3,12 +3,15 @@ package snapshots import ( "crypto/sha256" "encoding/binary" + "fmt" "hash" "io" "math" "os" "path/filepath" + "sort" "strconv" + "strings" "sync" "github.com/cosmos/gogoproto/proto" @@ -25,7 +28,6 @@ const ( // Store is a snapshot store, containing snapshot metadata and binary chunks. type Store struct { - db store.RawDB dir string mtx sync.Mutex @@ -33,7 +35,7 @@ type Store struct { } // NewStore creates a new snapshot store. -func NewStore(db store.RawDB, dir string) (*Store, error) { +func NewStore(dir string) (*Store, error) { if dir == "" { return nil, errors.Wrap(store.ErrLogic, "snapshot directory not given") } @@ -41,9 +43,12 @@ func NewStore(db store.RawDB, dir string) (*Store, error) { if err != nil { return nil, errors.Wrapf(err, "failed to create snapshot directory %q", dir) } + err = os.MkdirAll(filepath.Join(dir, "metadata"), 0o750) + if err != nil { + return nil, errors.Wrapf(err, "failed to create snapshot metadata directory %q", dir) + } return &Store{ - db: db, dir: dir, saving: make(map[uint64]bool), }, nil @@ -58,32 +63,25 @@ func (s *Store) Delete(height uint64, format uint32) error { return errors.Wrapf(store.ErrConflict, "snapshot for height %v format %v is currently being saved", height, format) } - b := s.db.NewBatch() - defer b.Close() - if err := b.Delete(encodeKey(height, format)); err != nil { - return errors.Wrapf(err, "failed to delete item in the batch") - } - if err := b.WriteSync(); err != nil { - return errors.Wrapf(err, "failed to delete snapshot for height %v format %v", - height, format) - } if err := os.RemoveAll(s.pathSnapshot(height, format)); err != nil { - return errors.Wrapf(err, "failed to delete snapshot chunks for height %v format %v", - height, format) + return errors.Wrapf(err, "failed to delete snapshot chunks for height %v format %v", height, format) + } + if err := os.RemoveAll(s.pathMetadata(height, format)); err != nil { + return errors.Wrapf(err, "failed to delete snapshot metadata for height %v format %v", height, format) } return nil } // Get fetches snapshot info from the database. func (s *Store) Get(height uint64, format uint32) (*types.Snapshot, error) { - bytes, err := s.db.Get(encodeKey(height, format)) + if _, err := os.Stat(s.pathMetadata(height, format)); os.IsNotExist(err) { + return nil, nil + } + bytes, err := os.ReadFile(s.pathMetadata(height, format)) if err != nil { return nil, errors.Wrapf(err, "failed to fetch snapshot metadata for height %v format %v", height, format) } - if bytes == nil { - return nil, nil - } snapshot := &types.Snapshot{} err = proto.Unmarshal(bytes, snapshot) if err != nil { @@ -96,44 +94,62 @@ func (s *Store) Get(height uint64, format uint32) (*types.Snapshot, error) { return snapshot, nil } -// Get fetches the latest snapshot from the database, if any. +// GetLatest fetches the latest snapshot from the database, if any. func (s *Store) GetLatest() (*types.Snapshot, error) { - iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(uint64(math.MaxUint64), math.MaxUint32)) + metadata, err := os.ReadDir(s.pathMetadataDir()) if err != nil { - return nil, errors.Wrap(err, "failed to find latest snapshot") + return nil, errors.Wrap(err, "failed to list snapshot metadata") } - defer iter.Close() + if len(metadata) == 0 { + return nil, nil + } + // file system may not guarantee the order of the files, so we sort them lexically + sort.Slice(metadata, func(i, j int) bool { return metadata[i].Name() < metadata[j].Name() }) - var snapshot *types.Snapshot - if iter.Valid() { - snapshot = &types.Snapshot{} - err := proto.Unmarshal(iter.Value(), snapshot) - if err != nil { - return nil, errors.Wrap(err, "failed to decode latest snapshot") - } + path := filepath.Join(s.pathMetadataDir(), metadata[len(metadata)-1].Name()) + if err := s.validateMetadataPath(path); err != nil { + return nil, err + } + bz, err := os.ReadFile(path) + if err != nil { + return nil, errors.Wrapf(err, "failed to read latest snapshot metadata %s", path) } - err = iter.Error() - return snapshot, errors.Wrap(err, "failed to find latest snapshot") + + snapshot := &types.Snapshot{} + err = proto.Unmarshal(bz, snapshot) + if err != nil { + return nil, errors.Wrapf(err, "failed to decode latest snapshot metadata %s", path) + } + return snapshot, nil } // List lists snapshots, in reverse order (newest first). func (s *Store) List() ([]*types.Snapshot, error) { - iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(uint64(math.MaxUint64), math.MaxUint32)) + metadata, err := os.ReadDir(s.pathMetadataDir()) if err != nil { - return nil, errors.Wrap(err, "failed to list snapshots") + return nil, errors.Wrap(err, "failed to list snapshot metadata") } - defer iter.Close() + // file system may not guarantee the order of the files, so we sort them lexically + sort.Slice(metadata, func(i, j int) bool { return metadata[i].Name() < metadata[j].Name() }) - snapshots := make([]*types.Snapshot, 0) - for ; iter.Valid(); iter.Next() { + snapshots := make([]*types.Snapshot, len(metadata)) + for i, entry := range metadata { + path := filepath.Join(s.pathMetadataDir(), entry.Name()) + if err := s.validateMetadataPath(path); err != nil { + return nil, err + } + bz, err := os.ReadFile(path) + if err != nil { + return nil, errors.Wrapf(err, "failed to read snapshot metadata %s", entry.Name()) + } snapshot := &types.Snapshot{} - err := proto.Unmarshal(iter.Value(), snapshot) + err = proto.Unmarshal(bz, snapshot) if err != nil { - return nil, errors.Wrap(err, "failed to decode snapshot info") + return nil, errors.Wrapf(err, "failed to decode snapshot metadata %s", entry.Name()) } - snapshots = append(snapshots, snapshot) + snapshots[len(metadata)-1-i] = snapshot } - return snapshots, iter.Error() + return snapshots, nil } // Load loads a snapshot (both metadata and binary chunks). The chunks must be consumed and closed. @@ -188,25 +204,25 @@ func (s *Store) loadChunkFile(height uint64, format, chunk uint32) (io.ReadClose // Prune removes old snapshots. The given number of most recent heights (regardless of format) are retained. func (s *Store) Prune(retain uint32) (uint64, error) { - iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(uint64(math.MaxUint64), math.MaxUint32)) + metadata, err := os.ReadDir(s.pathMetadataDir()) if err != nil { - return 0, errors.Wrap(err, "failed to prune snapshots") + return 0, errors.Wrap(err, "failed to list snapshot metadata") } - defer iter.Close() pruned := uint64(0) prunedHeights := make(map[uint64]bool) skip := make(map[uint64]bool) - for ; iter.Valid(); iter.Next() { - height, format, err := decodeKey(iter.Key()) + for i := len(metadata) - 1; i >= 0; i-- { + height, format, err := s.parseMetadataFilename(metadata[i].Name()) if err != nil { - return 0, errors.Wrap(err, "failed to prune snapshots") + return 0, err } + if skip[height] || uint32(len(skip)) < retain { skip[height] = true continue } - err = s.Delete(height, format) + err = s.Delete(height, uint32(format)) if err != nil { return 0, errors.Wrap(err, "failed to prune snapshots") } @@ -223,7 +239,7 @@ func (s *Store) Prune(retain uint32) (uint64, error) { } } } - return pruned, iter.Error() + return pruned, nil } // Save saves a snapshot to disk, returning it. @@ -249,37 +265,24 @@ func (s *Store) Save( s.mtx.Unlock() }() - exists, err := s.db.Has(encodeKey(height, format)) - if err != nil { - return nil, err - } - if exists { - return nil, errors.Wrapf(store.ErrConflict, - "snapshot already exists for height %v format %v", height, format) - } - snapshot := &types.Snapshot{ Height: height, Format: format, } - dirCreated := false + // create height directory or do nothing + if err := os.MkdirAll(s.pathHeight(height), 0o750); err != nil { + return nil, errors.Wrapf(err, "failed to create snapshot directory for height %v", height) + } + // create format directory or fail (if for example the format directory already exists) + if err := os.Mkdir(s.pathSnapshot(height, format), 0o750); err != nil { + return nil, errors.Wrapf(err, "failed to create snapshot directory for height %v format %v", height, format) + } + index := uint32(0) snapshotHasher := sha256.New() chunkHasher := sha256.New() for chunkBody := range chunks { - // Only create the snapshot directory on encountering the first chunk. - // If the directory disappears during chunk saving, - // the whole operation will fail anyway. - if !dirCreated { - dir := s.pathSnapshot(height, format) - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, errors.Wrapf(err, "failed to create snapshot directory %q", dir) - } - - dirCreated = true - } - if err := s.saveChunk(chunkBody, index, snapshot, chunkHasher, snapshotHasher); err != nil { return nil, err } @@ -332,13 +335,9 @@ func (s *Store) saveSnapshot(snapshot *types.Snapshot) error { if err != nil { return errors.Wrap(err, "failed to encode snapshot metadata") } - b := s.db.NewBatch() - defer b.Close() - if err := b.Set(encodeKey(snapshot.Height, snapshot.Format), value); err != nil { - return errors.Wrap(err, "failed to set snapshot in batch") - } - if err := b.WriteSync(); err != nil { - return errors.Wrap(err, "failed to store snapshot") + err = os.WriteFile(s.pathMetadata(snapshot.Height, snapshot.Format), value, 0o600) + if err != nil { + return errors.Wrap(err, "failed to write snapshot metadata") } return nil } @@ -353,13 +352,52 @@ func (s *Store) pathSnapshot(height uint64, format uint32) string { return filepath.Join(s.pathHeight(height), strconv.FormatUint(uint64(format), 10)) } +func (s *Store) pathMetadataDir() string { + return filepath.Join(s.dir, "metadata") +} + +// pathMetadata generates a snapshot metadata path. +func (s *Store) pathMetadata(height uint64, format uint32) string { + return filepath.Join(s.pathMetadataDir(), fmt.Sprintf("%020d-%08d", height, format)) +} + // PathChunk generates a snapshot chunk path. func (s *Store) PathChunk(height uint64, format, chunk uint32) string { return filepath.Join(s.pathSnapshot(height, format), strconv.FormatUint(uint64(chunk), 10)) } -// decodeKey decodes a snapshot key. -func decodeKey(k []byte) (uint64, uint32, error) { +func (s *Store) parseMetadataFilename(filename string) (height uint64, format uint32, err error) { + parts := strings.Split(filename, "-") + if len(parts) != 2 { + return 0, 0, fmt.Errorf("invalid snapshot metadata filename %s", filename) + } + height, err = strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return 0, 0, errors.Wrapf(err, "invalid snapshot metadata filename %s", filename) + } + var f uint64 + f, err = strconv.ParseUint(parts[1], 10, 32) + if err != nil { + return 0, 0, errors.Wrapf(err, "invalid snapshot metadata filename %s", filename) + } + format = uint32(f) + if filename != filepath.Base(s.pathMetadata(height, uint32(format))) { + return 0, 0, fmt.Errorf("invalid snapshot metadata filename %s", filename) + } + return height, format, nil +} + +func (s *Store) validateMetadataPath(path string) error { + dir, f := filepath.Split(path) + if dir != fmt.Sprintf("%s/", s.pathMetadataDir()) { + return fmt.Errorf("invalid snapshot metadata path %s", path) + } + _, _, err := s.parseMetadataFilename(f) + return err +} + +// legacyV1DecodeKey decodes a legacy snapshot key used in a raw kv store. +func legacyV1DecodeKey(k []byte) (uint64, uint32, error) { if len(k) != 13 { return 0, 0, errors.Wrapf(store.ErrLogic, "invalid snapshot key with length %v", len(k)) } @@ -372,11 +410,29 @@ func decodeKey(k []byte) (uint64, uint32, error) { return height, format, nil } -// encodeKey encodes a snapshot key. -func encodeKey(height uint64, format uint32) []byte { +// legacyV1EncodeKey encodes a snapshot key for use in a raw kv store. +func legacyV1EncodeKey(height uint64, format uint32) []byte { k := make([]byte, 13) k[0] = keyPrefixSnapshot binary.BigEndian.PutUint64(k[1:], height) binary.BigEndian.PutUint32(k[9:], format) return k } + +func (s *Store) MigrateFromV1(db store.RawDB) error { + itr, err := db.Iterator(legacyV1EncodeKey(0, 0), legacyV1EncodeKey(math.MaxUint64, math.MaxUint32)) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + height, format, err := legacyV1DecodeKey(itr.Key()) + if err != nil { + return err + } + if err := os.WriteFile(s.pathMetadata(height, format), itr.Value(), 0o600); err != nil { + return errors.Wrapf(err, "failed to write snapshot metadata %q", s.pathMetadata(height, format)) + } + } + return nil +} diff --git a/store/snapshots/store_test.go b/store/snapshots/store_test.go index 07f4d4a6d515..c6708ec8d73c 100644 --- a/store/snapshots/store_test.go +++ b/store/snapshots/store_test.go @@ -10,14 +10,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" "cosmossdk.io/store/v2/snapshots/types" ) func setupStore(t *testing.T) *snapshots.Store { t.Helper() - store, err := snapshots.NewStore(dbm.NewMemDB(), GetTempDir(t)) + store, err := snapshots.NewStore(GetTempDir(t)) require.NoError(t, err) _, err = store.Save(1, 1, makeChunks([][]byte{ @@ -42,13 +41,13 @@ func setupStore(t *testing.T) *snapshots.Store { func TestNewStore(t *testing.T) { tempdir := GetTempDir(t) - _, err := snapshots.NewStore(dbm.NewMemDB(), tempdir) + _, err := snapshots.NewStore(tempdir) require.NoError(t, err) } func TestNewStore_ErrNoDir(t *testing.T) { - _, err := snapshots.NewStore(dbm.NewMemDB(), "") + _, err := snapshots.NewStore("") require.Error(t, err) }