Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge persistent writer and state #326

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions alloc/test.go

This file was deleted.

27 changes: 4 additions & 23 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ import (

"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"

"github.com/outofforest/logger"
"github.com/outofforest/parallel"
"github.com/outofforest/quantum"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/tx/genesis"
"github.com/outofforest/quantum/tx/transfer"
txtypes "github.com/outofforest/quantum/tx/types"
Expand Down Expand Up @@ -61,27 +59,19 @@ func BenchmarkBalanceTransfer(b *testing.B) {
func() {
_, _ = rand.Read(accountBytes)

store, err := fileStore(
var size uint64 = 10 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := state.New(
size, true,
// "./db.quantum",
"./disk",
)
if err != nil {
panic(err)
}

var size uint64 = 10 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
size, store.Size(),
true,
)
if err != nil {
panic(err)
}
defer stateDeallocFunc()

db := quantum.New(quantum.Config{
State: state,
Store: store,
})

ctx, cancel := context.WithCancel(logger.WithLogger(context.Background(), logger.New(logger.DefaultConfig)))
Expand Down Expand Up @@ -173,12 +163,3 @@ func BenchmarkBalanceTransfer(b *testing.B) {
}()
}
}

func fileStore(path string) (*persistent.Store, error) {
file, err := os.OpenFile(path, os.O_RDWR|unix.O_DIRECT, 0o600)
if err != nil {
return nil, err
}

return persistent.NewStore(file)
}
43 changes: 19 additions & 24 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (

"github.com/outofforest/parallel"
"github.com/outofforest/photon"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/hash"
"github.com/outofforest/quantum/list"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/pipeline"
"github.com/outofforest/quantum/space"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/tx/genesis"
"github.com/outofforest/quantum/tx/transfer"
txtypes "github.com/outofforest/quantum/tx/types"
Expand All @@ -26,8 +25,7 @@ import (

// Config stores snapshot configuration.
type Config struct {
State *alloc.State
Store *persistent.Store
State *state.State
}

// SpaceToCommit represents requested space which might require to be committed.
Expand Down Expand Up @@ -117,8 +115,6 @@ func (db *DB) Close() {

// Run runs db goroutines.
func (db *DB) Run(ctx context.Context) error {
defer db.config.Store.Close()

return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
supervisorReader := db.queueReader
prepareTxReader := pipeline.CloneReader(db.queueReader)
Expand Down Expand Up @@ -223,8 +219,7 @@ func (db *DB) Run(ctx context.Context) error {
return pointerHashReader.Run(ctx, pipe04UpdatePointerHashes(db.config.State))
})
spawn("pipe05StoreNodes", parallel.Fail, func(ctx context.Context) error {
storeWriter, err := db.config.Store.NewWriter(db.config.State.Origin(),
db.config.State.VolatileSize())
storeWriter, err := db.config.State.NewPersistentWriter()
if err != nil {
return err
}
Expand Down Expand Up @@ -259,7 +254,7 @@ func pipe01PrepareTransaction(balanceSpace *space.Space[txtypes.Account, txtypes
}

func pipe02ExecuteTransaction(
state *alloc.State,
state *state.State,
snapshotID *types.SnapshotID,
snapshotInfo *types.SnapshotInfo,
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
Expand Down Expand Up @@ -358,7 +353,7 @@ func pipe02ExecuteTransaction(
}
}

func pipe03UpdateDataHashes(state *alloc.State, modMask, mod uint64) pipeline.TxFunc {
func pipe03UpdateDataHashes(state *state.State, modMask, mod uint64) pipeline.TxFunc {
var matrix [16]*byte
matrixP := &matrix[0]

Expand Down Expand Up @@ -473,7 +468,7 @@ func (s *slot) Read() error {
return nil
}

func pipe04UpdatePointerHashes(state *alloc.State) pipeline.TxFunc {
func pipe04UpdatePointerHashes(state *state.State) pipeline.TxFunc {
r := &reader{}

var slots [16]*slot
Expand Down Expand Up @@ -580,7 +575,7 @@ func pipe04UpdatePointerHashes(state *alloc.State) pipeline.TxFunc {
}
}

func pipe05StoreNodes(storeWriter *persistent.Writer) pipeline.TxFunc {
func pipe05StoreNodes(storeWriter *state.Writer) pipeline.TxFunc {
return func(tx *pipeline.TransactionRequest, readCount uint64) (uint64, error) {
for lr := tx.ListRequest; lr != nil; lr = lr.Next {
for i := range lr.ListsToStore {
Expand Down Expand Up @@ -613,11 +608,11 @@ func pipe05StoreNodes(storeWriter *persistent.Writer) pipeline.TxFunc {
func deleteSnapshot(
snapshotID types.SnapshotID,
deleteSnapshotID types.SnapshotID,
state *alloc.State,
state *state.State,
tx *pipeline.TransactionRequest,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
volatileAllocator *state.Allocator[types.VolatileAddress],
volatileDeallocator *state.Deallocator[types.VolatileAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, types.ListRoot],
) error {
Expand Down Expand Up @@ -711,11 +706,11 @@ func commit(
snapshotID types.SnapshotID,
snapshotInfo types.SnapshotInfo,
tx *pipeline.TransactionRequest,
state *alloc.State,
state *state.State,
deallocationListsToCommit map[types.SnapshotID]*types.ListRoot,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
volatileAllocator *state.Allocator[types.VolatileAddress],
persistentAllocator *state.Allocator[types.PersistentAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo],
deallocationListSpace *space.Space[deallocationKey, types.ListRoot],
) error {
Expand Down Expand Up @@ -815,14 +810,14 @@ func commit(
}

func deallocateNode(
state *alloc.State,
state *state.State,
snapshotID types.SnapshotID,
nodeSnapshotID types.SnapshotID,
nodeAddress types.PersistentAddress,
deallocationListsToCommit map[types.SnapshotID]*types.ListRoot,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
volatileAllocator *state.Allocator[types.VolatileAddress],
persistentAllocator *state.Allocator[types.PersistentAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
immediateDeallocation bool,
) (types.ListRoot, error) {
// Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one.
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/pipeline"
"github.com/outofforest/quantum/space"
"github.com/outofforest/quantum/state"
txtypes "github.com/outofforest/quantum/tx/types"
"github.com/outofforest/quantum/types"
)
Expand All @@ -35,7 +35,7 @@ func TestPipe01PrepareTransactionsDoesNothingIfTransactionIsNil(t *testing.T) {
}

func newSpace(t *testing.T) *space.Space[txtypes.Account, txtypes.Amount] {
state := alloc.NewForTest(t, stateSize)
state := state.NewForTest(t, stateSize)

dataNodeAssistant, err := space.NewDataNodeAssistant[txtypes.Account, txtypes.Amount]()
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions hash/asm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

var (
zeroNode = func() []byte {
b, _, _ := alloc.Allocate(types.NodeLength, 64, false)
b, _, _ := state.Allocate(types.NodeLength, 64, false)
return unsafe.Slice((*byte)(b), types.NodeLength)
}()
zn = &zeroNode[0]
oneNode = func() []byte {
b, _, _ := alloc.Allocate(types.NodeLength, 64, false)
b, _, _ := state.Allocate(types.NodeLength, 64, false)
bSlice := unsafe.Slice((*byte)(b), types.NodeLength)
for i := range bSlice {
bSlice[i] = 0xff
Expand All @@ -48,7 +48,7 @@ func TestBlake3OneMessage(t *testing.T) {
matrix := zeroMatrix
matrix[i] = on

hashesP, hashesDealloc, err := alloc.Allocate(16*types.HashLength, 32, false)
hashesP, hashesDealloc, err := state.Allocate(16*types.HashLength, 32, false)
require.NoError(t, err)
t.Cleanup(hashesDealloc)

Expand All @@ -74,7 +74,7 @@ func TestBlake3OneMessage(t *testing.T) {
func TestBlake3Zeros(t *testing.T) {
matrix := zeroMatrix

hashesP, hashesDealloc, err := alloc.Allocate(16*types.HashLength, 32, false)
hashesP, hashesDealloc, err := state.Allocate(16*types.HashLength, 32, false)
require.NoError(t, err)
t.Cleanup(hashesDealloc)

Expand All @@ -97,7 +97,7 @@ func TestLastHashIsStored(t *testing.T) {
matrix := zeroMatrix
matrix[i] = on

hashesP, hashesDealloc, err := alloc.Allocate(2*types.HashLength, 32, false)
hashesP, hashesDealloc, err := state.Allocate(2*types.HashLength, 32, false)
require.NoError(t, err)
t.Cleanup(hashesDealloc)

Expand Down
6 changes: 3 additions & 3 deletions hash/blake3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
blake3zeebo "github.com/zeebo/blake3"
blake3luke "lukechampine.com/blake3"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

//nolint:unparam
func randData(size uint64) []byte {
dataP, _, _ := alloc.Allocate(size, 64, false)
dataP, _, _ := state.Allocate(size, 64, false)
data := unsafe.Slice((*byte)(dataP), size)
if _, err := rand.Read(data); err != nil {
panic(err)
Expand Down Expand Up @@ -88,7 +88,7 @@ func BenchmarkChecksum4KAVX(b *testing.B) {

var z [16]*byte
for i := range z {
zP, dealloc, err := alloc.Allocate(types.HashLength, 32, false)
zP, dealloc, err := state.Allocate(types.HashLength, 32, false)
require.NoError(b, err)
b.Cleanup(dealloc)
z[i] = (*byte)(zP)
Expand Down
14 changes: 7 additions & 7 deletions list/list.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package list

import (
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

// Add adds address to the list.
func Add(
listRoot *types.ListRoot,
nodeAddress types.PersistentAddress,
state *alloc.State,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
persistentAllocator *alloc.Allocator[types.PersistentAddress],
state *state.State,
volatileAllocator *state.Allocator[types.VolatileAddress],
persistentAllocator *state.Allocator[types.PersistentAddress],
) (types.ListRoot, error) {
if listRoot.VolatileAddress == types.FreeAddress {
var err error
Expand Down Expand Up @@ -65,9 +65,9 @@ func Add(
// Deallocate deallocates nodes referenced by the list.
func Deallocate(
listRoot types.ListRoot,
state *alloc.State,
volatileDeallocator *alloc.Deallocator[types.VolatileAddress],
persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
state *state.State,
volatileDeallocator *state.Deallocator[types.VolatileAddress],
persistentDeallocator *state.Deallocator[types.PersistentAddress],
) error {
for {
// It is safe to do deallocations here because deallocated nodes are not reallocated until commit is finalized.
Expand Down
4 changes: 2 additions & 2 deletions list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
"github.com/outofforest/quantum/types"
)

Expand All @@ -15,7 +15,7 @@ const stateSize = (numOfAddresses + 6) * types.NodeLength
func TestList(t *testing.T) {
requireT := require.New(t)

state := alloc.NewForTest(t, stateSize)
state := state.NewForTest(t, stateSize)
volatileAllocator := state.NewVolatileAllocator()
persistentAllocator := state.NewPersistentAllocator()

Expand Down
4 changes: 2 additions & 2 deletions space/compare/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"testing"
"unsafe"

"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/state"
)

var (
resDef = [32]uint64{}
values = func() []uint64 {
const numOfItems = 32

p, _, _ := alloc.Allocate(numOfItems*8, 64, false)
p, _, _ := state.Allocate(numOfItems*8, 64, false)
result := unsafe.Slice((*uint64)(p), numOfItems)

copy(result, []uint64{
Expand Down
Loading
Loading