From d5f30eb9a5e31d74e483de8bbcefd54849080ffd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Ma=C5=82ota-W=C3=B3jcik?= <59281144+outofforest@users.noreply.github.com> Date: Tue, 22 Oct 2024 08:14:03 +0200 Subject: [PATCH] Sync persistent store on commit (#78) --- db.go | 63 ++++++++++++++++++++------------------------ persistent/dummy.go | 5 ++++ persistent/file.go | 5 ++++ persistent/memory.go | 5 ++++ persistent/types.go | 1 + types/types.go | 7 ++--- 6 files changed, 49 insertions(+), 37 deletions(-) diff --git a/db.go b/db.go index d6c0765..77e2450 100644 --- a/db.go +++ b/db.go @@ -83,7 +83,8 @@ func New(config Config) (*DB, error) { db := &DB{ config: config, persistentAllocationCh: config.State.NewPhysicalAllocationCh(), - singularityNode: *photon.FromPointer[types.SingularityNode](config.State.Node(0)), + singularityNode: photon.FromPointer[types.SingularityNode](config.State.Node(0)), + singularityNodePointer: &types.Pointer{}, pointerNodeAllocator: pointerNodeAllocator, snapshotToNodeNodeAllocator: snapshotToNodeNodeAllocator, listNodeAllocator: listNodeAllocator, @@ -154,7 +155,8 @@ func New(config Config) (*DB, error) { type DB struct { config Config persistentAllocationCh chan []types.PhysicalAddress - singularityNode types.SingularityNode + singularityNode *types.SingularityNode + singularityNodePointer *types.Pointer snapshotInfo types.SnapshotInfo snapshots *space.Space[types.SnapshotID, types.SnapshotInfo] spaces *space.Space[types.SpaceID, types.SpaceInfo] @@ -361,12 +363,6 @@ func (db *DB) DeleteSnapshot( // Commit commits current snapshot and returns next one. func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error { - doneCh := make(chan struct{}) - db.eventCh <- types.DBCommitEvent{ - DoneCh: doneCh, - } - <-doneCh - if len(db.spacesToCommit) > 0 { spaces := make([]types.SpaceID, 0, len(db.spacesToCommit)) for spaceID := range db.spacesToCommit { @@ -404,20 +400,16 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error { return err } - doneCh = make(chan struct{}) + syncCh := make(chan struct{}) db.eventCh <- types.DBCommitEvent{ - DoneCh: doneCh, + SingularityNodePointer: db.singularityNodePointer, + SyncCh: syncCh, } - <-doneCh - - *photon.FromPointer[types.SingularityNode](db.config.State.Node(0)) = db.singularityNode + <-syncCh db.availableSnapshots[db.singularityNode.LastSnapshotID] = struct{}{} return db.prepareNextSnapshot() - - // FIXME (wojciech): Store singularity node - // FIXME (wojciech): Call Sync on DB file } // Close closed DB. @@ -589,12 +581,14 @@ func (db *DB) processEvents( listNode, ) case types.DBCommitEvent: - ch := make(chan struct{}, 1) + syncCh := make(chan struct{}, 1) storeRequestCh <- types.StoreRequest{ - DoneCh: ch, + Revision: 0, + Pointer: e.SingularityNodePointer, + SyncCh: syncCh, } - <-ch - close(e.DoneCh) + <-syncCh + close(e.SyncCh) } } return errors.WithStack(ctx.Err()) @@ -666,25 +660,26 @@ func (db *DB) storeSpacePointerNodes( func (db *DB) processStoreRequests(ctx context.Context, storeRequestCh <-chan types.StoreRequest) error { for req := range storeRequestCh { - if req.Pointer != nil { - header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress)) - revision := atomic.LoadUint64(&header.Revision) + header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress)) + revision := atomic.LoadUint64(&header.Revision) - if revision != req.Revision { - continue - } + if revision != req.Revision { + continue + } - if err := db.config.Store.Write( - req.Pointer.PhysicalAddress, - db.config.State.Bytes(req.Pointer.LogicalAddress), - ); err != nil { + if err := db.config.Store.Write( + req.Pointer.PhysicalAddress, + db.config.State.Bytes(req.Pointer.LogicalAddress), + ); err != nil { + return err + } + + if req.SyncCh != nil { + if err := db.config.Store.Sync(); err != nil { return err } - - continue + req.SyncCh <- struct{}{} } - - req.DoneCh <- struct{}{} } return errors.WithStack(ctx.Err()) } diff --git a/persistent/dummy.go b/persistent/dummy.go index 291d819..945c06f 100644 --- a/persistent/dummy.go +++ b/persistent/dummy.go @@ -16,3 +16,8 @@ type DummyStore struct{} func (s *DummyStore) Write(_ types.PhysicalAddress, _ []byte) error { return nil } + +// Sync does nothing. +func (s *DummyStore) Sync() error { + return nil +} diff --git a/persistent/file.go b/persistent/file.go index 8d9489b..e74d1db 100644 --- a/persistent/file.go +++ b/persistent/file.go @@ -29,3 +29,8 @@ func (s *FileStore) Write(address types.PhysicalAddress, data []byte) error { _, err := s.file.Write(data) return errors.WithStack(err) } + +// Sync syncs pending writes. +func (s *FileStore) Sync() error { + return errors.WithStack(s.file.Sync()) +} diff --git a/persistent/memory.go b/persistent/memory.go index 1fe184a..9207a8a 100644 --- a/persistent/memory.go +++ b/persistent/memory.go @@ -36,3 +36,8 @@ func (s *MemoryStore) Write(address types.PhysicalAddress, data []byte) error { copy(s.data[address:], data) return nil } + +// Sync does nothing. +func (s *MemoryStore) Sync() error { + return nil +} diff --git a/persistent/types.go b/persistent/types.go index ee71c5e..888cc21 100644 --- a/persistent/types.go +++ b/persistent/types.go @@ -5,4 +5,5 @@ import "github.com/outofforest/quantum/types" // Store defines the interface of the store. type Store interface { Write(address types.PhysicalAddress, data []byte) error + Sync() error } diff --git a/types/types.go b/types/types.go index 7299ce4..b071afc 100644 --- a/types/types.go +++ b/types/types.go @@ -84,7 +84,7 @@ type SnapshotInfo struct { // SingularityNode is the root of the store. type SingularityNode struct { - Version uint64 + RevisionHeader FirstSnapshotID SnapshotID LastSnapshotID SnapshotID SnapshotRoot SpaceInfo @@ -127,12 +127,13 @@ type ListDeallocationEvent struct { // DBCommitEvent is emitted to wait until all the events are processed before snapshot is committed. type DBCommitEvent struct { - DoneCh chan<- struct{} + SingularityNodePointer *Pointer + SyncCh chan<- struct{} } // StoreRequest is used to request writing a node to the store. type StoreRequest struct { Revision uint64 Pointer *Pointer - DoneCh chan<- struct{} + SyncCh chan<- struct{} }