Skip to content

Commit

Permalink
Sync persistent store on commit (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Oct 22, 2024
1 parent d2aaaf8 commit d5f30eb
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 37 deletions.
63 changes: 29 additions & 34 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func New(config Config) (*DB, error) {
db := &DB{
config: config,
persistentAllocationCh: config.State.NewPhysicalAllocationCh(),
singularityNode: *photon.FromPointer[types.SingularityNode](config.State.Node(0)),
singularityNode: photon.FromPointer[types.SingularityNode](config.State.Node(0)),
singularityNodePointer: &types.Pointer{},
pointerNodeAllocator: pointerNodeAllocator,
snapshotToNodeNodeAllocator: snapshotToNodeNodeAllocator,
listNodeAllocator: listNodeAllocator,
Expand Down Expand Up @@ -154,7 +155,8 @@ func New(config Config) (*DB, error) {
type DB struct {
config Config
persistentAllocationCh chan []types.PhysicalAddress
singularityNode types.SingularityNode
singularityNode *types.SingularityNode
singularityNodePointer *types.Pointer
snapshotInfo types.SnapshotInfo
snapshots *space.Space[types.SnapshotID, types.SnapshotInfo]
spaces *space.Space[types.SpaceID, types.SpaceInfo]
Expand Down Expand Up @@ -361,12 +363,6 @@ func (db *DB) DeleteSnapshot(

// Commit commits current snapshot and returns next one.
func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {
doneCh := make(chan struct{})
db.eventCh <- types.DBCommitEvent{
DoneCh: doneCh,
}
<-doneCh

if len(db.spacesToCommit) > 0 {
spaces := make([]types.SpaceID, 0, len(db.spacesToCommit))
for spaceID := range db.spacesToCommit {
Expand Down Expand Up @@ -404,20 +400,16 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {
return err
}

doneCh = make(chan struct{})
syncCh := make(chan struct{})
db.eventCh <- types.DBCommitEvent{
DoneCh: doneCh,
SingularityNodePointer: db.singularityNodePointer,
SyncCh: syncCh,
}
<-doneCh

*photon.FromPointer[types.SingularityNode](db.config.State.Node(0)) = db.singularityNode
<-syncCh

db.availableSnapshots[db.singularityNode.LastSnapshotID] = struct{}{}

return db.prepareNextSnapshot()

// FIXME (wojciech): Store singularity node
// FIXME (wojciech): Call Sync on DB file
}

// Close closed DB.
Expand Down Expand Up @@ -589,12 +581,14 @@ func (db *DB) processEvents(
listNode,
)
case types.DBCommitEvent:
ch := make(chan struct{}, 1)
syncCh := make(chan struct{}, 1)
storeRequestCh <- types.StoreRequest{
DoneCh: ch,
Revision: 0,
Pointer: e.SingularityNodePointer,
SyncCh: syncCh,
}
<-ch
close(e.DoneCh)
<-syncCh
close(e.SyncCh)
}
}
return errors.WithStack(ctx.Err())
Expand Down Expand Up @@ -666,25 +660,26 @@ func (db *DB) storeSpacePointerNodes(

func (db *DB) processStoreRequests(ctx context.Context, storeRequestCh <-chan types.StoreRequest) error {
for req := range storeRequestCh {
if req.Pointer != nil {
header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress))
revision := atomic.LoadUint64(&header.Revision)
header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress))
revision := atomic.LoadUint64(&header.Revision)

if revision != req.Revision {
continue
}
if revision != req.Revision {
continue
}

if err := db.config.Store.Write(
req.Pointer.PhysicalAddress,
db.config.State.Bytes(req.Pointer.LogicalAddress),
); err != nil {
if err := db.config.Store.Write(
req.Pointer.PhysicalAddress,
db.config.State.Bytes(req.Pointer.LogicalAddress),
); err != nil {
return err
}

if req.SyncCh != nil {
if err := db.config.Store.Sync(); err != nil {
return err
}

continue
req.SyncCh <- struct{}{}
}

req.DoneCh <- struct{}{}
}
return errors.WithStack(ctx.Err())
}
Expand Down
5 changes: 5 additions & 0 deletions persistent/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ type DummyStore struct{}
func (s *DummyStore) Write(_ types.PhysicalAddress, _ []byte) error {
return nil
}

// Sync does nothing.
func (s *DummyStore) Sync() error {
return nil
}
5 changes: 5 additions & 0 deletions persistent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@ func (s *FileStore) Write(address types.PhysicalAddress, data []byte) error {
_, err := s.file.Write(data)
return errors.WithStack(err)
}

// Sync syncs pending writes.
func (s *FileStore) Sync() error {
return errors.WithStack(s.file.Sync())
}
5 changes: 5 additions & 0 deletions persistent/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ func (s *MemoryStore) Write(address types.PhysicalAddress, data []byte) error {
copy(s.data[address:], data)
return nil
}

// Sync does nothing.
func (s *MemoryStore) Sync() error {
return nil
}
1 change: 1 addition & 0 deletions persistent/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import "github.com/outofforest/quantum/types"
// Store defines the interface of the store.
type Store interface {
Write(address types.PhysicalAddress, data []byte) error
Sync() error
}
7 changes: 4 additions & 3 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type SnapshotInfo struct {

// SingularityNode is the root of the store.
type SingularityNode struct {
Version uint64
RevisionHeader
FirstSnapshotID SnapshotID
LastSnapshotID SnapshotID
SnapshotRoot SpaceInfo
Expand Down Expand Up @@ -127,12 +127,13 @@ type ListDeallocationEvent struct {

// DBCommitEvent is emitted to wait until all the events are processed before snapshot is committed.
type DBCommitEvent struct {
DoneCh chan<- struct{}
SingularityNodePointer *Pointer
SyncCh chan<- struct{}
}

// StoreRequest is used to request writing a node to the store.
type StoreRequest struct {
Revision uint64
Pointer *Pointer
DoneCh chan<- struct{}
SyncCh chan<- struct{}
}

0 comments on commit d5f30eb

Please sign in to comment.