Skip to content

Commit

Permalink
Keep deallocation lists in volatile memory (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Dec 12, 2024
1 parent 95f679a commit 6f40c10
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 209 deletions.
78 changes: 21 additions & 57 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,14 @@ func (db *DB) deleteSnapshot(
snapshotID types.SnapshotID,
tx *pipeline.TransactionRequest,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
snapshotHashBuff []byte,
snapshotHashMatches []uint64,
deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, types.PersistentAddress],
deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, list.Pointer],
deallocationHashBuff []byte,
deallocationHashMatches []uint64,
storeReader *persistent.Reader,
nodeBuff1, nodeBuff2 unsafe.Pointer,
) error {
if snapshotID == db.singularityNode.LastSnapshotID-1 {
return errors.New("deleting latest persistent snapshot is forbidden")
Expand Down Expand Up @@ -244,11 +243,8 @@ func (db *DB) deleteSnapshot(
Pointer: &snapshotInfo.DeallocationRoot,
}

// FIXME (wojciech): What??? Volatile nodes of that space have been deallocated long time ago, so this code
// is a complete disaster!!! It succeeded only because volatile address pool was so huge that the nodes haven't
// been reallocated before deleting the snapshot.
deallocationLists := space.New[deallocationKey, types.PersistentAddress](
space.Config[deallocationKey, types.PersistentAddress]{
deallocationLists := space.New[deallocationKey, list.Pointer](
space.Config[deallocationKey, list.Pointer]{
SpaceRoot: deallocationListsRoot,
State: db.config.State,
DataNodeAssistant: deallocationNodeAssistant,
Expand All @@ -260,25 +256,25 @@ func (db *DB) deleteSnapshot(
//nolint:nestif
if nextSnapshotInfo.DeallocationRoot.VolatileAddress != types.FreeAddress {
var err error
for nextDeallocSnapshot := range space.PersistentIteratorAndDeallocator(
for nextDeallocSnapshot := range space.IteratorAndDeallocator(
nextSnapshotInfo.DeallocationRoot,
storeReader,
db.config.State,
deallocationNodeAssistant,
volatileDeallocator,
persistentDeallocator,
nodeBuff1,
&err,
) {
if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
if err := list.Deallocate(nextDeallocSnapshot.Value, storeReader, persistentDeallocator,
nodeBuff2); err != nil {
if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
persistentDeallocator); err != nil {
return err
}

continue
}

var deallocationListValue space.Entry[deallocationKey, types.PersistentAddress]
var deallocationListValue space.Entry[deallocationKey, list.Pointer]
deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
deallocationHashMatches)
if err := deallocationListValue.Set(
Expand Down Expand Up @@ -338,13 +334,12 @@ func (db *DB) commit(
tx *pipeline.TransactionRequest,
deallocationListsToCommit map[types.SnapshotID]*list.Pointer,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
snapshotHashBuff []byte,
snapshotHashMatches []uint64,
deallocationListSpace *space.Space[deallocationKey, types.PersistentAddress],
deallocationListSpace *space.Space[deallocationKey, list.Pointer],
deallocationHashBuff []byte,
deallocationHashMatches []uint64,
) error {
Expand All @@ -369,11 +364,7 @@ func (db *DB) commit(
for _, snapshotID := range lists {
listRoot := deallocationListsToCommit[snapshotID]

// It is safe to deallocate volatile nodes here despite they are used to write persistent nodes later.
// It's because deallocated nodes are never reallocated until current commit is finalized.
volatileDeallocator.Deallocate(listRoot.VolatileAddress)

var deallocationListValue space.Entry[deallocationKey, types.PersistentAddress]
var deallocationListValue space.Entry[deallocationKey, list.Pointer]
deallocationListSpace.Find(&deallocationListValue, deallocationKey{
ListSnapshotID: db.singularityNode.LastSnapshotID,
SnapshotID: snapshotID,
Expand All @@ -382,7 +373,7 @@ func (db *DB) commit(
if err := deallocationListValue.Set(
tx,
volatileAllocator,
listRoot.PersistentAddress,
*listRoot,
deallocationHashBuff,
deallocationHashMatches,
); err != nil {
Expand Down Expand Up @@ -419,10 +410,6 @@ func (db *DB) commit(
sr.Store[i].Pointer.Revision = uintptr(unsafe.Pointer(sr))
}
}

// It is safe to deallocate volatile nodes here despite they are used to write persistent nodes later.
// It's because deallocated nodes are never reallocated until current commit is finalized.
space.DeallocateVolatile(db.snapshotInfo.DeallocationRoot.VolatileAddress, volatileDeallocator, db.config.State)
}

lastSr := tx.LastStoreRequest
Expand Down Expand Up @@ -498,29 +485,11 @@ func (db *DB) prepareTransactions(
}

func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Reader) error {
storeReader, err := db.config.Store.NewReader()
if err != nil {
return err
}
defer storeReader.Close()

volatileAllocator := db.config.State.NewVolatileAllocator()
volatileDeallocator := db.config.State.NewVolatileDeallocator()
persistentAllocator := db.config.State.NewPersistentAllocator()
persistentDeallocator := db.config.State.NewPersistentDeallocator()

// We use allocator to allocate this region to get aligned address effortlessly.
nodeBuff1Address, err := volatileAllocator.Allocate()
if err != nil {
return err
}
nodeBuff2Address, err := volatileAllocator.Allocate()
if err != nil {
return err
}
nodeBuff1 := db.config.State.Node(nodeBuff1Address)
nodeBuff2 := db.config.State.Node(nodeBuff2Address)

snapshotInfoNodeAssistant, err := space.NewDataNodeAssistant[types.SnapshotID, types.SnapshotInfo]()
if err != nil {
return err
Expand All @@ -538,13 +507,13 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read

deallocationListsToCommit := map[types.SnapshotID]*list.Pointer{}

deallocationNodeAssistant, err := space.NewDataNodeAssistant[deallocationKey, types.PersistentAddress]()
deallocationNodeAssistant, err := space.NewDataNodeAssistant[deallocationKey, list.Pointer]()
if err != nil {
return err
}

deallocationListSpace := space.New[deallocationKey, types.PersistentAddress](
space.Config[deallocationKey, types.PersistentAddress]{
deallocationListSpace := space.New[deallocationKey, list.Pointer](
space.Config[deallocationKey, list.Pointer]{
SpaceRoot: types.NodeRoot{
Pointer: &db.snapshotInfo.DeallocationRoot,
},
Expand Down Expand Up @@ -580,10 +549,9 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
return err
}
case *deleteSnapshotTx:
if err := db.deleteSnapshot(tx.SnapshotID, req, volatileAllocator,
if err := db.deleteSnapshot(tx.SnapshotID, req, volatileAllocator, volatileDeallocator,
persistentDeallocator, snapshotSpace, snapshotHashBuff, snapshotHashMatches,
deallocationNodeAssistant, deallocationHashBuff, deallocationHashMatches, storeReader, nodeBuff1,
nodeBuff2); err != nil {
deallocationNodeAssistant, deallocationHashBuff, deallocationHashMatches); err != nil {
return err
}
case *genesis.Tx:
Expand Down Expand Up @@ -615,10 +583,6 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
}

if listNodePointer.VolatileAddress != types.FreeAddress {
// It is safe to deallocate volatile nodes here despite they are used to write persistent nodes later.
// It's because deallocated nodes are never reallocated until current commit is finalized.
volatileDeallocator.Deallocate(listNodePointer.VolatileAddress)

if lr == nil {
lr = &pipeline.ListRequest{}
}
Expand Down Expand Up @@ -651,9 +615,9 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
}

if req.Type == pipeline.Commit {
if err := db.commit(req, deallocationListsToCommit, volatileAllocator, volatileDeallocator,
persistentAllocator, persistentDeallocator, snapshotSpace, snapshotHashBuff, snapshotHashMatches,
deallocationListSpace, deallocationHashBuff, deallocationHashMatches); err != nil {
if err := db.commit(req, deallocationListsToCommit, volatileAllocator, persistentAllocator,
persistentDeallocator, snapshotSpace, snapshotHashBuff, snapshotHashMatches, deallocationListSpace,
deallocationHashBuff, deallocationHashMatches); err != nil {
return err
}
}
Expand Down
33 changes: 14 additions & 19 deletions list/list.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package list

import (
"unsafe"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/types"
)

Expand Down Expand Up @@ -37,7 +34,7 @@ func Add(
node.Slots[0] = nodeAddress
node.NumOfPointerAddresses = 1
// This is needed because list nodes are not zeroed.
node.Next = 0
node.Next = Pointer{}

return Pointer{}, nil
}
Expand Down Expand Up @@ -66,33 +63,31 @@ func Add(

node.Slots[0] = nodeAddress
node.NumOfPointerAddresses = 1
node.Next = oldRoot.PersistentAddress
node.Next = oldRoot

return oldRoot, nil
}

// Deallocate deallocates nodes referenced by the list.
func Deallocate(
listRoot types.PersistentAddress,
storeReader *persistent.Reader,
deallocator *alloc.Deallocator[types.PersistentAddress],
nodeBuff unsafe.Pointer,
listRoot Pointer,
state *alloc.State,
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
) error {
for {
if listRoot == 0 {
return nil
}
// It is safe to do deallocations here because deallocated nodes are not reallocated until commit is finalized.
volatileDeallocator.Deallocate(listRoot.VolatileAddress)
persistentDeallocator.Deallocate(listRoot.PersistentAddress)

if err := storeReader.Read(listRoot, nodeBuff); err != nil {
return err
}

node := ProjectNode(nodeBuff)
node := ProjectNode(state.Node(listRoot.VolatileAddress))
for i := range node.NumOfPointerAddresses {
deallocator.Deallocate(node.Slots[i])
persistentDeallocator.Deallocate(node.Slots[i])
}

deallocator.Deallocate(listRoot)
if node.Next.VolatileAddress == types.FreeAddress {
return nil
}
listRoot = node.Next
}
}
4 changes: 2 additions & 2 deletions list/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
)

// NumOfAddresses defines number of available slots in the list node.
const NumOfAddresses = 510
const NumOfAddresses = 509

// Node represents list node.
type Node struct {
Slots [NumOfAddresses]types.PersistentAddress

NumOfPointerAddresses uint16
Next types.PersistentAddress
Next Pointer
}

// ProjectNode projects node to list node.
Expand Down
80 changes: 0 additions & 80 deletions persistent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ func (s *Store) Size() uint64 {
return s.size
}

// NewReader creates new store reader.
func (s *Store) NewReader() (*Reader, error) {
return newReader(s.file)
}

// NewWriter creates new store writer.
func (s *Store) NewWriter(volatileOrigin unsafe.Pointer, volatileSize uint64) (*Writer, error) {
return newWriter(s.file, volatileOrigin, volatileSize)
Expand All @@ -62,81 +57,6 @@ func (s *Store) Close() {
_ = s.file.Close()
}

func newReader(file *os.File) (*Reader, error) {
// Required by uring.SetupSingleIssuer.
runtime.LockOSThread()

ring, err := uring.New(1, uring.WithCQSize(1),
uring.WithFlags(uring.SetupSingleIssuer),
)
if err != nil {
return nil, errors.WithStack(err)
}

return &Reader{
fd: int32(file.Fd()),
ring: ring,
}, nil
}

// Reader reads nodes from persistent store.
type Reader struct {
fd int32
ring *uring.Ring
}

// Read reads data from the persistent store.
func (r *Reader) Read(address types.PersistentAddress, data unsafe.Pointer) error {
sqe, err := r.ring.NextSQE()
if err != nil {
return errors.WithStack(err)
}
// We don't need to do this because we never set them so they are 0s in the entire ring all the time.
// sqe.Flags = 0
// sqe.IoPrio = 0
// sqe.OpcodeFlags = 0
// sqe.UserData = 0
// sqe.BufIG = 0
// sqe.Personality = 0
// sqe.SpliceFdIn = 0

sqe.OpCode = uint8(uring.ReadCode)
sqe.Fd = r.fd
sqe.Len = types.NodeLength
sqe.Off = uint64(address) * types.NodeLength
sqe.Addr = uint64(uintptr(data))

if _, err := r.ring.Submit(); err != nil {
return errors.WithStack(err)
}

for {
cqe, err := r.ring.PeekCQE()
if err != nil {
if errors.Is(err, syscall.EAGAIN) {
if _, err := r.ring.WaitCQEvents(1); err != nil && !errors.Is(err, syscall.EINTR) {
return errors.WithStack(err)
}
continue
}
return errors.WithStack(err)
}
if err := cqe.Error(); err != nil {
return errors.WithStack(err)
}

break
}
r.ring.AdvanceCQ(1)

return nil
}

// Close closes the reader.
func (r *Reader) Close() {
_ = r.ring.Close()
}

func newWriter(
file *os.File,
volatileOrigin unsafe.Pointer,
Expand Down
Loading

0 comments on commit 6f40c10

Please sign in to comment.