From 6a31bede2df1780bae0af8616b6c0568effd095c Mon Sep 17 00:00:00 2001
From: Wojciech Malota-Wojcik
Date: Fri, 29 Nov 2024 19:02:11 +0100
Subject: [PATCH] Rearrange goroutines in the pipeline

---
 db.go | 125 +++++++++++++++++++++++++++++-----------------------------
 1 file changed, 62 insertions(+), 63 deletions(-)

diff --git a/db.go b/db.go
index afcfe3c..985ea62 100644
--- a/db.go
+++ b/db.go
@@ -189,21 +189,22 @@ func (db *DB) Run(ctx context.Context) error {
 	for range cap(copyReaders) {
 		copyReaders = append(copyReaders, pipeline.NewReader(deallocateReader))
 	}
-	prevHashReaders := copyReaders
+	applyWALReader := pipeline.NewReader(copyReaders...)
+	prevHashReader := applyWALReader
 	dataHashReaders := make([]*pipeline.Reader, 0, 2)
 	for range cap(dataHashReaders) {
-		nextReader := pipeline.NewReader(prevHashReaders...)
+		nextReader := pipeline.NewReader(prevHashReader)
 		dataHashReaders = append(dataHashReaders, nextReader)
-		prevHashReaders = []*pipeline.Reader{nextReader}
+		prevHashReader = nextReader
 	}
 	pointerHashReaders := make([]*pipeline.Reader, 0, 2)
 	for range cap(pointerHashReaders) {
-		nextReader := pipeline.NewReader(prevHashReaders...)
+		nextReader := pipeline.NewReader(prevHashReader)
 		pointerHashReaders = append(pointerHashReaders, nextReader)
-		prevHashReaders = []*pipeline.Reader{nextReader}
+		prevHashReader = nextReader
 	}
-	applyWALReader := pipeline.NewReader(prevHashReaders...)
-	commitSyncReader := pipeline.NewReader(applyWALReader)
+
+	commitSyncReader := pipeline.NewReader(prevHashReader)
 
 	spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
 		var lastSyncCh chan<- struct{}
@@ -257,6 +258,9 @@ func (db *DB) Run(ctx context.Context) error {
 			return db.copyNodes(ctx, reader, uint64(i))
 		})
 	}
+	spawn("applyWAL", parallel.Fail, func(ctx context.Context) error {
+		return db.applyWALChanges(ctx, applyWALReader)
+	})
 	for i, reader := range dataHashReaders {
 		spawn(fmt.Sprintf("datahash-%02d", i), parallel.Fail, func(ctx context.Context) error {
 			return db.updateDataHashes(ctx, reader, uint64(i))
@@ -267,9 +271,6 @@ func (db *DB) Run(ctx context.Context) error {
 			return db.updatePointerHashes(ctx, reader, uint64(i))
 		})
 	}
-	spawn("applyWAL", parallel.Fail, func(ctx context.Context) error {
-		return db.applyWALChanges(ctx, applyWALReader)
-	})
 	spawn("commitSync", parallel.Fail, func(ctx context.Context) error {
 		return db.syncOnCommit(ctx, commitSyncReader)
 	})
@@ -753,10 +754,9 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
 					wrIndex += 9
 					wrIndex += *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex])) + 2
 				case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
-					wrIndex++
-					nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode.Blob[wrIndex]))
-					oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+8]))
-					wrIndex += 24
+					nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
+					oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
+					wrIndex += 25
 
 					// FIXME (wojciech): Do something with the list root.
 					if _, err := db.deallocateNode(nodeSnapshotID, oldNodeAddress, allocator, deallocator,
@@ -817,10 +817,9 @@ func (db *DB) copyNodes(
 					wrIndex += 9
 					wrIndex += *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex])) + 2
 				case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
-					wrIndex++
-					oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+8]))
-					newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+16]))
-					wrIndex += 24
+					oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
+					newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode.Blob[wrIndex+17]))
+					wrIndex += 25
 
 					copy(db.config.State.Bytes(newNodeAddress), db.config.State.Bytes(oldNodeAddress))
 				default:
@@ -835,6 +834,51 @@
 	}
 }
 
+func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error {
+	origin := db.config.State.Origin()
+
+	for processedCount := uint64(0); ; processedCount++ {
+		req, err := pipeReader.Read(ctx)
+		if err != nil {
+			return err
+		}
+
+		for wr := req.WALRequest; wr != nil; wr = wr.Next {
+			walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
+			var wrIndex uint16
+
+			for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
+				switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
+				case wal.RecordSet1:
+					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
+					*(*byte)(unsafe.Add(origin, offset)) = walNode.Blob[wrIndex+9]
+					wrIndex += 10
+				case wal.RecordSet8:
+					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
+					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode.Blob[wrIndex+9:])
+					wrIndex += 17
+				case wal.RecordSet32:
+					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
+					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode.Blob[wrIndex+9:])
+					wrIndex += 41
+				case wal.RecordSet:
+					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
+					size := *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
+					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode.Blob[wrIndex+11:])
+					wrIndex += size + 2
+				case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
+					wrIndex += 25
+				default:
+					fmt.Printf("%#v\n", walNode.Blob)
+					panic("=============")
+				}
+			}
+		}
+
+		pipeReader.Acknowledge(processedCount+1, req)
+	}
+}
+
 func (db *DB) updateDataHashes(
 	ctx context.Context,
 	pipeReader *pipeline.Reader,
@@ -1080,51 +1124,6 @@ func (db *DB) updatePointerHashes(
 	}
 }
 
-func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error {
-	origin := db.config.State.Origin()
-
-	for processedCount := uint64(0); ; processedCount++ {
-		req, err := pipeReader.Read(ctx)
-		if err != nil {
-			return err
-		}
-
-		for wr := req.WALRequest; wr != nil; wr = wr.Next {
-			walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress))
-			var wrIndex uint16
-
-			for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd {
-				switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType {
-				case wal.RecordSet1:
-					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
-					*(*byte)(unsafe.Add(origin, offset)) = walNode.Blob[wrIndex+9]
-					wrIndex += 10
-				case wal.RecordSet8:
-					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
-					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode.Blob[wrIndex+9:])
-					wrIndex += 17
-				case wal.RecordSet32:
-					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
-					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode.Blob[wrIndex+9:])
-					wrIndex += 41
-				case wal.RecordSet:
-					offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1]))
-					size := *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex+9]))
-					copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode.Blob[wrIndex+11:])
-					wrIndex += size + 2
-				case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
-					wrIndex += 25
-				default:
-					fmt.Printf("%#v\n", walNode.Blob)
-					panic("=============")
-				}
-			}
-		}
-
-		pipeReader.Acknowledge(processedCount+1, req)
-	}
-}
-
 func (db *DB) syncOnCommit(ctx context.Context, pipeReader *pipeline.Reader) error {
 	for processedCount := uint64(0); ; processedCount++ {
 		req, err := pipeReader.Read(ctx)
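
Note on the deallocation record layout (not part of the patch):

The deallocation branches above now read their fields at fixed offsets from wrIndex instead of incrementing wrIndex first: one byte of record type, then the snapshot ID at wrIndex+1, the old node address at wrIndex+9, and the new node address at wrIndex+17, for 25 bytes in total, which is why the cursor advances with wrIndex += 25. The sketch below is a minimal, self-contained model of that layout inferred from those offsets; the struct name, helper name, and use of little-endian encoding/binary are illustrative assumptions only, since the real code reads the fields in place through unsafe pointers in native byte order.

// Illustrative sketch of the 25-byte deallocation record implied by the
// offsets in the patch (+1, +9, +17, advance by 25). Names and encoding are
// assumptions; the real db.go decodes the same bytes via unsafe pointers.
package main

import (
	"encoding/binary"
	"fmt"
)

type deallocationRecord struct {
	RecordType     byte   // walNode.Blob[wrIndex]
	NodeSnapshotID uint64 // walNode.Blob[wrIndex+1 : wrIndex+9]
	OldNodeAddress uint64 // walNode.Blob[wrIndex+9 : wrIndex+17]
	NewNodeAddress uint64 // walNode.Blob[wrIndex+17 : wrIndex+25]
}

// decodeDeallocation reads one record starting at index and returns it
// together with the index of the next record (index+25), mirroring the
// `wrIndex += 25` step in the patch.
func decodeDeallocation(blob []byte, index int) (deallocationRecord, int) {
	return deallocationRecord{
		RecordType:     blob[index],
		NodeSnapshotID: binary.LittleEndian.Uint64(blob[index+1:]),
		OldNodeAddress: binary.LittleEndian.Uint64(blob[index+9:]),
		NewNodeAddress: binary.LittleEndian.Uint64(blob[index+17:]),
	}, index + 25
}

func main() {
	blob := make([]byte, 25)
	blob[0] = 0x03                                // hypothetical record-type value
	binary.LittleEndian.PutUint64(blob[1:], 7)    // snapshot ID
	binary.LittleEndian.PutUint64(blob[9:], 100)  // old node address
	binary.LittleEndian.PutUint64(blob[17:], 200) // new node address

	rec, next := decodeDeallocation(blob, 0)
	fmt.Printf("%+v next=%d\n", rec, next)
}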