Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store preallocated pointer nodes persistently #72

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions alloc/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"

"github.com/outofforest/photon"
"github.com/outofforest/quantum/types"
)

Expand Down Expand Up @@ -77,6 +78,11 @@ func (s *State) Node(nodeAddress types.LogicalAddress) unsafe.Pointer {
return unsafe.Add(s.dataP, nodeAddress)
}

// Bytes returns byte slice of a node.
func (s *State) Bytes(nodeAddress types.LogicalAddress) []byte {
return photon.SliceFromPointer[byte](s.Node(nodeAddress), int(s.nodeSize))
}

// Run runs node eraser.
func (s *State) Run(ctx context.Context) error {
return RunEraser(ctx, s.deallocationCh, s.allocationCh, s.nodeSize, s, s.numOfEraseWorkers)
Expand Down
11 changes: 10 additions & 1 deletion benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/outofforest/logger"
"github.com/outofforest/parallel"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/types"
)

Expand All @@ -37,8 +38,9 @@ func BenchmarkBalanceTransfer(b *testing.B) {

for bi := 0; bi < b.N; bi++ {
func() {
var size uint64 = 20 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
70*1024*1024*1024,
size,
4*1024,
1024,
true,
Expand All @@ -51,8 +53,15 @@ func BenchmarkBalanceTransfer(b *testing.B) {

pool := state.NewPool()

store, storeDeallocFunc, err := persistent.NewMemoryStore(size, true)
if err != nil {
panic(err)
}
defer storeDeallocFunc()

db, err := New(Config{
State: state,
Store: store,
})
if err != nil {
panic(err)
Expand Down
128 changes: 103 additions & 25 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quantum
import (
"context"
"sort"
"sync/atomic"

"github.com/pkg/errors"
"github.com/samber/lo"
Expand All @@ -12,13 +13,15 @@ import (
"github.com/outofforest/photon"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/list"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/space"
"github.com/outofforest/quantum/types"
)

// Config stores snapshot configuration.
type Config struct {
State *alloc.State
Store persistent.Store
}

// SpaceToCommit represents requested space which might require to be committed.
Expand Down Expand Up @@ -94,7 +97,7 @@ func New(config Config) (*DB, error) {
spacesToCommit: map[types.SpaceID]SpaceToCommit{},
deallocationListsToCommit: map[types.SnapshotID]ListToCommit{},
availableSnapshots: map[types.SnapshotID]struct{}{},
storageEventCh: make(chan any, 100),
eventCh: make(chan any, 100),
doneCh: make(chan struct{}),
}

Expand All @@ -108,7 +111,7 @@ func New(config Config) (*DB, error) {
PointerNodeAllocator: pointerNodeAllocator,
DataNodeAllocator: snapshotInfoNodeAllocator,
MassEntry: mass.New[space.Entry[types.SnapshotID, types.SnapshotInfo]](1000),
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
})

db.spaces = space.New[types.SpaceID, types.SpaceInfo](space.Config[types.SpaceID, types.SpaceInfo]{
Expand All @@ -121,7 +124,7 @@ func New(config Config) (*DB, error) {
PointerNodeAllocator: pointerNodeAllocator,
DataNodeAllocator: spaceInfoNodeAllocator,
MassEntry: mass.New[space.Entry[types.SpaceID, types.SpaceInfo]](1000),
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
})

db.deallocationLists = space.New[types.SnapshotID, types.Pointer](
Expand All @@ -135,7 +138,7 @@ func New(config Config) (*DB, error) {
PointerNodeAllocator: pointerNodeAllocator,
DataNodeAllocator: snapshotToNodeNodeAllocator,
MassEntry: mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000),
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
},
)

Expand Down Expand Up @@ -172,8 +175,8 @@ type DB struct {
deallocationListsToCommit map[types.SnapshotID]ListToCommit
availableSnapshots map[types.SnapshotID]struct{}

storageEventCh chan any
doneCh chan struct{}
eventCh chan any
doneCh chan struct{}
}

// DeleteSnapshot deletes snapshot.
Expand Down Expand Up @@ -211,7 +214,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
PointerNodeAllocator: db.pointerNodeAllocator,
DataNodeAllocator: db.snapshotToNodeNodeAllocator,
MassEntry: db.massSnapshotToNodeEntry,
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
},
)
} else {
Expand All @@ -232,7 +235,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
continue
}

db.storageEventCh <- types.ListDeallocationEvent{
db.eventCh <- types.ListDeallocationEvent{
ListRoot: snapshotItem.Value,
}

Expand Down Expand Up @@ -265,7 +268,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
ListRoot: &newNextListNodeAddress,
State: db.config.State,
NodeAllocator: db.listNodeAllocator,
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
})
if err != nil {
return err
Expand All @@ -285,7 +288,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
}
}

