
Don't store address of the space, it might change over time (#298)
outofforest authored Dec 13, 2024
1 parent ed97cd7 commit ee6e663
Showing 8 changed files with 90 additions and 88 deletions.
alloc/allocator.go: 4 additions & 1 deletion

@@ -98,7 +98,10 @@ type Deallocator[A Address] struct {

// Deallocate deallocates single node.
func (d *Deallocator[A]) Deallocate(nodeAddress A) {
-d.release = append(d.release, nodeAddress)
+// ANDing with FlagNaked is done to erase flags from volatile addresses.
+// Persistent addresses don't have flags, but it is impossible to have such big values there anyway.
+d.release = append(d.release, nodeAddress&types.FlagNaked)

if len(d.release) == cap(d.release) {
d.sinkCh <- d.release

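To see why a single AND suffices here, consider a minimal sketch of the trick. The flag names and bit positions below are assumptions; only the role of FlagNaked is taken from the hunk above:

```go
package main

import "fmt"

// Hypothetical address layout: high bits carry metadata flags, the low bits
// are the "naked" node address. The real constants live in quantum/types.
const (
	flagPointerNode uint64 = 1 << 63
	flagHugePage    uint64 = 1 << 62
	flagNaked              = ^(flagPointerNode | flagHugePage) // keeps only the address bits
)

func main() {
	addr := uint64(42) | flagPointerNode // flagged volatile address
	fmt.Printf("flagged: %#x\n", addr)
	fmt.Printf("naked:   %#x\n", addr&flagNaked) // 0x2a, safe to recycle

	// Persistent addresses never grow into the flag bits, so masking them
	// is a no-op, which is why one code path can serve both address types.
	var persistent uint64 = 1024
	fmt.Println(persistent&flagNaked == persistent) // true
}
```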
alloc/mem.go: 2 additions & 0 deletions

@@ -43,6 +43,8 @@ func Allocate(size, alignment uint64, useHugePages bool) (unsafe.Pointer, func()

// 1GB hugepages.
_ = unmap(dataPOrig, allocatedSize, 1024*1024*1024)
+
+return
}

// Standard pages.
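The added return matters because Allocate tries page sizes in order; without it, the 1GB-hugepage branch would fall through and map the region a second time with standard pages. A control-flow sketch with stand-in helpers (mapHuge and mapStandard are hypothetical, not the real mmap wrappers):

```go
package main

import "fmt"

// allocate mirrors the shape of alloc.Allocate's fallback chain: try huge
// pages first and return on success; only fall through when a tier fails.
func allocate(size uint64, useHugePages bool) string {
	if useHugePages {
		if region, ok := mapHuge(size, 1<<30); ok {
			return region // the return this commit adds: stop after success
		}
	}
	return mapStandard(size) // standard pages as the last resort
}

func mapHuge(size, pageSize uint64) (string, bool) { return "1GB hugepages", true }
func mapStandard(size uint64) string               { return "standard pages" }

func main() {
	fmt.Println(allocate(8<<30, true)) // 1GB hugepages
}
```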
alloc/state.go: 1 addition & 1 deletion

@@ -102,7 +102,7 @@ func (s *State) VolatileSize() uint64 {

// Node returns node bytes.
func (s *State) Node(nodeAddress types.VolatileAddress) unsafe.Pointer {
-return unsafe.Add(s.dataP, nodeAddress.Naked()*types.NodeLength)
+return unsafe.Add(s.dataP, nodeAddress*types.NodeLength)
}

// Bytes returns byte slice of a node.
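Node resolves an address to memory with plain offset arithmetic, so it only works if the address is already naked; after this change callers pass naked addresses and the mask happens centrally in Deallocate. A sketch with an assumed node length (types.NodeLength is not shown in this diff):

```go
package main

import (
	"fmt"
	"unsafe"
)

const nodeLength = 4096 // assumption; the real types.NodeLength may differ

func main() {
	// The arena is one contiguous mapping; a node address is just an index,
	// so node i starts at base + i*nodeLength.
	arena := make([]byte, 4*nodeLength)
	base := unsafe.Pointer(&arena[0])

	var nodeAddress uint64 = 2
	node := unsafe.Add(base, nodeAddress*nodeLength)
	fmt.Println(uintptr(node) - uintptr(base)) // 8192
}
```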
benchmark_test.go: 7 additions & 6 deletions

@@ -20,6 +20,7 @@ import (
"github.com/outofforest/quantum/tx/genesis"
"github.com/outofforest/quantum/tx/transfer"
txtypes "github.com/outofforest/quantum/tx/types"
"github.com/outofforest/quantum/tx/types/spaces"
"github.com/outofforest/quantum/types"
)

@@ -28,9 +29,11 @@ import (
// go tool pprof -http="localhost:8000" pprofbin ./profile.out
// go test -c -o bench ./benchmark_test.go

+// * soft memlock unlimited
+// * hard memlock unlimited
+
func BenchmarkBalanceTransfer(b *testing.B) {
const (
-spaceID = 0x00
numOfAddresses = 5_000_000
txsPerCommit = 20_000
balance = 100_000
@@ -66,7 +69,7 @@ func BenchmarkBalanceTransfer(b *testing.B) {
panic(err)
}

-var size uint64 = 5 * 1024 * 1024 * 1024
+var size uint64 = 1 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
size, store.Size(),
100,
@@ -95,11 +98,9 @@ func BenchmarkBalanceTransfer(b *testing.B) {
}
}()

-defer func() {
-db.Close()
-}()
+defer db.Close()

-s, err := quantum.GetSpace[txtypes.Account, txtypes.Amount](spaceID, db)
+s, err := quantum.GetSpace[txtypes.Account, txtypes.Amount](spaces.Balances, db)
if err != nil {
panic(err)
}
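The benchmark now pulls its space ID from a shared registry instead of a file-local constant, so every tool that opens the store agrees on which space holds balances. The real contents of tx/types/spaces are not in this diff; a plausible minimal version, with a stand-in ID type, looks like:

```go
// Package spaces pins well-known space IDs to stable values. This is a
// hypothetical reconstruction; only the Balances name appears in the diff,
// and the real ID type comes from the quantum types package.
package spaces

type SpaceID uint64 // stand-in for the real ID type

const Balances SpaceID = 0x00 // replaces the benchmark's local spaceID = 0x00
```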
db.go: 31 additions & 31 deletions

@@ -253,36 +253,34 @@ func (db *DB) deleteSnapshot(
},
)

-if nextSnapshotInfo.DeallocationRoot.VolatileAddress != types.FreeAddress {
-for nextDeallocSnapshot := range space.IteratorAndDeallocator(
-nextSnapshotInfo.DeallocationRoot,
-db.config.State,
-deallocationNodeAssistant,
-volatileDeallocator,
-persistentDeallocator,
-) {
-if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
-nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
-if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
-persistentDeallocator); err != nil {
-return err
-}
-
-continue
-}
-
-var deallocationListValue space.Entry[deallocationKey, list.Pointer]
-deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
-deallocationHashMatches)
-if err := deallocationListValue.Set(
-tx,
-volatileAllocator,
-nextDeallocSnapshot.Value,
-deallocationHashBuff,
-deallocationHashMatches,
-); err != nil {
+for nextDeallocSnapshot := range space.IteratorAndDeallocator(
+nextSnapshotInfo.DeallocationRoot,
+db.config.State,
+deallocationNodeAssistant,
+volatileDeallocator,
+persistentDeallocator,
+) {
+if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
+nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
+if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
+persistentDeallocator); err != nil {
+return err
+}
+
+continue
+}
+
+var deallocationListValue space.Entry[deallocationKey, list.Pointer]
+deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
+deallocationHashMatches)
+if err := deallocationListValue.Set(
+tx,
+volatileAllocator,
+nextDeallocSnapshot.Value,
+deallocationHashBuff,
+deallocationHashMatches,
+); err != nil {
return err
}
}
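The loop's key decision is the snapshot-ID window test: deallocation lists keyed by an ID in (PreviousSnapshotID, snapshotID] were only reachable from the snapshot being deleted, so they can be released immediately, while all others are re-attached to the surviving snapshot. A sketch of just that predicate:

```go
package main

import "fmt"

// releasableNow mirrors the condition in deleteSnapshot: a deallocation list
// keyed by listID can be freed once every snapshot in (previousID, deletedID]
// is gone, because no other snapshot can still reference those nodes.
func releasableNow(listID, previousID, deletedID uint64) bool {
	return listID > previousID && listID <= deletedID
}

func main() {
	fmt.Println(releasableNow(5, 4, 6)) // true: falls inside the deleted window
	fmt.Println(releasableNow(3, 4, 6)) // false: re-linked for the next snapshot
}
```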

@@ -571,7 +569,8 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
if root.Pointer.SnapshotID != db.singularityNode.LastSnapshotID {
if root.Pointer.PersistentAddress != 0 {
listNodePointer, err := db.deallocateNode(root.Pointer.SnapshotID, root.Pointer.PersistentAddress,
-deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator)
+deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator,
+sr.NoSnapshots)
if err != nil {
return err
}
@@ -859,7 +858,7 @@ func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error
}

for sr := req.StoreRequest; sr != nil; sr = sr.Next {
-for i := sr.PointersToStore - 1; i >= sr.PointersLast; i-- {
+for i := sr.PointersToStore - 1; i >= 0; i-- {
pointer := sr.Store[i].Pointer

if pointer.Revision == uintptr(unsafe.Pointer(sr)) {
Expand All @@ -885,10 +884,11 @@ func (db *DB) deallocateNode(
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
+immediateDeallocation bool,
) (list.Pointer, error) {
// Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one.
// It means the condition here doesn't need to include snapshot IDs greater than the previous snapshot ID.
-if nodeSnapshotID == db.singularityNode.LastSnapshotID {
+if nodeSnapshotID == db.singularityNode.LastSnapshotID || immediateDeallocation {
persistentDeallocator.Deallocate(nodeAddress)

return list.Pointer{}, nil
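With the new parameter, spaces flagged NoSnapshots skip the deallocation-list detour entirely: their old persistent nodes can be recycled as soon as the commit that replaces them is stored. A sketch of the extended predicate:

```go
package main

import "fmt"

// deallocateNow mirrors the condition in deallocateNode after this commit:
// free immediately if the node was allocated within the snapshot being
// committed, or if the caller passed sr.NoSnapshots as immediateDeallocation.
func deallocateNow(nodeSnapshotID, lastSnapshotID uint64, immediate bool) bool {
	return nodeSnapshotID == lastSnapshotID || immediate
}

func main() {
	fmt.Println(deallocateNow(7, 7, false)) // true: node born in this snapshot
	fmt.Println(deallocateNow(3, 7, true))  // true: snapshot-less space
	fmt.Println(deallocateNow(3, 7, false)) // false: deferred via a deallocation list
}
```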
pipeline/pipeline.go: 0 additions & 1 deletion

@@ -82,7 +82,6 @@ type StoreRequest struct {
NoSnapshots bool
PointersToStore int8
Store [StoreCapacity]types.NodeRoot
-PointersLast int8
Next *StoreRequest
}

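With PointersLast gone, every consumer walks the stored pointers over the same fixed range, from the leaf-most entry down to index 0, as storeNodes above now does. A sketch of the simplified struct and traversal, with field types trimmed to keep it self-contained:

```go
package main

import "fmt"

const storeCapacity = 8 // stand-in; the real pipeline.StoreCapacity differs

// storeRequest is a trimmed StoreRequest: PointersLast is gone, so the valid
// entries are always Store[0:PointersToStore], walked from the end down to 0.
type storeRequest struct {
	noSnapshots     bool
	pointersToStore int8
	store           [storeCapacity]string
}

func main() {
	sr := storeRequest{
		pointersToStore: 3,
		store:           [storeCapacity]string{"root", "pointer node", "data node"},
	}
	for i := sr.pointersToStore - 1; i >= 0; i-- { // previously: i >= sr.PointersLast
		fmt.Println(sr.store[i])
	}
}
```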
space/space.go: 42 additions & 42 deletions

@@ -39,7 +39,6 @@ func New[K, V comparable](config Config[K, V]) *Space[K, V] {
}

defaultInit := Entry[K, V]{
-space: s,
storeRequest: pipeline.StoreRequest{
NoSnapshots: s.config.NoSnapshots,
PointersToStore: 1,
@@ -98,48 +97,48 @@ func (s *Space[K, V]) Query(key K, hashBuff []byte, hashMatches []uint64) (V, bo
}

// Stats returns space-related statistics.
-func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) {
-switch {
-case isFree(s.config.SpaceRoot.Pointer.VolatileAddress):
-return 0, 0, 0, 0
-case !isPointer(s.config.SpaceRoot.Pointer.VolatileAddress):
-return 1, 0, 1, 0
+func (s *Space[K, V]) Stats() (uint64, uint64, uint64, uint64, float64) {
+if isFree(s.config.SpaceRoot.Pointer.VolatileAddress) {
+return 0, 0, 0, 0, 0
}

stack := []types.VolatileAddress{s.config.SpaceRoot.Pointer.VolatileAddress}

levels := map[types.VolatileAddress]uint64{
-s.config.SpaceRoot.Pointer.VolatileAddress: 1,
+s.config.SpaceRoot.Pointer.VolatileAddress: 0,
}
var maxLevel, pointerNodes, dataNodes, dataItems uint64

for {
if len(stack) == 0 {
-return maxLevel, pointerNodes, dataNodes, float64(dataItems) / float64(dataNodes*s.numOfDataItems)
+return maxLevel, pointerNodes, dataNodes, dataItems, float64(dataItems) / float64(dataNodes*s.numOfDataItems)
}

n := stack[len(stack)-1]
level := levels[n] + 1
-pointerNodes++
stack = stack[:len(stack)-1]

-pointerNode := ProjectPointerNode(s.config.State.Node(n))
-for pi := range pointerNode.Pointers {
-volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
-switch {
-case isFree(volatileAddress):
-case isPointer(volatileAddress):
+switch {
+case isPointer(n):
+pointerNodes++
+pointerNode := ProjectPointerNode(s.config.State.Node(n))
+for pi := range pointerNode.Pointers {
+volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
+if isFree(volatileAddress) {
+continue
+}

stack = append(stack, volatileAddress)
levels[volatileAddress] = level
-default:
-dataNodes++
-if level > maxLevel {
-maxLevel = level
-}
-for _, kh := range s.config.DataNodeAssistant.KeyHashes(s.config.State.Node(volatileAddress)) {
-if kh != 0 {
-dataItems++
-}
-}
+case !isFree(n):
+dataNodes++
+if level > maxLevel {
+maxLevel = level
+}
+for _, kh := range s.config.DataNodeAssistant.KeyHashes(s.config.State.Node(n)) {
+if kh != 0 {
+dataItems++
+}
+}
}
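The rewritten Stats handles a root that is still a data node (the old early return of 1, 0, 1, 0) through the same stack loop, counts pointer nodes only when one is actually visited, and exposes dataItems as its own return value. A hypothetical caller under the new five-value signature; the numbers are made up and itemsPerDataNode stands in for s.numOfDataItems:

```go
package main

import "fmt"

func main() {
	// maxLevel, pointerNodes, dataNodes, dataItems, fill := s.Stats()
	var maxLevel, pointerNodes, dataNodes, dataItems uint64 = 3, 9, 120, 48_000
	const itemsPerDataNode = 512 // assumption

	// fill is the fraction of data-node slots actually occupied.
	fill := float64(dataItems) / float64(dataNodes*itemsPerDataNode)
	fmt.Printf("levels=%d pointerNodes=%d dataNodes=%d items=%d fill=%.2f\n",
		maxLevel, pointerNodes, dataNodes, dataItems, fill)
}
```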
@@ -196,6 +195,7 @@ func (s *Space[K, V]) initEntry(
) {
initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize)
copy(initBytes, s.defaultInit)
+v.space = s
v.keyHash = keyHash
v.item.Key = key
v.stage = stage
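initEntry blits a prebuilt byte image of Entry into v and then patches the per-entry fields. Anything left inside the template is frozen at Space construction time, which is the commit's point: the space pointer is now assigned after the copy instead of being baked into defaultInit. A self-contained sketch of the pattern:

```go
package main

import (
	"fmt"
	"unsafe"
)

type space struct{ id int }

type entry struct {
	flags int
	space *space
}

func main() {
	// Build the template once and snapshot it as raw bytes (what defaultInit is).
	tmpl := entry{flags: 7}
	tmplBytes := unsafe.Slice((*byte)(unsafe.Pointer(&tmpl)), unsafe.Sizeof(tmpl))

	// Initialize a fresh entry: copy the image, then set the fields that must
	// not come from the template, like the space pointer in this commit.
	var e entry
	copy(unsafe.Slice((*byte)(unsafe.Pointer(&e)), unsafe.Sizeof(e)), tmplBytes)
	e.space = &space{id: 1}

	fmt.Println(e.flags, e.space.id) // 7 1
}
```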
@@ -302,7 +302,7 @@ func (s *Space[K, V]) find(
func (s *Space[K, V]) set(
v *Entry[K, V],
tx *pipeline.TransactionRequest,
-allocator *alloc.Allocator[types.VolatileAddress],
+volatileAllocator *alloc.Allocator[types.VolatileAddress],
hashBuff []byte,
hashMatches []uint64,
hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash,
@@ -312,7 +312,7 @@
volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)

if isFree(volatileAddress) {
-dataNodeVolatileAddress, err := allocator.Allocate()
+dataNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return err
}
@@ -342,7 +342,7 @@ func (s *Space[K, V]) set(

// Try to split data node.
if v.storeRequest.PointersToStore > 1 {
-splitDone, err := s.splitDataNodeWithoutConflict(tx, allocator, v.parentIndex,
+splitDone, err := s.splitDataNodeWithoutConflict(tx, volatileAllocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
if err != nil {
return err
@@ -353,16 +353,16 @@ func (s *Space[K, V]) set(
v.level--
v.nextDataNode = nil

-return s.set(v, tx, allocator, hashBuff, hashMatches, hashKeyFunc)
+return s.set(v, tx, volatileAllocator, hashBuff, hashMatches, hashKeyFunc)
}
}

// Add pointer node.
-if err := s.addPointerNode(v, tx, allocator, conflict, hashBuff, hashKeyFunc); err != nil {
+if err := s.addPointerNode(v, tx, volatileAllocator, conflict, hashBuff, hashKeyFunc); err != nil {
return err
}

-return s.set(v, tx, allocator, hashBuff, hashMatches, hashKeyFunc)
+return s.set(v, tx, volatileAllocator, hashBuff, hashMatches, hashKeyFunc)
}

func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, index uint64) (uint64, uint64) {
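set keeps retrying until the item lands: allocate a data node if the slot is free, otherwise split the full data node (inserting a pointer node first on a key-hash conflict) and recurse one level deeper. A shape-only sketch of that retry loop; the helpers are stand-ins, not the real API:

```go
package main

import "fmt"

// place mirrors the control flow of Space.set: try the current node; if it
// is full, split (or add a pointer node first) and recurse one level deeper.
func place(level int, nodeHasRoom func(level int) bool) int {
	if nodeHasRoom(level) {
		return level // item stored at this level
	}
	// splitDataNodeWithoutConflict / addPointerNode would run here.
	return place(level+1, nodeHasRoom)
}

func main() {
	level := place(0, func(l int) bool { return l >= 2 })
	fmt.Println("stored at level", level) // stored at level 2
}
```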
@@ -387,7 +387,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, inde

func (s *Space[K, V]) splitDataNodeWithoutConflict(
tx *pipeline.TransactionRequest,
-allocator *alloc.Allocator[types.VolatileAddress],
+volatileAllocator *alloc.Allocator[types.VolatileAddress],
index uint64,
parentNodePointer *types.Pointer,
level uint8,
Expand All @@ -397,7 +397,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
return false, nil
}

-newNodeVolatileAddress, err := allocator.Allocate()
+newNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return false, err
}
@@ -454,7 +454,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(

func (s *Space[K, V]) splitDataNodeWithConflict(
tx *pipeline.TransactionRequest,
-allocator *alloc.Allocator[types.VolatileAddress],
+volatileAllocator *alloc.Allocator[types.VolatileAddress],
index uint64,
parentNodePointer *types.Pointer,
level uint8,
@@ -466,7 +466,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
return false, nil
}

-newNodeVolatileAddress, err := allocator.Allocate()
+newNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return false, err
}
@@ -528,12 +528,12 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
func (s *Space[K, V]) addPointerNode(
v *Entry[K, V],
tx *pipeline.TransactionRequest,
-allocator *alloc.Allocator[types.VolatileAddress],
+volatileAllocator *alloc.Allocator[types.VolatileAddress],
conflict bool,
hashBuff []byte,
hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash,
) error {
-pointerNodeVolatileAddress, err := allocator.Allocate()
+pointerNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return err
}
@@ -554,10 +554,10 @@ func (s *Space[K, V]) addPointerNode(
types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress)

if conflict {
-_, err = s.splitDataNodeWithConflict(tx, allocator, 0, pointerNodeRoot.Pointer,
+_, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer,
v.level+1, hashBuff, hashKeyFunc)
} else {
-_, err = s.splitDataNodeWithoutConflict(tx, allocator, 0, pointerNodeRoot.Pointer, v.level+1)
+_, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1)
}
return err
}
@@ -679,9 +679,9 @@ func (s *Space[K, V]) detectUpdate(v *Entry[K, V]) {

// Entry represents entry in the space.
type Entry[K, V comparable] struct {
-space *Space[K, V]
storeRequest pipeline.StoreRequest

+space *Space[K, V]
itemP *DataItem[K, V]
keyHashP *types.KeyHash
keyHash types.KeyHash
@@ -763,8 +763,8 @@ func IteratorAndDeallocator[K, V comparable](
stackCount--
pointer := stack[stackCount]

-// It is safe to do deallocations here because nodes are not reallocated until commit is finalized.
-volatileDeallocator.Deallocate(pointer.VolatileAddress.Naked())
+// It is safe to deallocate here because nodes are not reallocated until commit is finalized.
+volatileDeallocator.Deallocate(pointer.VolatileAddress)
persistentDeallocator.Deallocate(pointer.PersistentAddress)

switch {
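Dropping Naked() here is safe only because of the first hunk in this commit: Deallocate now strips flags itself, so every call site can pass the flagged address as-is. Centralizing the mask removes the class of bugs where one site forgets the call. A sketch of the new call shape; the mask value is an assumption, the real one is types.FlagNaked:

```go
package main

import "fmt"

const flagNaked uint64 = (1 << 62) - 1 // assumed mask; see quantum/types

// deallocate models the new Deallocator.Deallocate: it masks internally, so
// callers such as IteratorAndDeallocator no longer need addr.Naked().
func deallocate(release *[]uint64, addr uint64) {
	*release = append(*release, addr&flagNaked)
}

func main() {
	var release []uint64
	flagged := uint64(5) | 1<<63 // volatile address with a flag bit set
	deallocate(&release, flagged)
	fmt.Println(release) // [5]: stripped once, centrally
}
```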
