Skip to content

Commit

Permalink
Rearrange goroutines in the pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest committed Nov 29, 2024
1 parent 6e7f267 commit 6a31bed
Showing 1 changed file with 62 additions and 63 deletions.
125 changes: 62 additions & 63 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,21 +189,22 @@ func (db *DB) Run(ctx context.Context) error {
for range cap(copyReaders) {
copyReaders = append(copyReaders, pipeline.NewReader(deallocateReader))
}
prevHashReaders := copyReaders
applyWALReader := pipeline.NewReader(copyReaders...)
prevHashReader := applyWALReader
dataHashReaders := make([]*pipeline.Reader, 0, 2)
for range cap(dataHashReaders) {
nextReader := pipeline.NewReader(prevHashReaders...)
nextReader := pipeline.NewReader(prevHashReader)
dataHashReaders = append(dataHashReaders, nextReader)
prevHashReaders = []*pipeline.Reader{nextReader}
prevHashReader = nextReader
}
pointerHashReaders := make([]*pipeline.Reader, 0, 2)
for range cap(pointerHashReaders) {
nextReader := pipeline.NewReader(prevHashReaders...)
nextReader := pipeline.NewReader(prevHashReader)
pointerHashReaders = append(pointerHashReaders, nextReader)
prevHashReaders = []*pipeline.Reader{nextReader}
prevHashReader = nextReader
}
applyWALReader := pipeline.NewReader(prevHashReaders...)
commitSyncReader := pipeline.NewReader(applyWALReader)

commitSyncReader := pipeline.NewReader(prevHashReader)

spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
var lastSyncCh chan<- struct{}
Expand Down Expand Up @@ -257,6 +258,9 @@ func (db *DB) Run(ctx context.Context) error {
return db.copyNodes(ctx, reader, uint64(i))
})
}
spawn("applyWAL", parallel.Fail, func(ctx context.Context) error {
return db.applyWALChanges(ctx, applyWALReader)
})
for i, reader := range dataHashReaders {
spawn(fmt.Sprintf("datahash-%02d", i), parallel.Fail, func(ctx context.Context) error {
return db.updateDataHashes(ctx, reader, uint64(i))
Expand All @@ -267,9 +271,6 @@ func (db *DB) Run(ctx context.Context) error {
return db.updatePointerHashes(ctx, reader, uint64(i))
})
}
spawn("applyWAL", parallel.Fail, func(ctx context.Context) error {
return db.applyWALChanges(ctx, applyWALReader)
})
spawn("commitSync", parallel.Fail, func(ctx context.Context) error {
return db.syncOnCommit(ctx, commitSyncReader)
})
Expand Down Expand Up @@ -753,10 +754,9 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
wrIndex += 9
wrIndex += *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex])) + 2
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
wrIndex++
nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode.Blob[wrIndex]))
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+8]))
wrIndex += 24
nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
wrIndex += 25

// FIXME (wojciech): Do something with the list root.
if _, err := db.deallocateNode(nodeSnapshotID, oldNodeAddress, allocator, deallocator,
Expand Down Expand Up @@ -817,10 +817,9 @@ func (db *DB) copyNodes(
wrIndex += 9
wrIndex += *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex])) + 2
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
wrIndex++
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+8]))
newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+16]))
wrIndex += 24
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+17]))
wrIndex += 25

copy(db.config.State.Bytes(newNodeAddress), db.config.State.Bytes(oldNodeAddress))
default:
Expand All @@ -835,6 +834,51 @@ func (db *DB) copyNodes(
}
}

