Skip to content

Commit

Permalink
Don't store address of the next WAL node (#258)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Nov 30, 2024
1 parent bca0d2f commit 2f9f1bd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 52 deletions.
51 changes: 24 additions & 27 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
var wrIndex uint16

for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
for wrIndex < types.NodeLength && wal.RecordType(walNode[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode[wrIndex]); recordType {
case wal.RecordSet1:
wrIndex += 10
case wal.RecordSet8:
Expand All @@ -679,19 +679,18 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
wrIndex += 41
case wal.RecordSet:
wrIndex += 9
wrIndex += *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex])) + 2
wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex])) + 2
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode[wrIndex+1]))
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+9]))
wrIndex += 25

if err := db.deallocateNode(nodeSnapshotID, oldNodeAddress, allocator, deallocator,
recordType == wal.RecordImmediateDeallocation); err != nil {
return err
}
default:
fmt.Printf("%#v\n", walNode.Blob)
panic("=============")
panic("unrecognized record type")
}
}
}
Expand Down Expand Up @@ -731,8 +730,8 @@ func (db *DB) copyNodes(
walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
var wrIndex uint16

for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
for wrIndex < types.NodeLength && wal.RecordType(walNode[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode[wrIndex]); recordType {
case wal.RecordSet1:
wrIndex += 10
case wal.RecordSet8:
Expand All @@ -741,16 +740,15 @@ func (db *DB) copyNodes(
wrIndex += 41
case wal.RecordSet:
wrIndex += 9
wrIndex += *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex])) + 2
wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex])) + 2
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+17]))
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+9]))
newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+17]))
wrIndex += 25

copy(db.config.State.Bytes(newNodeAddress), db.config.State.Bytes(oldNodeAddress))
default:
fmt.Printf("%#v\n", walNode.Blob)
panic("=============")
panic("unrecognized record type")
}
}
}
Expand All @@ -773,30 +771,29 @@ func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader)
walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
var wrIndex uint16

for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
for wrIndex < types.NodeLength && wal.RecordType(walNode[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode[wrIndex]); recordType {
case wal.RecordSet1:
offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
*(*byte)(unsafe.Add(origin, offset)) = walNode.Blob[wrIndex+9]
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
*(*byte)(unsafe.Add(origin, offset)) = walNode[wrIndex+9]
wrIndex += 10
case wal.RecordSet8:
offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode.Blob[wrIndex+9:])
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode[wrIndex+9:])
wrIndex += 17
case wal.RecordSet32:
offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode.Blob[wrIndex+9:])
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode[wrIndex+9:])
wrIndex += 41
case wal.RecordSet:
offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
size := *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode.Blob[wrIndex+11:])
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
size := *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode[wrIndex+11:])
wrIndex += size + 2
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
wrIndex += 25
default:
fmt.Printf("%#v\n", walNode.Blob)
panic("=============")
panic("unrecognized record type")
}
}
}
Expand Down
32 changes: 16 additions & 16 deletions wal/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewRecorder(
state: state,
stateOrigin: uintptr(state.Origin()),
allocator: allocator,
sizeCounter: types.BlobSize,
sizeCounter: qtypes.NodeLength,
}
}

Expand All @@ -62,14 +62,14 @@ type Recorder struct {
// Commit commits pending node.
func (r *Recorder) Commit(tx *pipeline.TransactionRequest) {
if r.node != nil {
if r.sizeCounter < types.BlobSize {
r.node.Blob[r.sizeCounter] = byte(RecordEnd)
if r.sizeCounter < qtypes.NodeLength {
r.node[r.sizeCounter] = byte(RecordEnd)
}
tx.AddWALRequest(r.nodeAddress)
}

r.node = nil
r.sizeCounter = types.BlobSize
r.sizeCounter = qtypes.NodeLength
}

func (r *Recorder) insert(
Expand All @@ -80,9 +80,9 @@ func (r *Recorder) insert(
if err := r.ensureSize(tx, size+1); err != nil {
return nil, err
}
r.node.Blob[r.sizeCounter] = byte(recordType)
r.node[r.sizeCounter] = byte(recordType)

p := unsafe.Pointer(&r.node.Blob[r.sizeCounter+1])
p := unsafe.Pointer(&r.node[r.sizeCounter+1])
r.sizeCounter += size + 1

return p, nil
Expand All @@ -97,11 +97,11 @@ func (r *Recorder) insertWithOffset(
if err := r.ensureSize(tx, size+9); err != nil {
return nil, err
}
r.node.Blob[r.sizeCounter] = byte(recordType)
*(*uintptr)(unsafe.Pointer(&r.node.Blob[r.sizeCounter+1])) = offset
r.node[r.sizeCounter] = byte(recordType)
*(*uintptr)(unsafe.Pointer(&r.node[r.sizeCounter+1])) = offset
r.sizeCounter += 9

p := unsafe.Pointer(&r.node.Blob[r.sizeCounter])
p := unsafe.Pointer(&r.node[r.sizeCounter])
r.sizeCounter += size

return p, nil
Expand All @@ -116,24 +116,24 @@ func (r *Recorder) insertWithOffsetAndSize(
if err := r.ensureSize(tx, size+11); err != nil {
return nil, err
}
r.node.Blob[r.sizeCounter] = byte(recordType)
*(*uintptr)(unsafe.Pointer(&r.node.Blob[r.sizeCounter+1])) = offset
*(*uint16)(unsafe.Pointer(&r.node.Blob[r.sizeCounter+9])) = uint16(size)
r.node[r.sizeCounter] = byte(recordType)
*(*uintptr)(unsafe.Pointer(&r.node[r.sizeCounter+1])) = offset
*(*uint16)(unsafe.Pointer(&r.node[r.sizeCounter+9])) = uint16(size)
r.sizeCounter += 11

p := unsafe.Pointer(&r.node.Blob[r.sizeCounter])
p := unsafe.Pointer(&r.node[r.sizeCounter])
r.sizeCounter += size

return p, nil
}

func (r *Recorder) ensureSize(tx *pipeline.TransactionRequest, size uintptr) error {
if types.BlobSize-r.sizeCounter >= size {
if qtypes.NodeLength-r.sizeCounter >= size {
return nil
}
if r.node != nil {
if r.sizeCounter < types.BlobSize {
r.node.Blob[r.sizeCounter] = byte(RecordEnd)
if r.sizeCounter < qtypes.NodeLength {
r.node[r.sizeCounter] = byte(RecordEnd)
}
tx.AddWALRequest(r.nodeAddress)
}
Expand Down
11 changes: 2 additions & 9 deletions wal/types/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,11 @@ package types
import (
"unsafe"

"github.com/outofforest/quantum/types"
qtypes "github.com/outofforest/quantum/types"
)

// BlobSize defines the size of space available in node.
const BlobSize = 4088

// Node represents list node.
type Node struct {
Blob [BlobSize]byte

Next types.NodeAddress
}
type Node [qtypes.NodeLength]byte

// ProjectNode projects node to list node.
func ProjectNode(n unsafe.Pointer) *Node {
Expand Down

0 comments on commit 2f9f1bd

Please sign in to comment.