Skip to content

Commit

Permalink
3-stage tree walk
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest committed Nov 30, 2024
1 parent cbe3f49 commit 0f87c27
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 71 deletions.
65 changes: 33 additions & 32 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func New(config Config) (*DB, error) {
},
State: config.State,
DataNodeAssistant: snapshotInfoNodeAssistant,
MassEntry: mass.New[space.Entry[types.SnapshotID, types.SnapshotInfo]](1000),
NoSnapshots: true,
})

Expand All @@ -92,7 +91,6 @@ func New(config Config) (*DB, error) {
},
State: config.State,
DataNodeAssistant: snapshotToPointerNodeAssistant,
MassEntry: mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000),
NoSnapshots: true,
},
)
Expand Down Expand Up @@ -179,11 +177,14 @@ func (db *DB) Run(ctx context.Context) error {
defer db.config.State.Close()

return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
prepareTxReaders := make([]*pipeline.Reader, 0, 1)
supervisorReader := db.queueReader
prepareTxReader := pipeline.CloneReader(db.queueReader)
prepareTxReaders := make([]*pipeline.Reader, 0, 3)
for range cap(prepareTxReaders) {
prepareTxReaders = append(prepareTxReaders, pipeline.CloneReader(db.queueReader))
prepareTxReaders = append(prepareTxReaders, prepareTxReader)
prepareTxReader = pipeline.NewReader(prepareTxReader)
}
executeTxReader := pipeline.NewReader(prepareTxReaders...)
executeTxReader := prepareTxReader
deallocateReader := pipeline.NewReader(executeTxReader)
copyReaders := make([]*pipeline.Reader, 0, 4)
for range cap(copyReaders) {
Expand Down Expand Up @@ -212,7 +213,7 @@ func (db *DB) Run(ctx context.Context) error {
var processedCount uint64

for {
req, err := db.queueReader.Read(ctx)
req, err := supervisorReader.Read(ctx)
if err != nil {
if lastSyncCh != nil {
lastSyncCh <- struct{}{}
Expand All @@ -226,7 +227,7 @@ func (db *DB) Run(ctx context.Context) error {

if req != nil {
processedCount++
db.queueReader.Acknowledge(processedCount, req)
supervisorReader.Acknowledge(processedCount, req)

if req.Type == pipeline.Sync {
lastSyncCh = req.SyncCh
Expand All @@ -244,7 +245,7 @@ func (db *DB) Run(ctx context.Context) error {
})
for i, reader := range prepareTxReaders {
spawn(fmt.Sprintf("prepareTx-%02d", i), parallel.Fail, func(ctx context.Context) error {
return db.prepareTransactions(ctx, reader, uint64(len(prepareTxReaders)), uint64(i))
return db.prepareTransactions(ctx, reader)
})
}
spawn("executeTx", parallel.Fail, func(ctx context.Context) error {
Expand Down Expand Up @@ -290,15 +291,15 @@ func (db *DB) deleteSnapshot(
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
deallocator *alloc.Deallocator,
massSnapshotToPointerEntry *mass.Mass[space.Entry[types.SnapshotID, types.Pointer]],
massStoreRequest *mass.Mass[pipeline.StoreRequest],
snapshotHashBuff []byte,
snapshotHashMatches []uint64,
deallocationHashBuff []byte,
deallocationHashMatches []uint64,
) error {
snapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator, snapshotID, snapshotHashBuff,
snapshotHashMatches)
var snapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo]
err := db.snapshots.Find(&snapshotInfoValue, snapshotID, tx, walRecorder, allocator, snapshotID, space.StageData,
snapshotHashBuff, snapshotHashMatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -327,8 +328,9 @@ func (db *DB) deleteSnapshot(

//nolint:nestif
if snapshotInfo.NextSnapshotID < db.singularityNode.LastSnapshotID {
nextSnapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator,
snapshotInfo.NextSnapshotID, snapshotHashBuff, snapshotHashMatches)
var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo]
err := db.snapshots.Find(&nextSnapshotInfoValue, snapshotID, tx, walRecorder, allocator,
snapshotInfo.NextSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -356,7 +358,6 @@ func (db *DB) deleteSnapshot(
SpaceRoot: nextDeallocationListRoot,
State: db.config.State,
DataNodeAssistant: db.snapshotToPointerNodeAssistant,
MassEntry: massSnapshotToPointerEntry,
},
)
} else {
Expand All @@ -375,7 +376,6 @@ func (db *DB) deleteSnapshot(
SpaceRoot: deallocationListsRoot,
State: db.config.State,
DataNodeAssistant: db.snapshotToPointerNodeAssistant,
MassEntry: massSnapshotToPointerEntry,
},
)

Expand All @@ -391,8 +391,9 @@ func (db *DB) deleteSnapshot(
continue
}

deallocationListValue, err := deallocationLists.Find(snapshotID, tx, walRecorder, allocator,
nextDeallocSnapshot.Key, deallocationHashBuff, deallocationHashMatches)
var deallocationListValue space.Entry[types.SnapshotID, types.Pointer]
err := deallocationLists.Find(&deallocationListValue, snapshotID, tx, walRecorder, allocator,
nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff, deallocationHashMatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -459,8 +460,9 @@ func (db *DB) deleteSnapshot(
nextSnapshotInfo.PreviousSnapshotID = snapshotInfo.PreviousSnapshotID

if snapshotInfo.NextSnapshotID < db.singularityNode.LastSnapshotID {
nextSnapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator,
snapshotInfo.NextSnapshotID, snapshotHashBuff, snapshotHashMatches)
var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo]
err := db.snapshots.Find(&nextSnapshotInfoValue, snapshotID, tx, walRecorder, allocator,
snapshotInfo.NextSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches)
if err != nil {
return err
}
Expand All @@ -479,8 +481,9 @@ func (db *DB) deleteSnapshot(

//nolint:nestif
if snapshotInfo.PreviousSnapshotID > 0 {
previousSnapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator,
snapshotInfo.PreviousSnapshotID, snapshotHashBuff, snapshotHashMatches)
var previousSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo]
err := db.snapshots.Find(&previousSnapshotInfoValue, snapshotID, tx, walRecorder, allocator,
snapshotInfo.PreviousSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -538,8 +541,9 @@ func (db *DB) commit(
var sr *pipeline.StoreRequest
for _, snapshotID := range lists {
l := db.deallocationListsToCommit[snapshotID]
deallocationListValue, err := db.deallocationLists.Find(commitSnapshotID, tx, walRecorder, allocator,
snapshotID, deallocationHashBuff, deallocationHashMatches)
var deallocationListValue space.Entry[types.SnapshotID, types.Pointer]
err := db.deallocationLists.Find(&deallocationListValue, commitSnapshotID, tx, walRecorder, allocator,
snapshotID, space.StageData, deallocationHashBuff, deallocationHashMatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -599,8 +603,9 @@ func (db *DB) commit(
clear(db.deallocationListsToCommit)
}

nextSnapshotInfoValue, err := db.snapshots.Find(commitSnapshotID, tx, walRecorder, allocator,
db.singularityNode.LastSnapshotID, snapshotHashBuff, snapshotHashMatches)
var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo]
err := db.snapshots.Find(&nextSnapshotInfoValue, commitSnapshotID, tx, walRecorder, allocator,
db.singularityNode.LastSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches)
if err != nil {
return err
}
Expand Down Expand Up @@ -628,8 +633,6 @@ func (db *DB) commit(
func (db *DB) prepareTransactions(
ctx context.Context,
pipeReader *pipeline.Reader,
divider uint64,
mod uint64,
) error {
s, err := GetSpace[txtypes.Account, txtypes.Amount](spaces.Balances, db)
if err != nil {
Expand All @@ -648,7 +651,7 @@ func (db *DB) prepareTransactions(
return err
}

if processedCount%divider == mod && req.Transaction != nil {
if req.Transaction != nil {
if transferTx, ok := req.Transaction.(*transfer.Tx); ok {
if err := transferTx.Prepare(s, db.singularityNode.LastSnapshotID, req, walRecorder, allocator,
hashBuff, hashMatches); err != nil {
Expand Down Expand Up @@ -677,7 +680,6 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
deallocationHashBuff := db.deallocationLists.NewHashBuff()
deallocationHashMatches := db.deallocationLists.NewHashMatches()

massSnapshotToPointerEntry := mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000)
massStoreRequest := mass.New[pipeline.StoreRequest](1000)

walRecorder := wal.NewRecorder(db.config.State, allocator)
Expand Down Expand Up @@ -707,8 +709,8 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
walRecorder.Commit(req)
case *deleteSnapshotTx:
if err := db.deleteSnapshot(tx.SnapshotID, req, walRecorder, allocator, deallocator,
massSnapshotToPointerEntry, massStoreRequest, snapshotHashBuff, snapshotHashMatches,
deallocationHashBuff, deallocationHashMatches); err != nil {
massStoreRequest, snapshotHashBuff, snapshotHashMatches, deallocationHashBuff,
deallocationHashMatches); err != nil {
return err
}
case *genesis.Tx:
Expand Down Expand Up @@ -1214,7 +1216,6 @@ func GetSpace[K, V comparable](spaceID types.SpaceID, db *DB) (*space.Space[K, V
},
State: db.config.State,
DataNodeAssistant: dataNodeAssistant,
MassEntry: mass.New[space.Entry[K, V]](1000),
}), nil
}

Expand Down
77 changes: 49 additions & 28 deletions space/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/cespare/xxhash"
"github.com/samber/lo"

"github.com/outofforest/mass"
"github.com/outofforest/photon"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/pipeline"
Expand All @@ -18,6 +17,13 @@ import (
"github.com/outofforest/quantum/wal"
)

// Stage constants.
const (
StagePointer0 uint8 = iota
StagePointer1
StageData
)

var (
stateFreePtr = lo.ToPtr(types.StateFree)
stateDataPtr = lo.ToPtr(types.StateData)
Expand All @@ -30,7 +36,6 @@ type Config[K, V comparable] struct {
SpaceRoot types.NodeRoot
State *alloc.State
DataNodeAssistant *DataNodeAssistant[K, V]
MassEntry *mass.Mass[Entry[K, V]]
NoSnapshots bool
}

Expand Down Expand Up @@ -81,43 +86,44 @@ func (s *Space[K, V]) NewHashMatches() []uint64 {

// Find locates key in the space.
func (s *Space[K, V]) Find(
v *Entry[K, V],
snapshotID types.SnapshotID,
tx *pipeline.TransactionRequest,
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
key K,
stage uint8,
hashBuff []byte,
hashMatches []uint64,
) (*Entry[K, V], error) {
v := s.config.MassEntry.New()
initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize)
copy(initBytes, s.defaultInit)
v.keyHash = hashKey(&key, nil, 0)
v.item.Key = key
v.dataItemIndex = dataItemIndex(v.keyHash, s.numOfDataItems)

if v.storeRequest.Store[0].Pointer.State != types.StateFree &&
v.storeRequest.Store[0].Pointer.SnapshotID != snapshotID {
persistentAddress, err := allocator.Allocate()
if err != nil {
return nil, err
}

if err := wal.Deallocate(walRecorder, tx, v.storeRequest.Store[0].Pointer, persistentAddress,
s.config.NoSnapshots); err != nil {
return nil, err
}
) error {
//nolint:nestif
if v.space == nil {
initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize)
copy(initBytes, s.defaultInit)
v.keyHash = hashKey(&key, nil, 0)
v.item.Key = key
v.dataItemIndex = dataItemIndex(v.keyHash, s.numOfDataItems)
v.stage = stage

if s.config.SpaceRoot.Pointer.State != types.StateFree &&
s.config.SpaceRoot.Pointer.SnapshotID != snapshotID {
persistentAddress, err := allocator.Allocate()
if err != nil {
return err
}

// This is not stored in WAL because space roots are stored separately on commit.
v.storeRequest.Store[0].Pointer.SnapshotID = snapshotID
v.storeRequest.Store[0].Pointer.PersistentAddress = persistentAddress
}
if err := wal.Deallocate(walRecorder, tx, s.config.SpaceRoot.Pointer, persistentAddress,
s.config.NoSnapshots); err != nil {
return err
}

if err := s.find(snapshotID, tx, walRecorder, allocator, v, hashBuff, hashMatches); err != nil {
return nil, err
// This is not stored in WAL because space roots are stored separately on commit.
s.config.SpaceRoot.Pointer.SnapshotID = snapshotID
s.config.SpaceRoot.Pointer.PersistentAddress = persistentAddress
}
}

return v, nil
return s.find(snapshotID, tx, walRecorder, allocator, v, hashBuff, hashMatches)
}

// Query queries the key.
Expand Down Expand Up @@ -405,6 +411,16 @@ func (s *Space[K, V]) find(
return err
}

switch {
case v.stage == StageData:
case v.stage == StagePointer0:
v.stage = StagePointer1
return nil
case v.stage == StagePointer1:
v.stage = StageData
return nil
}

if v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.State != types.StateData {
v.keyHashP = nil
v.itemP = nil
Expand Down Expand Up @@ -873,6 +889,10 @@ func (s *Space[K, V]) walkPointers(
} else {
v.nextDataNodeState = &pointerNode.Pointers[nextIndex].State
}

if v.stage == StagePointer0 && v.level == 3 {
return nil
}
}

return nil
Expand Down Expand Up @@ -927,6 +947,7 @@ type Entry[K, V comparable] struct {
parentIndex uint64
dataItemIndex uint64
exists bool
stage uint8
level uint8
}

Expand Down
7 changes: 4 additions & 3 deletions tx/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Tx struct {

// Execute executes transaction.
func (t *Tx) Execute(
space *space.Space[txtypes.Account, txtypes.Amount],
s *space.Space[txtypes.Account, txtypes.Amount],
snapshotID types.SnapshotID,
tx *pipeline.TransactionRequest,
walRecorder *wal.Recorder,
Expand All @@ -31,8 +31,9 @@ func (t *Tx) Execute(
hashMatches []uint64,
) error {
for _, a := range t.Accounts {
v, err := space.Find(snapshotID, tx, walRecorder, allocator, a.Account, hashBuff, hashMatches)
if err != nil {
var v space.Entry[txtypes.Account, txtypes.Amount]
if err := s.Find(&v, snapshotID, tx, walRecorder, allocator, a.Account, space.StageData, hashBuff,
hashMatches); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 0f87c27

Please sign in to comment.