From 9c3745a3f0a9511dd10aad62aeff53f5eceb521d Mon Sep 17 00:00:00 2001 From: Wojciech Malota-Wojcik Date: Mon, 2 Dec 2024 10:04:33 +0100 Subject: [PATCH] If data are available in persistent node, copy it from there instead of storing in WAL --- db.go | 33 ++++++++++++++++++++++----------- space/space.go | 6 +++++- wal/record.go | 34 ++++++++++++++++++++++++++++++++-- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/db.go b/db.go index 0923f72..67b37a8 100644 --- a/db.go +++ b/db.go @@ -673,8 +673,9 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea case wal.RecordSet32: wrIndex += 41 case wal.RecordSet: - wrIndex += 9 - wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex])) + 2 + wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9])) + 11 + case wal.RecordCopy: + wrIndex += 19 case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation: nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode[wrIndex+1])) oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+9])) @@ -685,7 +686,7 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea return err } default: - panic("unrecognized record type") + panic(fmt.Sprintf("unrecognized record type at index %d", wrIndex)) } } } @@ -731,8 +732,9 @@ func (db *DB) copyNodes( case wal.RecordSet32: wrIndex += 41 case wal.RecordSet: - wrIndex += 9 - wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex])) + 2 + wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9])) + 11 + case wal.RecordCopy: + wrIndex += 19 case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation: oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+9])) newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+17])) @@ -740,7 +742,7 @@ func (db *DB) copyNodes( copy(db.config.State.Bytes(newNodeAddress), db.config.State.Bytes(oldNodeAddress)) default: - panic("unrecognized record type") + panic(fmt.Sprintf("unrecognized record type at index %d", wrIndex)) } } } @@ -766,26 +768,35 @@ func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) for wrIndex < types.NodeLength && wal.RecordType(walNode[wrIndex]) != wal.RecordEnd { switch recordType := wal.RecordType(walNode[wrIndex]); recordType { case wal.RecordSet1: - offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1])) + offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1])) *(*byte)(unsafe.Add(origin, offset)) = walNode[wrIndex+9] wrIndex += 10 case wal.RecordSet8: - offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1])) + offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1])) copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode[wrIndex+9:]) wrIndex += 17 case wal.RecordSet32: - offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1])) + offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1])) copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode[wrIndex+9:]) wrIndex += 41 case wal.RecordSet: - offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1])) + offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1])) size := *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9])) copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode[wrIndex+11:]) wrIndex += size + 11 + case wal.RecordCopy: + offsetDst := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1])) + offsetSrc := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+9])) + size := *(*uint16)(unsafe.Pointer(&walNode[wrIndex+17])) + + copy(unsafe.Slice((*byte)(unsafe.Add(origin, offsetDst)), size), + unsafe.Slice((*byte)(unsafe.Add(origin, offsetSrc)), size)) + + wrIndex += 19 case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation: wrIndex += 25 default: - panic("unrecognized record type") + panic(fmt.Sprintf("unrecognized record type at index %d", wrIndex)) } } } diff --git a/space/space.go b/space/space.go index 193f704..b2e36e3 100644 --- a/space/space.go +++ b/space/space.go @@ -600,8 +600,12 @@ func (s *Space[K, V]) splitDataNode( } newDataNodeItem := s.config.DataNodeAssistant.Item(newDataNode, s.config.DataNodeAssistant.ItemOffset(i)) - if err := wal.Set2(walRecorder, tx, newNodePersistentAddress, + if err := wal.Copy(walRecorder, tx, newNodePersistentAddress, existingNodePointer.PersistentAddress, newDataNodeItem, item, + ); err != nil { + return err + } + if err := wal.Set1(walRecorder, tx, newNodePersistentAddress, &newKeyHashes[i], &keyHashes[i], ); err != nil { return err diff --git a/wal/record.go b/wal/record.go index d560a38..09b2efa 100644 --- a/wal/record.go +++ b/wal/record.go @@ -28,6 +28,9 @@ const ( // RecordSet records modification of variable-length slice. RecordSet + // RecordCopy records a request to copy data from one place to another. + RecordCopy + // RecordImmediateDeallocation records node immediate deallocation. RecordImmediateDeallocation @@ -155,11 +158,11 @@ func Reserve[T comparable]( recorder *Recorder, tx *pipeline.TransactionRequest, dstNodeAddress qtypes.NodeAddress, - srcPointer *T, + dstPointer *T, ) (*T, error) { var t T - offset := uintptr(dstNodeAddress*qtypes.NodeLength) + uintptr(unsafe.Pointer(srcPointer))%qtypes.NodeLength + offset := uintptr(dstNodeAddress*qtypes.NodeLength) + uintptr(unsafe.Pointer(dstPointer))%qtypes.NodeLength switch size := unsafe.Sizeof(t); size { case 1: @@ -305,6 +308,33 @@ func Set4[T1, T2, T3, T4 comparable]( return nil } +// Copy requests to copy data from one place to another. +func Copy[T comparable]( + recorder *Recorder, tx *pipeline.TransactionRequest, dstNodeAddress, srcNodeAddress qtypes.NodeAddress, + dst, src *T, +) error { + var t T + + size := uint16(unsafe.Sizeof(t)) + + offset := uintptr(unsafe.Pointer(dst)) % qtypes.NodeLength + offsetDst := uintptr(dstNodeAddress*qtypes.NodeLength) + offset + offsetSrc := uintptr(srcNodeAddress*qtypes.NodeLength) + offset + + p, err := recorder.insert(tx, RecordCopy, 18) + if err != nil { + return err + } + + *(*uintptr)(p) = offsetDst + *(*uintptr)(unsafe.Add(p, 8)) = offsetSrc + *(*uint16)(unsafe.Add(p, 16)) = size + + *dst = *src + + return nil +} + // Deallocate deallocate records node deallocation. // FIXME (wojciech): nodeSnapshotID is needed only in case of delayed deallocation. func Deallocate(