Skip to content

Commit

Permalink
Merge persistent writer and state
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest committed Dec 19, 2024
1 parent 3897b9a commit 37659a8
Show file tree
Hide file tree
Showing 24 changed files with 187 additions and 235 deletions.
16 changes: 0 additions & 16 deletions alloc/test.go

This file was deleted.

27 changes: 4 additions & 23 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ import (

"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"

"github.com/outofforest/logger"
"github.com/outofforest/parallel"
"github.com/outofforest/quantum"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/tx/genesis"
"github.com/outofforest/quantum/tx/transfer"
txtypes "github.com/outofforest/quantum/tx/types"
Expand Down Expand Up @@ -61,27 +59,19 @@ func BenchmarkBalanceTransfer(b *testing.B) {
func() {
_, _ = rand.Read(accountBytes)

store, err := fileStore(
var size uint64 = 10 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := state.New(
size, true,
// "./db.quantum",
"./disk",
)
if err != nil {
panic(err)
}

var size uint64 = 10 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
size, store.Size(),
true,
)
if err != nil {
panic(err)
}
defer stateDeallocFunc()

db := quantum.New(quantum.Config{
State: state,
Store: store,
})

ctx, cancel := context.WithCancel(logger.WithLogger(context.Background(), logger.New(logger.DefaultConfig)))
Expand Down Expand Up @@ -173,12 +163,3 @@ func BenchmarkBalanceTransfer(b *testing.B) {
}()
}
}

func fileStore(path string) (*persistent.Store, error) {
file, err := os.OpenFile(path, os.O_RDWR|unix.O_DIRECT, 0o600)
if err != nil {
return nil, err
}

return persistent.NewStore(file)
}
43 changes: 19 additions & 24 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (

"github.com/outofforest/parallel"
"github.com/outofforest/photon"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/hash"
"github.com/outofforest/quantum/list"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/pipeline"
"github.com/outofforest/quantum/space"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/tx/genesis"
"github.com/outofforest/quantum/tx/transfer"
txtypes "github.com/outofforest/quantum/tx/types"
Expand All @@ -26,8 +25,7 @@ import (

// Config stores snapshot configuration.
type Config struct {
State *alloc.State
Store *persistent.Store
State *state.State
}

// SpaceToCommit represents requested space which might require to be committed.
Expand Down Expand Up @@ -117,8 +115,6 @@ func (db *DB) Close() {

// Run runs db goroutines.
func (db *DB) Run(ctx context.Context) error {
defer db.config.Store.Close()

return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
supervisorReader := db.queueReader
prepareTxReader := pipeline.CloneReader(db.queueReader)
Expand Down Expand Up @@ -223,8 +219,7 @@ func (db *DB) Run(ctx context.Context) error {
return pointerHashReader.Run(ctx, pipe04UpdatePointerHashes(db.config.State))
})
spawn("pipe05StoreNodes", parallel.Fail, func(ctx context.Context) error {
storeWriter, err := db.config.Store.NewWriter(db.config.State.Origin(),
db.config.State.VolatileSize())
storeWriter, err := db.config.State.NewPersistentWriter()
if err != nil {
return err
}
Expand Down Expand Up @@ -259,7 +254,7 @@ func pipe01PrepareTransaction(balanceSpace *space.Space[txtypes.Account, txtypes
}

func pipe02ExecuteTransaction(
state *alloc.State,
state *state.State,
snapshotID *types.SnapshotID,
snapshotInfo *types.SnapshotInfo,
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
Expand Down Expand Up @@ -358,7 +353,7 @@ func pipe02ExecuteTransaction(
}
}

func pipe03UpdateDataHashes(state *alloc.State, modMask, mod uint64) pipeline.TxFunc {
func pipe03UpdateDataHashes(state *state.State, modMask, mod uint64) pipeline.TxFunc {
var matrix [16]*byte
matrixP := &matrix[0]

Expand Down Expand Up @@ -473,7 +468,7 @@ func (s *slot) Read() error {
return nil
}

func pipe04UpdatePointerHashes(state *alloc.State) pipeline.TxFunc {
func pipe04UpdatePointerHashes(state *state.State) pipeline.TxFunc {
r := &reader{}

var slots [16]*slot
Expand Down Expand Up @@ -580,7 +575,7 @@ func pipe04UpdatePointerHashes(state *alloc.State) pipeline.TxFunc {
}
}

func pipe05StoreNodes(storeWriter *persistent.Writer) pipeline.TxFunc {
func pipe05StoreNodes(storeWriter *state.Writer) pipeline.TxFunc {
return func(tx *pipeline.TransactionRequest, readCount uint64) (uint64, error) {
for lr := tx.ListRequest; lr != nil; lr = lr.Next {
for i := range lr.ListsToStore {
Expand Down Expand Up @@ -613,11 +608,11 @@ func pipe05StoreNodes(storeWriter *persistent.Writer) pipeline.TxFunc {
func deleteSnapshot(
snapshotID types.SnapshotID,
deleteSnapshotID types.SnapshotID,
state *alloc.State,
state *state.State,
tx *pipeline.TransactionRequest,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
volatileAllocator *state.Allocator[types.VolatileAddress],
volatileDeallocator *state.Deallocator[types.VolatileAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, types.ListRoot],
) error {
Expand Down Expand Up @@ -711,11 +706,11 @@ func commit(
snapshotID types.SnapshotID,
snapshotInfo types.SnapshotInfo,
tx *pipeline.TransactionRequest,
state *alloc.State,
state *state.State,
deallocationListsToCommit map[types.SnapshotID]*types.ListRoot,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
volatileAllocator *state.Allocator[types.VolatileAddress],
persistentAllocator *state.Allocator[types.PersistentAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
deallocationListSpace *space.Space[deallocationKey, types.ListRoot],
) error {
Expand Down Expand Up @@ -815,14 +810,14 @@ func commit(
}

func deallocateNode(
state *alloc.State,
state *state.State,
snapshotID types.SnapshotID,
nodeSnapshotID types.SnapshotID,
nodeAddress types.PersistentAddress,
deallocationListsToCommit map[types.SnapshotID]*types.ListRoot,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
volatileAllocator *state.Allocator[types.VolatileAddress],
persistentAllocator *state.Allocator[types.PersistentAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
immediateDeallocation bool,
) (types.ListRoot, error) {
// Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one.
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/pipeline"
"github.com/outofforest/quantum/space"
"github.com/outofforest/quantum/state"
txtypes "github.com/outofforest/quantum/tx/types"
"github.com/outofforest/quantum/types"
)
Expand All @@ -35,7 +35,7 @@ func TestPipe01PrepareTransactionsDoesNothingIfTransactionIsNil(t *testing.T) {
}

func newSpace(t *testing.T) *space.Space[txtypes.Account, txtypes.Amount] {
state := alloc.NewForTest(t, stateSize)
state := state.NewForTest(t, stateSize)

dataNodeAssistant, err := space.NewDataNodeAssistant[txtypes.Account, txtypes.Amount]()
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions hash/asm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

var (
zeroNode = func() []byte {
b, _, _ := alloc.Allocate(types.NodeLength, 64, false)
b, _, _ := state.Allocate(types.NodeLength, 64, false)
return unsafe.Slice((*byte)(b), types.NodeLength)
}()
zn = &zeroNode[0]
oneNode = func() []byte {
b, _, _ := alloc.Allocate(types.NodeLength, 64, false)
b, _, _ := state.Allocate(types.NodeLength, 64, false)
bSlice := unsafe.Slice((*byte)(b), types.NodeLength)
for i := range bSlice {
bSlice[i] = 0xff
Expand All @@ -48,7 +48,7 @@ func TestBlake3OneMessage(t *testing.T) {
matrix := zeroMatrix
matrix[i] = on

hashesP, hashesDealloc, err := alloc.Allocate(16*types.HashLength, 32, false)
hashesP, hashesDealloc, err := state.Allocate(16*types.HashLength, 32, false)
require.NoError(t, err)
t.Cleanup(hashesDealloc)

Expand All @@ -74,7 +74,7 @@ func TestBlake3OneMessage(t *testing.T) {
func TestBlake3Zeros(t *testing.T) {
matrix := zeroMatrix

hashesP, hashesDealloc, err := alloc.Allocate(16*types.HashLength, 32, false)
hashesP, hashesDealloc, err := state.Allocate(16*types.HashLength, 32, false)
require.NoError(t, err)
t.Cleanup(hashesDealloc)

Expand All @@ -97,7 +97,7 @@ func TestLastHashIsStored(t *testing.T) {
matrix := zeroMatrix
matrix[i] = on

hashesP, hashesDealloc, err := alloc.Allocate(2*types.HashLength, 32, false)
hashesP, hashesDealloc, err := state.Allocate(2*types.HashLength, 32, false)
require.NoError(t, err)
t.Cleanup(hashesDealloc)

Expand Down
6 changes: 3 additions & 3 deletions hash/blake3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
blake3zeebo "github.com/zeebo/blake3"
blake3luke "lukechampine.com/blake3"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

//nolint:unparam
func randData(size uint64) []byte {
dataP, _, _ := alloc.Allocate(size, 64, false)
dataP, _, _ := state.Allocate(size, 64, false)
data := unsafe.Slice((*byte)(dataP), size)
if _, err := rand.Read(data); err != nil {
panic(err)
Expand Down Expand Up @@ -88,7 +88,7 @@ func BenchmarkChecksum4KAVX(b *testing.B) {

var z [16]*byte
for i := range z {
zP, dealloc, err := alloc.Allocate(types.HashLength, 32, false)
zP, dealloc, err := state.Allocate(types.HashLength, 32, false)
require.NoError(b, err)
b.Cleanup(dealloc)
z[i] = (*byte)(zP)
Expand Down
14 changes: 7 additions & 7 deletions list/list.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package list

import (
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

// Add adds address to the list.
func Add(
listRoot *types.ListRoot,
nodeAddress types.PersistentAddress,
state *alloc.State,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
state *state.State,
volatileAllocator *state.Allocator[types.VolatileAddress],
persistentAllocator *state.Allocator[types.PersistentAddress],
) (types.ListRoot, error) {
if listRoot.VolatileAddress == types.FreeAddress {
var err error
Expand Down Expand Up @@ -65,9 +65,9 @@ func Add(
// Deallocate deallocates nodes referenced by the list.
func Deallocate(
listRoot types.ListRoot,
state *alloc.State,
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
state *state.State,
volatileDeallocator *state.Deallocator[types.VolatileAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
) error {
for {
// It is safe to do deallocations here because deallocated nodes are not reallocated until commit is finalized.
Expand Down
4 changes: 2 additions & 2 deletions list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

Expand All @@ -15,7 +15,7 @@ const stateSize = (numOfAddresses + 6) * types.NodeLength
func TestList(t *testing.T) {
requireT := require.New(t)

state := alloc.NewForTest(t, stateSize)
state := state.NewForTest(t, stateSize)
volatileAllocator := state.NewVolatileAllocator()
persistentAllocator := state.NewPersistentAllocator()

Expand Down
4 changes: 2 additions & 2 deletions space/compare/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"testing"
"unsafe"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
)

var (
resDef = [32]uint64{}
values = func() []uint64 {
const numOfItems = 32

p, _, _ := alloc.Allocate(numOfItems*8, 64, false)
p, _, _ := state.Allocate(numOfItems*8, 64, false)
result := unsafe.Slice((*uint64)(p), numOfItems)

copy(result, []uint64{
Expand Down
Loading

0 comments on commit 37659a8

Please sign in to comment.