// applyWALChanges replays WAL records delivered through the pipeline onto the
// state's origin memory region. Set records (1/8/32/variable-size byte writes)
// are applied at their recorded offsets; deallocation records are consumed by
// earlier pipeline stages and are only skipped over here. Returns the reader's
// error (e.g. context cancellation) when the pipeline shuts down.
func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error {
	origin := db.config.State.Origin()

	for processedCount := uint64(0); ; processedCount++ {
		req, err := pipeReader.Read(ctx)
		if err != nil {
			return err
		}

		// Walk the linked list of WAL nodes attached to this request.
		for wr := req.WALRequest; wr != nil; wr = wr.Next {
			walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
			var wrIndex uint16

			for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
				switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
				case wal.RecordSet1:
					// Layout: type(1) | offset(8) | value(1).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					*(*byte)(unsafe.Add(origin, offset)) = walNode.Blob[wrIndex+9]
					wrIndex += 10
				case wal.RecordSet8:
					// Layout: type(1) | offset(8) | value(8).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode.Blob[wrIndex+9:])
					wrIndex += 17
				case wal.RecordSet32:
					// Layout: type(1) | offset(8) | value(32).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode.Blob[wrIndex+9:])
					wrIndex += 41
				case wal.RecordSet:
					// Layout: type(1) | offset(8) | size(2) | data(size).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					size := *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode.Blob[wrIndex+11:])
					// BUG FIX: the header is 11 bytes (type+offset+size), so the full
					// record is 11+size bytes. The previous `size + 2` advance stopped
					// inside the data payload, misparsing the following bytes as record
					// types. This matches processDeallocations (9, then size+2).
					wrIndex += size + 11
				case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
					// Layout: type(1) | snapshotID(8) | oldAddr(8) | newAddr(8).
					// Handled by earlier pipeline stages; just skip the record.
					wrIndex += 25
				default:
					// Unknown record type means WAL corruption or a format mismatch —
					// a programmer/invariant bug, so panic with diagnostics.
					panic(fmt.Sprintf("unknown WAL record type %d in blob %#v", recordType, walNode.Blob))
				}
			}
		}

		pipeReader.Acknowledge(processedCount+1, req)
	}
}

func (db *DB) updateDataHashes(
ctx context.Context,
pipeReader *pipeline.Reader,
Expand Down Expand Up @@ -1080,51 +1124,6 @@ func (db *DB) updatePointerHashes(
}
}

// applyWALChanges replays WAL records delivered through the pipeline onto the
// state's origin memory region. Set records (1/8/32/variable-size byte writes)
// are applied at their recorded offsets; deallocation records are consumed by
// earlier pipeline stages and are only skipped over here. Returns the reader's
// error (e.g. context cancellation) when the pipeline shuts down.
func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error {
	origin := db.config.State.Origin()

	for processedCount := uint64(0); ; processedCount++ {
		req, err := pipeReader.Read(ctx)
		if err != nil {
			return err
		}

		// Walk the linked list of WAL nodes attached to this request.
		for wr := req.WALRequest; wr != nil; wr = wr.Next {
			walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
			var wrIndex uint16

			for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
				switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
				case wal.RecordSet1:
					// Layout: type(1) | offset(8) | value(1).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					*(*byte)(unsafe.Add(origin, offset)) = walNode.Blob[wrIndex+9]
					wrIndex += 10
				case wal.RecordSet8:
					// Layout: type(1) | offset(8) | value(8).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode.Blob[wrIndex+9:])
					wrIndex += 17
				case wal.RecordSet32:
					// Layout: type(1) | offset(8) | value(32).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode.Blob[wrIndex+9:])
					wrIndex += 41
				case wal.RecordSet:
					// Layout: type(1) | offset(8) | size(2) | data(size).
					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
					size := *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode.Blob[wrIndex+11:])
					// BUG FIX: the header is 11 bytes (type+offset+size), so the full
					// record is 11+size bytes. The previous `size + 2` advance stopped
					// inside the data payload, misparsing the following bytes as record
					// types. This matches processDeallocations (9, then size+2).
					wrIndex += size + 11
				case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
					// Layout: type(1) | snapshotID(8) | oldAddr(8) | newAddr(8).
					// Handled by earlier pipeline stages; just skip the record.
					wrIndex += 25
				default:
					// Unknown record type means WAL corruption or a format mismatch —
					// a programmer/invariant bug, so panic with diagnostics.
					panic(fmt.Sprintf("unknown WAL record type %d in blob %#v", recordType, walNode.Blob))
				}
			}
		}

		pipeReader.Acknowledge(processedCount+1, req)
	}
}

func (db *DB) syncOnCommit(ctx context.Context, pipeReader *pipeline.Reader) error {
for processedCount := uint64(0); ; processedCount++ {
req, err := pipeReader.Read(ctx)
Expand Down

0 comments on commit 6a31bed

Please sign in to comment.