Skip to content

Commit

Permalink
Space wrapper for testing concurrency (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Dec 3, 2024
1 parent 014da76 commit 929e05a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 510 deletions.
14 changes: 7 additions & 7 deletions space/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *Space[K, V]) find(
hashBuff []byte,
hashMatches []uint64,
) error {
if err := s.walkPointers(snapshotID, tx, walRecorder, allocator, v, hashBuff); err != nil {
if err := s.walkPointers(v, snapshotID, tx, walRecorder, allocator, hashBuff); err != nil {
return err
}

Expand Down Expand Up @@ -466,7 +466,7 @@ func (s *Space[K, V]) set(
hashBuff []byte,
hashMatches []uint64,
) error {
if err := s.walkPointers(snapshotID, tx, walRecorder, allocator, v, hashBuff); err != nil {
if err := s.walkPointers(v, snapshotID, tx, walRecorder, allocator, hashBuff); err != nil {
return err
}

Expand Down Expand Up @@ -550,7 +550,7 @@ func (s *Space[K, V]) set(
}

// Add pointer node.
if err := s.addPointerNode(snapshotID, tx, walRecorder, allocator, v, conflict); err != nil {
if err := s.addPointerNode(v, snapshotID, tx, walRecorder, allocator, conflict); err != nil {
return err
}

Expand Down Expand Up @@ -672,11 +672,11 @@ func (s *Space[K, V]) splitDataNode(
}

func (s *Space[K, V]) addPointerNode(
v *Entry[K, V],
snapshotID types.SnapshotID,
tx *pipeline.TransactionRequest,
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
v *Entry[K, V],
conflict bool,
) error {
pointerNodeVolatileAddress, err := allocator.Allocate()
Expand Down Expand Up @@ -804,27 +804,27 @@ var pointerHops = [NumOfPointers][]uint64{
}

func (s *Space[K, V]) walkPointers(
v *Entry[K, V],
snapshotID types.SnapshotID,
tx *pipeline.TransactionRequest,
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
v *Entry[K, V],
hashBuff []byte,
) error {
for {
more, err := s.walkOnePointer(snapshotID, tx, walRecorder, allocator, v, hashBuff)
more, err := s.walkOnePointer(v, snapshotID, tx, walRecorder, allocator, hashBuff)
if err != nil || !more {
return err
}
}
}

func (s *Space[K, V]) walkOnePointer(
v *Entry[K, V],
snapshotID types.SnapshotID,
tx *pipeline.TransactionRequest,
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
v *Entry[K, V],
hashBuff []byte,
) (bool, error) {
volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
Expand Down
69 changes: 69 additions & 0 deletions space/space_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package space

import (
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/pipeline"
"github.com/outofforest/quantum/types"
"github.com/outofforest/quantum/wal"
)

func NewSpaceTest[K, V comparable](
space *Space[K, V],
tx *pipeline.TransactionRequest,
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
) *SpaceTest[K, V] {
return &SpaceTest[K, V]{
s: space,
tx: tx,
walRecorder: walRecorder,
allocator: allocator,
hashBuff: space.NewHashBuff(),
hashMatches: space.NewHashMatches(),
}
}

// SpaceTest exposes some private functionality of space to make testing concurrent scenarios possible.
type SpaceTest[K, V comparable] struct {
s *Space[K, V]
tx *pipeline.TransactionRequest
walRecorder *wal.Recorder
allocator *alloc.Allocator
hashBuff []byte
hashMatches []uint64
}

func (s *SpaceTest[K, V]) NewEntry(
snapshotID types.SnapshotID,
key K,
keyHash types.KeyHash,
stage uint8,
) (*Entry[K, V], error) {
v := &Entry[K, V]{}
if err := s.s.initEntry(v, snapshotID, s.tx, s.walRecorder, s.allocator, key, keyHash, stage); err != nil {
return nil, err
}
return v, nil
}

func (s *SpaceTest[K, V]) SplitDataNode(v *Entry[K, V], snapshotID types.SnapshotID) error {
_, err := s.s.splitDataNode(snapshotID, s.tx, s.walRecorder, s.allocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
return err
}

func (s *SpaceTest[K, V]) AddPointerNode(v *Entry[K, V], snapshotID types.SnapshotID, conflict bool) error {
return s.s.addPointerNode(v, snapshotID, s.tx, s.walRecorder, s.allocator, conflict)
}

func (s *SpaceTest[K, V]) WalkPointers(v *Entry[K, V], snapshotID types.SnapshotID) error {
return s.s.walkPointers(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff)
}

func (s *SpaceTest[K, V]) WalkOnePointer(v *Entry[K, V], snapshotID types.SnapshotID) (bool, error) {
return s.s.walkOnePointer(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff)
}

func (s *SpaceTest[K, V]) WalkDataItems(v *Entry[K, V]) bool {
return s.s.walkDataItems(v, s.hashMatches)
}
Loading

0 comments on commit 929e05a

Please sign in to comment.