Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't store address of the space, it might change over time #298

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion alloc/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ type Deallocator[A Address] struct {

// Deallocate deallocates single node.
func (d *Deallocator[A]) Deallocate(nodeAddress A) {
d.release = append(d.release, nodeAddress)
// ANDing with FlagNaked is done to erase flags from volatile addresses.
// Persistent addresses don't have flags, but it is impossible to have so big values there anyway.
d.release = append(d.release, nodeAddress&types.FlagNaked)

if len(d.release) == cap(d.release) {
d.sinkCh <- d.release

Expand Down
2 changes: 2 additions & 0 deletions alloc/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func Allocate(size, alignment uint64, useHugePages bool) (unsafe.Pointer, func()

// 1GB hugepages.
_ = unmap(dataPOrig, allocatedSize, 1024*1024*1024)

return
}

// Standard pages.
Expand Down
2 changes: 1 addition & 1 deletion alloc/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *State) VolatileSize() uint64 {

// Node returns node bytes.
func (s *State) Node(nodeAddress types.VolatileAddress) unsafe.Pointer {
return unsafe.Add(s.dataP, nodeAddress.Naked()*types.NodeLength)
return unsafe.Add(s.dataP, nodeAddress*types.NodeLength)
}

// Bytes returns byte slice of a node.
Expand Down
13 changes: 7 additions & 6 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/outofforest/quantum/tx/genesis"
"github.com/outofforest/quantum/tx/transfer"
txtypes "github.com/outofforest/quantum/tx/types"
"github.com/outofforest/quantum/tx/types/spaces"
"github.com/outofforest/quantum/types"
)

Expand All @@ -28,9 +29,11 @@ import (
// go tool pprof -http="localhost:8000" pprofbin ./profile.out
// go test -c -o bench ./benchmark_test.go

// * soft memlock unlimited
// * hard memlock unlimited

func BenchmarkBalanceTransfer(b *testing.B) {
const (
spaceID = 0x00
numOfAddresses = 5_000_000
txsPerCommit = 20_000
balance = 100_000
Expand Down Expand Up @@ -66,7 +69,7 @@ func BenchmarkBalanceTransfer(b *testing.B) {
panic(err)
}

var size uint64 = 5 * 1024 * 1024 * 1024
var size uint64 = 1 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
size, store.Size(),
100,
Expand Down Expand Up @@ -95,11 +98,9 @@ func BenchmarkBalanceTransfer(b *testing.B) {
}
}()

defer func() {
db.Close()
}()
defer db.Close()

s, err := quantum.GetSpace[txtypes.Account, txtypes.Amount](spaceID, db)
s, err := quantum.GetSpace[txtypes.Account, txtypes.Amount](spaces.Balances, db)
if err != nil {
panic(err)
}
Expand Down
62 changes: 31 additions & 31 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,36 +253,34 @@ func (db *DB) deleteSnapshot(
},
)

if nextSnapshotInfo.DeallocationRoot.VolatileAddress != types.FreeAddress {
for nextDeallocSnapshot := range space.IteratorAndDeallocator(
nextSnapshotInfo.DeallocationRoot,
db.config.State,
deallocationNodeAssistant,
volatileDeallocator,
persistentDeallocator,
) {
if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
persistentDeallocator); err != nil {
return err
}

continue
}

var deallocationListValue space.Entry[deallocationKey, list.Pointer]
deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
deallocationHashMatches)
if err := deallocationListValue.Set(
tx,
volatileAllocator,
nextDeallocSnapshot.Value,
deallocationHashBuff,
deallocationHashMatches,
); err != nil {
for nextDeallocSnapshot := range space.IteratorAndDeallocator(
nextSnapshotInfo.DeallocationRoot,
db.config.State,
deallocationNodeAssistant,
volatileDeallocator,
persistentDeallocator,
) {
if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
persistentDeallocator); err != nil {
return err
}

continue
}

var deallocationListValue space.Entry[deallocationKey, list.Pointer]
deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
deallocationHashMatches)
if err := deallocationListValue.Set(
tx,
volatileAllocator,
nextDeallocSnapshot.Value,
deallocationHashBuff,
deallocationHashMatches,
); err != nil {
return err
}
}

Expand Down Expand Up @@ -571,7 +569,8 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
if root.Pointer.SnapshotID != db.singularityNode.LastSnapshotID {
if root.Pointer.PersistentAddress != 0 {
listNodePointer, err := db.deallocateNode(root.Pointer.SnapshotID, root.Pointer.PersistentAddress,
deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator)
deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator,
sr.NoSnapshots)
if err != nil {
return err
}
Expand Down Expand Up @@ -859,7 +858,7 @@ func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error
}

