Skip to content

Commit

Permalink
Store WAL
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest committed Dec 2, 2024
1 parent ffc4d4d commit 279e0c5
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 78 deletions.
79 changes: 18 additions & 61 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/rand"
"fmt"
"io"
"math"
"os"
"testing"
"unsafe"
Expand Down Expand Up @@ -71,24 +70,20 @@ func BenchmarkBalanceTransfer(b *testing.B) {
defer stateDeallocFunc()

//nolint:ineffassign,wastedassign,staticcheck
stores, storesCloseFunc, err := fileStores([]string{
"db0.quantum",
// "db1.quantum",
store, storeCloseFunc, err := fileStore(
// "/tmp/d0/wojciech/db.quantum",
// "/tmp/d1/wojciech/db.quantum",
}, size)
"db0.quantum",
size)
if err != nil {
panic(err)
}
defer storesCloseFunc()
defer storeCloseFunc()

stores = []persistent.Store{
persistent.NewDummyStore(),
}
store = persistent.NewDummyStore()

db, err := quantum.New(quantum.Config{
State: state,
Stores: stores,
State: state,
Store: store,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -189,64 +184,26 @@ func BenchmarkBalanceTransfer(b *testing.B) {
}
}

func fileStores(
paths []string,
size uint64,
) ([]persistent.Store, func(), error) {
numOfStores := uint64(len(paths))
stores := make([]persistent.Store, 0, numOfStores)
funcs := make([]func(), 0, numOfStores)

func fileStore(path string, size uint64) (persistent.Store, func(), error) {
expectedNumOfNodes := size / types.NodeLength
expectedCapacity := (expectedNumOfNodes + numOfStores - 1) / numOfStores * types.NodeLength
expectedCapacity := expectedNumOfNodes * types.NodeLength
seekTo := int64(expectedCapacity - types.NodeLength)
data := make([]byte, 2*types.NodeLength-1)
p := uint64(uintptr(unsafe.Pointer(&data[0])))
p = (p+types.NodeLength-1)/types.NodeLength*types.NodeLength - p
data = data[p : p+types.NodeLength]

var minNumOfNodes uint64 = math.MaxUint64
for _, path := range paths {
file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o600)
if err != nil {
return nil, nil, err
}

fileSize, err := file.Seek(0, io.SeekEnd)
if err != nil {
return nil, nil, errors.WithStack(err)
}
if fileSize == 0 {
if _, err := file.Seek(seekTo, io.SeekEnd); err != nil {
return nil, nil, errors.WithStack(err)
}
if _, err := file.Write(data); err != nil {
return nil, nil, errors.WithStack(err)
}
fileSize = int64(expectedCapacity)
}

numOfNodes := uint64(fileSize) / types.NodeLength
if numOfNodes < minNumOfNodes {
minNumOfNodes = numOfNodes
}

store, storeCloseFunc, err := persistent.NewFileStore(file)
if err != nil {
return nil, nil, err
}

stores = append(stores, store)
funcs = append(funcs, storeCloseFunc)
file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o600)
if err != nil {
return nil, nil, err
}

if minNumOfNodes*numOfStores < expectedNumOfNodes {
return nil, nil, errors.New("files don't provide enough capacity")
if _, err := file.Seek(seekTo, io.SeekEnd); err != nil {
return nil, nil, errors.WithStack(err)
}
if _, err := file.Write(data); err != nil {
return nil, nil, errors.WithStack(err)
}

return stores, func() {
for _, f := range funcs {
f()
}
}, nil
return persistent.NewFileStore(file)
}
67 changes: 50 additions & 17 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
"github.com/outofforest/quantum/tx/types/spaces"
"github.com/outofforest/quantum/types"
"github.com/outofforest/quantum/wal"
wallist "github.com/outofforest/quantum/wal/list"
waltypes "github.com/outofforest/quantum/wal/types"
)

// Config stores snapshot configuration.
type Config struct {
State *alloc.State
Stores []persistent.Store
State *alloc.State
Store persistent.Store
}

// SpaceToCommit represents requested space which might require to be committed.
Expand Down Expand Up @@ -136,7 +137,7 @@ func (db *DB) Commit() error {
tx.SyncCh = syncCh
db.queue.Push(tx)

commitCh := make(chan error, len(db.config.Stores)+1) // 1 is for supervisor
commitCh := make(chan error, 2) // 1 for store and 1 for supervisor
tx = db.txRequestFactory.New()
tx.Type = pipeline.Commit
tx.CommitCh = commitCh
Expand All @@ -145,10 +146,8 @@ func (db *DB) Commit() error {
}
db.queue.Push(tx)

for range len(db.config.Stores) {
if err := <-commitCh; err != nil {
return err
}
if err := <-commitCh; err != nil {
return err
}

db.config.State.Commit()
Expand Down Expand Up @@ -197,8 +196,8 @@ func (db *DB) Run(ctx context.Context) error {
pointerHashReaders = append(pointerHashReaders, nextReader)
prevHashReader = nextReader
}

commitSyncReader := pipeline.NewReader(prevHashReader)
walListReader := pipeline.NewReader(prevHashReader)
storeWALReader := pipeline.NewReader(walListReader)

spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
var lastSyncCh chan<- struct{}
Expand Down Expand Up @@ -265,8 +264,11 @@ func (db *DB) Run(ctx context.Context) error {
return db.updatePointerHashes(ctx, reader, uint64(i))
})
}
spawn("commitSync", parallel.Fail, func(ctx context.Context) error {
return db.syncOnCommit(ctx, commitSyncReader)
spawn("walList", parallel.Fail, func(ctx context.Context) error {
return db.buildWALList(ctx, db.config.Store, walListReader)
})
spawn("storeWAL", parallel.Fail, func(ctx context.Context) error {
return db.storeWAL(ctx, db.config.Store, storeWALReader)
})

return nil
Expand Down Expand Up @@ -1050,18 +1052,24 @@ func (db *DB) updatePointerHashes(
}
}

func (db *DB) syncOnCommit(ctx context.Context, pipeReader *pipeline.Reader) error {
func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error {
allocator := db.config.State.NewAllocator()

for processedCount := uint64(0); ; processedCount++ {
req, err := pipeReader.Read(ctx)
if err != nil {
return err
}

if req.Type == pipeline.Commit {
for _, store := range db.config.Stores {
err := store.Sync()
req.CommitCh <- err
if err != nil {
for wr := req.WALRequest; wr != nil; wr = wr.Next {
newTail, err := wallist.StoreAddress(&db.singularityNode.WALListTail, wr.NodeAddress, db.config.State,
allocator)
if err != nil {
return err
}
if newTail {
if err := store.Write(db.singularityNode.WALListTail,
db.config.State.Bytes(db.singularityNode.WALListTail)); err != nil {
return err
}
}
Expand All @@ -1071,6 +1079,31 @@ func (db *DB) syncOnCommit(ctx context.Context, pipeReader *pipeline.Reader) err
}
}

func (db *DB) storeWAL(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error {
for processedCount := uint64(0); ; processedCount++ {
req, err := pipeReader.Read(ctx)
if err != nil {
return err
}

for wr := req.WALRequest; wr != nil; wr = wr.Next {
if err := store.Write(wr.NodeAddress, db.config.State.Bytes(wr.NodeAddress)); err != nil {
return err
}
}

if req.Type == pipeline.Commit {
err := store.Sync()
req.CommitCh <- err
if err != nil {
return err
}
}

pipeReader.Acknowledge(processedCount+1, req)
}
}

func (db *DB) deallocateNode(
nodeSnapshotID types.SnapshotID,
nodeAddress types.NodeAddress,
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ type SingularityNode struct {
Hash Hash
LastSnapshotID SnapshotID
SnapshotRoot Pointer
WALListTail NodeAddress
}
92 changes: 92 additions & 0 deletions wal/list/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package list

import (
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/types"
)

// StoreAddress adds address to the list.
func StoreAddress(
listTail *types.NodeAddress,
nodeAddress types.NodeAddress,
state *alloc.State,
allocator *alloc.Allocator,
) (bool, error) {
if *listTail == 0 {
var err error
*listTail, err = allocator.Allocate()
if err != nil {
return false, err
}
node := ProjectNode(state.Node(*listTail))

node.Slots[0] = nodeAddress
node.NumOfAddresses = 1
// This is needed because list nodes are not zeroed.
node.Next = 0

return true, nil
}

node := ProjectNode(state.Node(*listTail))
if node.NumOfAddresses < NumOfAddresses {
node.Slots[node.NumOfAddresses] = nodeAddress
node.NumOfAddresses++

return false, nil
}

var err error
*listTail, err = allocator.Allocate()
if err != nil {
return false, err
}
node.Next = *listTail
node = ProjectNode(state.Node(*listTail))

node.Slots[0] = nodeAddress
node.NumOfAddresses = 1
// This is needed because list nodes are not zeroed.
node.Next = 0

return true, nil
}

// Merge merges two lists.
func Merge(
list1Front, list1Tail *types.NodeAddress,
list2Front, list2Tail types.NodeAddress,
state *alloc.State,
) {
if list2Front == 0 {
return
}
if *list1Front == 0 {
*list1Front = list2Front
} else {
node := ProjectNode(state.Node(*list1Tail))
node.Next = list2Front
}

*list1Tail = list2Tail
}

// Iterator iterates over addresses in the list.
func Iterator(listFront types.NodeAddress, state *alloc.State) func(func(types.NodeAddress) bool) {
return func(yield func(types.NodeAddress) bool) {
for {
if listFront == 0 {
return
}

node := ProjectNode(state.Node(listFront))
for i := range node.NumOfAddresses {
if !yield(node.Slots[i]) {
return
}
}

listFront = node.Next
}
}
}
23 changes: 23 additions & 0 deletions wal/list/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package list

import (
"unsafe"

"github.com/outofforest/quantum/types"
)

// NumOfAddresses defines number of available slots in the list node.
const NumOfAddresses = 510

// Node represents list node.
type Node struct {
Slots [NumOfAddresses]types.NodeAddress

Next types.NodeAddress
NumOfAddresses uint16
}

// ProjectNode projects node to list node.
func ProjectNode(n unsafe.Pointer) *Node {
return (*Node)(n)
}
14 changes: 14 additions & 0 deletions wal/list/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package list

import (
"testing"
"unsafe"

"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/types"
)

func TestPointerNode(t *testing.T) {
require.LessOrEqual(t, unsafe.Sizeof(Node{}), uintptr(types.NodeLength))
}

0 comments on commit 279e0c5

Please sign in to comment.