db.storageEventCh <- types.SpaceDeallocationEvent{
db.eventCh <- types.SpaceDeallocationEvent{
SpaceRoot: deallocationListsRoot,
}

Expand Down Expand Up @@ -340,7 +343,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
// Commit commits current snapshot and returns next one.
func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {
doneCh := make(chan struct{})
db.storageEventCh <- types.DBCommitEvent{
db.eventCh <- types.DBCommitEvent{
DoneCh: doneCh,
}
<-doneCh
Expand Down Expand Up @@ -383,7 +386,7 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {
}

doneCh = make(chan struct{})
db.storageEventCh <- types.DBCommitEvent{
db.eventCh <- types.DBCommitEvent{
DoneCh: doneCh,
}
<-doneCh
Expand All @@ -397,7 +400,7 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {

// Close closed DB.
func (db *DB) Close() {
close(db.storageEventCh)
close(db.eventCh)
db.config.State.Close()

<-db.doneCh
Expand All @@ -408,33 +411,76 @@ func (db *DB) Run(ctx context.Context) error {
defer close(db.doneCh)

return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
spawn("state", parallel.Fail, db.config.State.Run)
spawn("storageEvents", parallel.Continue, func(ctx context.Context) error {
return db.processStorageEvents(ctx, db.storageEventCh)
storeRequestCh := make(chan types.StoreRequest, 500)

spawn("state", parallel.Continue, db.config.State.Run)
spawn("events", parallel.Continue, func(ctx context.Context) error {
defer close(storeRequestCh)
return db.processEvents(ctx, db.eventCh, storeRequestCh)
})
spawn("storeRequests", parallel.Continue, func(ctx context.Context) error {
return db.processStoreRequests(ctx, storeRequestCh)
})

return nil
})
}

func (db *DB) processStorageEvents(ctx context.Context, storageEventCh <-chan any) error {
func (db *DB) processEvents(
ctx context.Context,
eventCh <-chan any,
storeRequestCh chan<- types.StoreRequest,
) error {
volatilePool := db.config.State.NewPool()
persistentPool := alloc.NewPool[types.PhysicalAddress](db.persistentAllocationCh, db.persistentAllocationCh)

pointerNode := db.pointerNodeAllocator.NewNode()
parentPointerNode := db.pointerNodeAllocator.NewNode()
listNode := db.listNodeAllocator.NewNode()

for event := range storageEventCh {
for event := range eventCh {
switch e := event.(type) {
case types.SpacePointerNodeAllocatedEvent:
pointerNodeAddress := e.Pointer.LogicalAddress
for pointerNodeAddress != 0 {
header := photon.FromPointer[space.PointerNodeHeader](db.config.State.Node(pointerNodeAddress))
if header.RevisionHeader.SnapshotID == db.singularityNode.LastSnapshotID {
db.pointerNodeAllocator.Get(e.NodeAddress, pointerNode)

for {
var pointer *types.Pointer
if pointerNode.Header.ParentNodeAddress == 0 {
pointer = e.RootPointer
} else {
db.pointerNodeAllocator.Get(pointerNode.Header.ParentNodeAddress, parentPointerNode)
spacePointer, _ := parentPointerNode.Item(pointerNode.Header.ParentNodeIndex)
pointer = &spacePointer.Pointer
}

revision := atomic.AddUint64(&pointerNode.Header.RevisionHeader.Revision, 1)

if pointerNode.Header.RevisionHeader.SnapshotID != db.singularityNode.LastSnapshotID {
pointerNode.Header.RevisionHeader.SnapshotID = db.singularityNode.LastSnapshotID
pointer.PhysicalAddress = 0
}

terminate := true
if pointer.PhysicalAddress == 0 {
var err error
pointer.PhysicalAddress, err = persistentPool.Allocate()
if err != nil {
return err
}

terminate = pointer == e.RootPointer
}

storeRequestCh <- types.StoreRequest{
Revision: revision,
Pointer: pointer,
}

if terminate {
break
}
header.RevisionHeader.SnapshotID = db.singularityNode.LastSnapshotID
pointerNodeAddress = header.ParentNodeAddress

pointerNode = parentPointerNode
}
case types.SpaceDataNodeAllocatedEvent:
header := photon.FromPointer[space.DataNodeHeader](db.config.State.Node(e.Pointer.LogicalAddress))
Expand Down Expand Up @@ -492,12 +538,44 @@ func (db *DB) processStorageEvents(ctx context.Context, storageEventCh <-chan an
listNode,
)
case types.DBCommitEvent:
ch := make(chan struct{}, 1)
storeRequestCh <- types.StoreRequest{
DoneCh: ch,
}
<-ch
close(e.DoneCh)
}
}
return errors.WithStack(ctx.Err())
}

func (db *DB) processStoreRequests(ctx context.Context, storeRequestCh <-chan types.StoreRequest) error {
for req := range storeRequestCh {
if req.Pointer != nil {
header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress))
revision := atomic.LoadUint64(&header.Revision)

if revision != req.Revision {
continue
}

if err := db.config.Store.Write(
req.Pointer.PhysicalAddress,
db.config.State.Bytes(req.Pointer.LogicalAddress),
); err != nil {
return err
}

// TODO: Store node

continue
}

req.DoneCh <- struct{}{}
}
return errors.WithStack(ctx.Err())
}

func (db *DB) prepareNextSnapshot() error {
var snapshotID types.SnapshotID
if db.singularityNode.SnapshotRoot.State != types.StateFree {
Expand Down Expand Up @@ -564,7 +642,7 @@ func GetSpace[K, V comparable](spaceID types.SpaceID, db *DB) (*space.Space[K, V
HashMod: s.HashMod,
SpaceRoot: s.PInfo,
State: db.config.State,
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
PointerNodeAllocator: db.pointerNodeAllocator,
DataNodeAllocator: dataNodeAllocator,
MassEntry: mass.New[space.Entry[K, V]](1000),
Expand Down
11 changes: 4 additions & 7 deletions persistent/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"syscall"

"github.com/pkg/errors"

"github.com/outofforest/quantum/types"
)

// NewMemoryStore creates new in-memory "persistent" store.
Expand All @@ -29,13 +31,8 @@ type MemoryStore struct {
data []byte
}

// Size returns size of the store.
func (s *MemoryStore) Size() uint64 {
return uint64(len(s.data))
}

// Write writes data to the store.
func (s *MemoryStore) Write(offset uint64, data []byte) error {
copy(s.data[offset:], data)
func (s *MemoryStore) Write(address types.PhysicalAddress, data []byte) error {
copy(s.data[address:], data)
return nil
}
5 changes: 3 additions & 2 deletions persistent/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package persistent

import "github.com/outofforest/quantum/types"

// Store defines the interface of the store.
type Store interface {
Size() uint64
Write(offset uint64, data []byte) error
Write(address types.PhysicalAddress, data []byte) error
}
11 changes: 8 additions & 3 deletions space/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (na *NodeAllocator[H, T]) Allocate(
return nodeAddress, nil
}

// Index returns index from hash.
func (na *NodeAllocator[H, T]) Index(hash types.Hash) uintptr {
return uintptr(hash) % na.numOfItems
}

// Shift shifts bits in hash.
func (na *NodeAllocator[H, T]) Shift(hash types.Hash) types.Hash {
return hash / types.Hash(na.numOfItems)
Expand All @@ -100,6 +105,7 @@ func (na *NodeAllocator[H, T]) project(nodeP unsafe.Pointer, node *Node[H, T]) {
type PointerNodeHeader struct {
RevisionHeader types.RevisionHeader
ParentNodeAddress types.LogicalAddress
ParentNodeIndex uintptr
HashMod uint64
}

Expand All @@ -118,9 +124,8 @@ type Node[H, T comparable] struct {
itemsP unsafe.Pointer
}

// ItemByHash returns pointers to the item and its state by hash.
func (sn *Node[H, T]) ItemByHash(hash types.Hash) (*T, *types.State) {
index := uintptr(hash) % sn.numOfItems
// Item returns pointers to the item and its state by index.
func (sn *Node[H, T]) Item(index uintptr) (*T, *types.State) {
return (*T)(unsafe.Add(sn.itemsP, sn.itemSize*index)), (*types.State)(unsafe.Add(sn.statesP, index))
}

Expand Down
Loading