for sr := req.StoreRequest; sr != nil; sr = sr.Next {
for i := sr.PointersToStore - 1; i >= sr.PointersLast; i-- {
for i := sr.PointersToStore - 1; i >= 0; i-- {
pointer := sr.Store[i].Pointer

if pointer.Revision == uintptr(unsafe.Pointer(sr)) {
Expand All @@ -885,10 +884,11 @@ func (db *DB) deallocateNode(
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
immediateDeallocation bool,
) (list.Pointer, error) {
// Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one.
// It means the condition here don't need to include snapshot IDs greater than the previous snapshot ID.
if nodeSnapshotID == db.singularityNode.LastSnapshotID {
if nodeSnapshotID == db.singularityNode.LastSnapshotID || immediateDeallocation {
persistentDeallocator.Deallocate(nodeAddress)

return list.Pointer{}, nil
Expand Down
1 change: 0 additions & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ type StoreRequest struct {
NoSnapshots bool
PointersToStore int8
Store [StoreCapacity]types.NodeRoot
PointersLast int8
Next *StoreRequest
}

Expand Down
84 changes: 42 additions & 42 deletions space/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func New[K, V comparable](config Config[K, V]) *Space[K, V] {
}

defaultInit := Entry[K, V]{
space: s,
storeRequest: pipeline.StoreRequest{
NoSnapshots: s.config.NoSnapshots,
PointersToStore: 1,
Expand Down Expand Up @@ -98,48 +97,48 @@ func (s *Space[K, V]) Query(key K, hashBuff []byte, hashMatches []uint64) (V, bo
}

// Stats returns space-related statistics.
func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) {
switch {
case isFree(s.config.SpaceRoot.Pointer.VolatileAddress):
return 0, 0, 0, 0
case !isPointer(s.config.SpaceRoot.Pointer.VolatileAddress):
return 1, 0, 1, 0
func (s *Space[K, V]) Stats() (uint64, uint64, uint64, uint64, float64) {
if isFree(s.config.SpaceRoot.Pointer.VolatileAddress) {
return 0, 0, 0, 0, 0
}

stack := []types.VolatileAddress{s.config.SpaceRoot.Pointer.VolatileAddress}

levels := map[types.VolatileAddress]uint64{
s.config.SpaceRoot.Pointer.VolatileAddress: 1,
s.config.SpaceRoot.Pointer.VolatileAddress: 0,
}
var maxLevel, pointerNodes, dataNodes, dataItems uint64

for {
if len(stack) == 0 {
return maxLevel, pointerNodes, dataNodes, float64(dataItems) / float64(dataNodes*s.numOfDataItems)
return maxLevel, pointerNodes, dataNodes, dataItems, float64(dataItems) / float64(dataNodes*s.numOfDataItems)
}

n := stack[len(stack)-1]
level := levels[n] + 1
pointerNodes++
stack = stack[:len(stack)-1]

pointerNode := ProjectPointerNode(s.config.State.Node(n))
for pi := range pointerNode.Pointers {
volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
switch {
case isFree(volatileAddress):
case isPointer(volatileAddress):
switch {
case isPointer(n):
pointerNodes++
pointerNode := ProjectPointerNode(s.config.State.Node(n))
for pi := range pointerNode.Pointers {
volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
if isFree(volatileAddress) {
continue
}

stack = append(stack, volatileAddress)
levels[volatileAddress] = level
default:
dataNodes++
if level > maxLevel {
maxLevel = level
}
for _, kh := range s.config.DataNodeAssistant.KeyHashes(s.config.State.Node(volatileAddress)) {
if kh != 0 {
dataItems++
}
}
case !isFree(n):
dataNodes++
if level > maxLevel {
maxLevel = level
}
for _, kh := range s.config.DataNodeAssistant.KeyHashes(s.config.State.Node(n)) {
if kh != 0 {
dataItems++
}
}
}
Expand Down Expand Up @@ -196,6 +195,7 @@ func (s *Space[K, V]) initEntry(
) {
initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize)
copy(initBytes, s.defaultInit)
v.space = s
v.keyHash = keyHash
v.item.Key = key
v.stage = stage
Expand Down Expand Up @@ -302,7 +302,7 @@ func (s *Space[K, V]) find(
func (s *Space[K, V]) set(
v *Entry[K, V],
tx *pipeline.TransactionRequest,
allocator *alloc.Allocator[types.VolatileAddress],
volatileAllocator *alloc.Allocator[types.VolatileAddress],
hashBuff []byte,
hashMatches []uint64,
hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash,
Expand All @@ -312,7 +312,7 @@ func (s *Space[K, V]) set(
volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)

if isFree(volatileAddress) {
dataNodeVolatileAddress, err := allocator.Allocate()
dataNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return err
}
Expand Down Expand Up @@ -342,7 +342,7 @@ func (s *Space[K, V]) set(

// Try to split data node.
if v.storeRequest.PointersToStore > 1 {
splitDone, err := s.splitDataNodeWithoutConflict(tx, allocator, v.parentIndex,
splitDone, err := s.splitDataNodeWithoutConflict(tx, volatileAllocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
if err != nil {
return err
Expand All @@ -353,16 +353,16 @@ func (s *Space[K, V]) set(
v.level--
v.nextDataNode = nil

return s.set(v, tx, allocator, hashBuff, hashMatches, hashKeyFunc)
return s.set(v, tx, volatileAllocator, hashBuff, hashMatches, hashKeyFunc)
}
}

// Add pointer node.
if err := s.addPointerNode(v, tx, allocator, conflict, hashBuff, hashKeyFunc); err != nil {
if err := s.addPointerNode(v, tx, volatileAllocator, conflict, hashBuff, hashKeyFunc); err != nil {
return err
}

return s.set(v, tx, allocator, hashBuff, hashMatches, hashKeyFunc)
return s.set(v, tx, volatileAllocator, hashBuff, hashMatches, hashKeyFunc)
}

func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, index uint64) (uint64, uint64) {
Expand All @@ -387,7 +387,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, inde

func (s *Space[K, V]) splitDataNodeWithoutConflict(
tx *pipeline.TransactionRequest,
allocator *alloc.Allocator[types.VolatileAddress],
volatileAllocator *alloc.Allocator[types.VolatileAddress],
index uint64,
parentNodePointer *types.Pointer,
level uint8,
Expand All @@ -397,7 +397,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
return false, nil
}

newNodeVolatileAddress, err := allocator.Allocate()
newNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return false, err
}
Expand Down Expand Up @@ -454,7 +454,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(

func (s *Space[K, V]) splitDataNodeWithConflict(
tx *pipeline.TransactionRequest,
allocator *alloc.Allocator[types.VolatileAddress],
volatileAllocator *alloc.Allocator[types.VolatileAddress],
index uint64,
parentNodePointer *types.Pointer,
level uint8,
Expand All @@ -466,7 +466,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
return false, nil
}

newNodeVolatileAddress, err := allocator.Allocate()
newNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return false, err
}
Expand Down Expand Up @@ -528,12 +528,12 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
func (s *Space[K, V]) addPointerNode(
v *Entry[K, V],
tx *pipeline.TransactionRequest,
allocator *alloc.Allocator[types.VolatileAddress],
volatileAllocator *alloc.Allocator[types.VolatileAddress],
conflict bool,
hashBuff []byte,
hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash,
) error {
pointerNodeVolatileAddress, err := allocator.Allocate()
pointerNodeVolatileAddress, err := volatileAllocator.Allocate()
if err != nil {
return err
}
Expand All @@ -554,10 +554,10 @@ func (s *Space[K, V]) addPointerNode(
types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress)

if conflict {
_, err = s.splitDataNodeWithConflict(tx, allocator, 0, pointerNodeRoot.Pointer,
_, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer,
v.level+1, hashBuff, hashKeyFunc)
} else {
_, err = s.splitDataNodeWithoutConflict(tx, allocator, 0, pointerNodeRoot.Pointer, v.level+1)
_, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1)
}
return err
}
Expand Down Expand Up @@ -679,9 +679,9 @@ func (s *Space[K, V]) detectUpdate(v *Entry[K, V]) {

// Entry represents entry in the space.
type Entry[K, V comparable] struct {
space *Space[K, V]
storeRequest pipeline.StoreRequest

space *Space[K, V]
itemP *DataItem[K, V]
keyHashP *types.KeyHash
keyHash types.KeyHash
Expand Down Expand Up @@ -763,8 +763,8 @@ func IteratorAndDeallocator[K, V comparable](
stackCount--
pointer := stack[stackCount]

// It is safe to do deallocations here because nodes are not reallocated until commit is finalized.
volatileDeallocator.Deallocate(pointer.VolatileAddress.Naked())
// It is safe to deallocate here because nodes are not reallocated until commit is finalized.
volatileDeallocator.Deallocate(pointer.VolatileAddress)
persistentDeallocator.Deallocate(pointer.PersistentAddress)

switch {
Expand Down
Loading
Loading