From c99298c92fc0d465a3fae1f8a04536bb874348da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Ma=C5=82ota-W=C3=B3jcik?= <59281144+outofforest@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:27:12 +0100 Subject: [PATCH] Use round robin to store singularity nodes (#307) --- alloc/allocator.go | 34 ++++++++++++-------------------- alloc/state.go | 49 ++++++++++++++++++++++++++-------------------- db.go | 2 +- persistent/file.go | 3 ++- 4 files changed, 44 insertions(+), 44 deletions(-) diff --git a/alloc/allocator.go b/alloc/allocator.go index e82eceb..8548e34 100644 --- a/alloc/allocator.go +++ b/alloc/allocator.go @@ -1,8 +1,6 @@ package alloc import ( - "math/rand/v2" - "github.com/outofforest/quantum/types" ) @@ -13,29 +11,23 @@ type Address interface { func newAllocationRing[A Address]( size uint64, - randomize bool, -) (*ring[A], A) { - const singularityNodeCount = 1 - + singularityNodeCount uint64, +) (*ring[A], []A) { totalNumOfNodes := size / types.NodeLength - numOfNodes := totalNumOfNodes - singularityNodeCount - - r, addresses := newRing[A](numOfNodes) - for i := range A(numOfNodes) { - addresses[i] = i + singularityNodeCount - } + singularityNodeFrequency := A(totalNumOfNodes / singularityNodeCount) - // FIXME (wojciech): There is no evidence it helps. Consider removing this option. - if randomize { - // Randomly shuffle addresses so consecutive items possibly go to different regions of the device, improving - // performance eventually. - rand.Shuffle(len(addresses), func(i, j int) { - addresses[i], addresses[j] = addresses[j], addresses[i] - }) + r, addresses := newRing[A](totalNumOfNodes - singularityNodeCount) + singularityNodes := make([]A, 0, singularityNodeCount) + for i, j := A(0), 0; i < A(totalNumOfNodes); i++ { + if i%singularityNodeFrequency == 0 { + singularityNodes = append(singularityNodes, i) + } else { + addresses[j] = i + j++ + } } - var singularityNode A - return r, singularityNode + return r, singularityNodes } func newAllocator[A Address](r *ring[A]) *Allocator[A] { diff --git a/alloc/state.go b/alloc/state.go index 5e8b031..e92019e 100644 --- a/alloc/state.go +++ b/alloc/state.go @@ -9,44 +9,51 @@ import ( "github.com/outofforest/quantum/types" ) +const numOfPersistentSingularityNodes = 8 + // NewState creates new DB state. func NewState( volatileSize, persistentSize uint64, useHugePages bool, ) (*State, func(), error) { - // Align allocated memory address to the node volatileSize. It might be required if using O_DIRECT option to open + // Align allocated memory address to the node length. It might be required if using O_DIRECT option to open // files. As a side effect it is also 64-byte aligned which is required by the AVX512 instructions. dataP, deallocateFunc, err := Allocate(volatileSize, types.NodeLength, useHugePages) if err != nil { return nil, nil, errors.Wrapf(err, "memory allocation failed") } - volatileRing, singularityVolatileAddress := newAllocationRing[types.VolatileAddress](volatileSize, false) - persistentRing, singularityPersistentAddress := newAllocationRing[types.PersistentAddress](persistentSize, false) + volatileRing, singularityVolatileNodes := newAllocationRing[types.VolatileAddress](volatileSize, 1) + persistentRing, singularityPersistentNodes := newAllocationRing[types.PersistentAddress](persistentSize, + numOfPersistentSingularityNodes) - return &State{ - singularityNodeRoot: types.ToStore{ - VolatileAddress: singularityVolatileAddress, + singularityNodeRoots := make([]types.ToStore, 0, numOfPersistentSingularityNodes) + for i := range numOfPersistentSingularityNodes { + singularityNodeRoots = append(singularityNodeRoots, types.ToStore{ + VolatileAddress: singularityVolatileNodes[0], Pointer: &types.Pointer{ - Revision: 1, - VolatileAddress: singularityVolatileAddress, - PersistentAddress: singularityPersistentAddress, + VolatileAddress: singularityVolatileNodes[0], + PersistentAddress: singularityPersistentNodes[i], }, - }, - dataP: dataP, - volatileSize: volatileSize, - volatileRing: volatileRing, - persistentRing: persistentRing, + }) + } + + return &State{ + singularityNodeRoots: singularityNodeRoots, + dataP: dataP, + volatileSize: volatileSize, + volatileRing: volatileRing, + persistentRing: persistentRing, }, deallocateFunc, nil } // State stores the DB state. type State struct { - singularityNodeRoot types.ToStore - dataP unsafe.Pointer - volatileSize uint64 - volatileRing *ring[types.VolatileAddress] - persistentRing *ring[types.PersistentAddress] + singularityNodeRoots []types.ToStore + dataP unsafe.Pointer + volatileSize uint64 + volatileRing *ring[types.VolatileAddress] + persistentRing *ring[types.PersistentAddress] } // NewVolatileAllocator creates new volatile address allocator. @@ -70,8 +77,8 @@ func (s *State) NewPersistentDeallocator() *Deallocator[types.PersistentAddress] } // SingularityNodeRoot returns node root of singularity node. -func (s *State) SingularityNodeRoot() types.ToStore { - return s.singularityNodeRoot +func (s *State) SingularityNodeRoot(snapshotID types.SnapshotID) types.ToStore { + return s.singularityNodeRoots[snapshotID%numOfPersistentSingularityNodes] } // Origin returns the pointer to the allocated memory. diff --git a/db.go b/db.go index ff971bb..903aea5 100644 --- a/db.go +++ b/db.go @@ -391,7 +391,7 @@ func (db *DB) commit( PointersToStore: 1, NoSnapshots: true, } - sr.Store[0] = db.config.State.SingularityNodeRoot() + sr.Store[0] = db.config.State.SingularityNodeRoot(db.singularityNode.LastSnapshotID) sr.Store[0].Pointer.Revision = uintptr(unsafe.Pointer(sr)) tx.AddStoreRequest(sr) diff --git a/persistent/file.go b/persistent/file.go index 9a1831e..14c11b6 100644 --- a/persistent/file.go +++ b/persistent/file.go @@ -139,7 +139,8 @@ func (w *Writer) Write(dstAddress types.PersistentAddress, srcAddress types.Vola w.numOfEvents++ switch { - // FIXME (wojciech): There will be more addresses representing singularity node. + // Address 0 means we store singularity node. It is always the last write in the commit. + // That's why fsync is done after writing it. case srcAddress == 0: sqe, err := w.ring.NextSQE() if err != nil {