Skip to content

Commit

Permalink
If data are available in persistent node, copy it from there instead …
Browse files Browse the repository at this point in the history
…of storing in WAL (#261)
  • Loading branch information
outofforest authored Dec 2, 2024
1 parent ef26f44 commit ffc4d4d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
33 changes: 22 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,9 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
case wal.RecordSet32:
wrIndex += 41
case wal.RecordSet:
wrIndex += 9
wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex])) + 2
wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9])) + 11
case wal.RecordCopy:
wrIndex += 19
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
nodeSnapshotID := *(*types.SnapshotID)(unsafe.Pointer(&walNode[wrIndex+1]))
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+9]))
Expand All @@ -685,7 +686,7 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
return err
}
default:
panic("unrecognized record type")
panic(fmt.Sprintf("unrecognized record type at index %d", wrIndex))
}
}
}
Expand Down Expand Up @@ -731,16 +732,17 @@ func (db *DB) copyNodes(
case wal.RecordSet32:
wrIndex += 41
case wal.RecordSet:
wrIndex += 9
wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex])) + 2
wrIndex += *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9])) + 11
case wal.RecordCopy:
wrIndex += 19
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
oldNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+9]))
newNodeAddress := *(*types.NodeAddress)(unsafe.Pointer(&walNode[wrIndex+17]))
wrIndex += 25

copy(db.config.State.Bytes(newNodeAddress), db.config.State.Bytes(oldNodeAddress))
default:
panic("unrecognized record type")
panic(fmt.Sprintf("unrecognized record type at index %d", wrIndex))
}
}
}
Expand All @@ -766,26 +768,35 @@ func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader)
for wrIndex < types.NodeLength && wal.RecordType(walNode[wrIndex]) != wal.RecordEnd {
switch recordType := wal.RecordType(walNode[wrIndex]); recordType {
case wal.RecordSet1:
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1]))
*(*byte)(unsafe.Add(origin, offset)) = walNode[wrIndex+9]
wrIndex += 10
case wal.RecordSet8:
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 8), walNode[wrIndex+9:])
wrIndex += 17
case wal.RecordSet32:
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), 32), walNode[wrIndex+9:])
wrIndex += 41
case wal.RecordSet:
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
offset := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1]))
size := *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode[wrIndex+11:])
wrIndex += size + 11
case wal.RecordCopy:
offsetDst := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+1]))
offsetSrc := *(*uintptr)(unsafe.Pointer(&walNode[wrIndex+9]))
size := *(*uint16)(unsafe.Pointer(&walNode[wrIndex+17]))

copy(unsafe.Slice((*byte)(unsafe.Add(origin, offsetDst)), size),
unsafe.Slice((*byte)(unsafe.Add(origin, offsetSrc)), size))

wrIndex += 19
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
wrIndex += 25
default:
panic("unrecognized record type")
panic(fmt.Sprintf("unrecognized record type at index %d", wrIndex))
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion space/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,12 @@ func (s *Space[K, V]) splitDataNode(
}

newDataNodeItem := s.config.DataNodeAssistant.Item(newDataNode, s.config.DataNodeAssistant.ItemOffset(i))
if err := wal.Set2(walRecorder, tx, newNodePersistentAddress,
if err := wal.Copy(walRecorder, tx, newNodePersistentAddress, existingNodePointer.PersistentAddress,
newDataNodeItem, item,
); err != nil {
return err
}
if err := wal.Set1(walRecorder, tx, newNodePersistentAddress,
&newKeyHashes[i], &keyHashes[i],
); err != nil {
return err
Expand Down
34 changes: 32 additions & 2 deletions wal/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
// RecordSet records modification of variable-length slice.
RecordSet

// RecordCopy records a request to copy data from one place to another.
RecordCopy

// RecordImmediateDeallocation records node immediate deallocation.
RecordImmediateDeallocation

Expand Down Expand Up @@ -155,11 +158,11 @@ func Reserve[T comparable](
recorder *Recorder,
tx *pipeline.TransactionRequest,
dstNodeAddress qtypes.NodeAddress,
srcPointer *T,
dstPointer *T,
) (*T, error) {
var t T

offset := uintptr(dstNodeAddress*qtypes.NodeLength) + uintptr(unsafe.Pointer(srcPointer))%qtypes.NodeLength
offset := uintptr(dstNodeAddress*qtypes.NodeLength) + uintptr(unsafe.Pointer(dstPointer))%qtypes.NodeLength

switch size := unsafe.Sizeof(t); size {
case 1:
Expand Down Expand Up @@ -305,6 +308,33 @@ func Set4[T1, T2, T3, T4 comparable](
return nil
}

// Copy requests to copy data from one place to another.
func Copy[T comparable](
recorder *Recorder, tx *pipeline.TransactionRequest, dstNodeAddress, srcNodeAddress qtypes.NodeAddress,
dst, src *T,
) error {
var t T

size := uint16(unsafe.Sizeof(t))

offset := uintptr(unsafe.Pointer(dst)) % qtypes.NodeLength
offsetDst := uintptr(dstNodeAddress*qtypes.NodeLength) + offset
offsetSrc := uintptr(srcNodeAddress*qtypes.NodeLength) + offset

p, err := recorder.insert(tx, RecordCopy, 18)
if err != nil {
return err
}

*(*uintptr)(p) = offsetDst
*(*uintptr)(unsafe.Add(p, 8)) = offsetSrc
*(*uint16)(unsafe.Add(p, 16)) = size

*dst = *src

return nil
}

// Deallocate deallocate records node deallocation.
// FIXME (wojciech): nodeSnapshotID is needed only in case of delayed deallocation.
func Deallocate(
Expand Down

0 comments on commit ffc4d4d

Please sign in to comment.