From 0205da479e71049f00be80022bea8fdca157f1dc Mon Sep 17 00:00:00 2001 From: Wojciech Malota-Wojcik Date: Fri, 29 Nov 2024 18:33:54 +0100 Subject: [PATCH] Apply changes recorded in WAL --- db.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/db.go b/db.go index 579bec4..ca723bf 100644 --- a/db.go +++ b/db.go @@ -1081,12 +1081,46 @@ func (db *DB) updatePointerHashes( } func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error { + origin := db.config.State.Origin() + for processedCount := uint64(0); ; processedCount++ { req, err := pipeReader.Read(ctx) if err != nil { return err } + for wr := req.WALRequest; wr != nil; wr = wr.Next { + walNode := waltypes.ProjectNode(db.config.State.Node(wr.NodeAddress)) + var wrIndex uint16 + + for wrIndex < waltypes.BlobSize && wal.RecordType(walNode.Blob[wrIndex]) != wal.RecordEnd { + switch recordType := wal.RecordType(walNode.Blob[wrIndex]); recordType { + case wal.RecordSet1: + offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1])) + *(*byte)(unsafe.Add(origin, offset)) = walNode.Blob[wrIndex+9] + wrIndex += 10 + case wal.RecordSet8: + offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1])) + copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode.Blob[wrIndex+9:]) + wrIndex += 17 + case wal.RecordSet32: + offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1])) + copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode.Blob[wrIndex+9:]) + wrIndex += 41 + case wal.RecordSet: + offset := *(*uint64)(unsafe.Pointer(&walNode.Blob[wrIndex+1])) + size := *(*uint16)(unsafe.Pointer(&walNode.Blob[wrIndex+9])) + copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode.Blob[wrIndex+11:]) + wrIndex += size + 2 + case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation: + wrIndex += 25 + default: + fmt.Printf("%#v\n", walNode.Blob) + panic("=============") + } + } + } + pipeReader.Acknowledge(processedCount+1, req) } }