Skip to content

Commit

Permalink
Use round robin to store singularity nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest committed Dec 16, 2024
1 parent 232cfb8 commit d7b590e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 44 deletions.
34 changes: 13 additions & 21 deletions alloc/allocator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package alloc

import (
"math/rand/v2"

"github.com/outofforest/quantum/types"
)

Expand All @@ -13,29 +11,23 @@ type Address interface {

func newAllocationRing[A Address](
size uint64,
randomize bool,
) (*ring[A], A) {
const singularityNodeCount = 1

singularityNodeCount uint64,
) (*ring[A], []A) {
totalNumOfNodes := size / types.NodeLength
numOfNodes := totalNumOfNodes - singularityNodeCount

r, addresses := newRing[A](numOfNodes)
for i := range A(numOfNodes) {
addresses[i] = i + singularityNodeCount
}
singularityNodeFrequency := A(totalNumOfNodes / singularityNodeCount)

// FIXME (wojciech): There is no evidence it helps. Consider removing this option.
if randomize {
// Randomly shuffle addresses so consecutive items possibly go to different regions of the device, improving
// performance eventually.
rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
r, addresses := newRing[A](totalNumOfNodes - singularityNodeCount)
singularityNodes := make([]A, 0, singularityNodeCount)
for i, j := A(0), 0; i < A(totalNumOfNodes); i++ {
if i%singularityNodeFrequency == 0 {
singularityNodes = append(singularityNodes, i)
} else {
addresses[j] = i
j++
}
}

var singularityNode A
return r, singularityNode
return r, singularityNodes
}

func newAllocator[A Address](r *ring[A]) *Allocator[A] {
Expand Down
49 changes: 28 additions & 21 deletions alloc/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,51 @@ import (
"github.com/outofforest/quantum/types"
)

const numOfPersistentSingularityNodes = 8

// NewState creates new DB state.
func NewState(
volatileSize, persistentSize uint64,
useHugePages bool,
) (*State, func(), error) {
// Align allocated memory address to the node volatileSize. It might be required if using O_DIRECT option to open
// Align allocated memory address to the node length. It might be required if using O_DIRECT option to open
// files. As a side effect it is also 64-byte aligned which is required by the AVX512 instructions.
dataP, deallocateFunc, err := Allocate(volatileSize, types.NodeLength, useHugePages)
if err != nil {
return nil, nil, errors.Wrapf(err, "memory allocation failed")
}

volatileRing, singularityVolatileAddress := newAllocationRing[types.VolatileAddress](volatileSize, false)
persistentRing, singularityPersistentAddress := newAllocationRing[types.PersistentAddress](persistentSize, false)
volatileRing, singularityVolatileNodes := newAllocationRing[types.VolatileAddress](volatileSize, 1)
persistentRing, singularityPersistentNodes := newAllocationRing[types.PersistentAddress](persistentSize,
numOfPersistentSingularityNodes)

return &State{
singularityNodeRoot: types.ToStore{
VolatileAddress: singularityVolatileAddress,
singularityNodeRoots := make([]types.ToStore, 0, numOfPersistentSingularityNodes)
for i := range numOfPersistentSingularityNodes {
singularityNodeRoots = append(singularityNodeRoots, types.ToStore{
VolatileAddress: singularityVolatileNodes[0],
Pointer: &types.Pointer{
Revision: 1,
VolatileAddress: singularityVolatileAddress,
PersistentAddress: singularityPersistentAddress,
VolatileAddress: singularityVolatileNodes[0],
PersistentAddress: singularityPersistentNodes[i],
},
},
dataP: dataP,
volatileSize: volatileSize,
volatileRing: volatileRing,
persistentRing: persistentRing,
})
}

return &State{
singularityNodeRoots: singularityNodeRoots,
dataP: dataP,
volatileSize: volatileSize,
volatileRing: volatileRing,
persistentRing: persistentRing,
}, deallocateFunc, nil
}

// State stores the DB state.
type State struct {
singularityNodeRoot types.ToStore
dataP unsafe.Pointer
volatileSize uint64
volatileRing *ring[types.VolatileAddress]
persistentRing *ring[types.PersistentAddress]
singularityNodeRoots []types.ToStore
dataP unsafe.Pointer
volatileSize uint64
volatileRing *ring[types.VolatileAddress]
persistentRing *ring[types.PersistentAddress]
}

// NewVolatileAllocator creates new volatile address allocator.
Expand All @@ -70,8 +77,8 @@ func (s *State) NewPersistentDeallocator() *Deallocator[types.PersistentAddress]
}

// SingularityNodeRoot returns node root of singularity node.
func (s *State) SingularityNodeRoot() types.ToStore {
return s.singularityNodeRoot
func (s *State) SingularityNodeRoot(snapshotID types.SnapshotID) types.ToStore {
return s.singularityNodeRoots[snapshotID%numOfPersistentSingularityNodes]
}

// Origin returns the pointer to the allocated memory.
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (db *DB) commit(
PointersToStore: 1,
NoSnapshots: true,
}
sr.Store[0] = db.config.State.SingularityNodeRoot()
sr.Store[0] = db.config.State.SingularityNodeRoot(db.singularityNode.LastSnapshotID)
sr.Store[0].Pointer.Revision = uintptr(unsafe.Pointer(sr))
tx.AddStoreRequest(sr)

Expand Down
3 changes: 2 additions & 1 deletion persistent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func (w *Writer) Write(dstAddress types.PersistentAddress, srcAddress types.Vola
w.numOfEvents++

switch {
// FIXME (wojciech): There will be more addresses representing singularity node.
// Address 0 means we store singularity node. It is always the last write in the commit.
// That's why fsync is done after writing it.
case srcAddress == 0:
sqe, err := w.ring.NextSQE()
if err != nil {
Expand Down

0 comments on commit d7b590e

Please sign in to comment.