Skip to content

Commit

Permalink
Fix pointer node hash computation (#309)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Dec 16, 2024
1 parent 53b3699 commit 292dfc7
Showing 1 changed file with 53 additions and 59 deletions.
112 changes: 53 additions & 59 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,8 @@ func (db *DB) Run(ctx context.Context) error {
dataHashReaders = append(dataHashReaders, nextReader)
prevHashReader = nextReader
}
pointerHashReaders := make([]*pipeline.Reader, 0, 2)
for range cap(pointerHashReaders) {
nextReader := pipeline.NewReader(prevHashReader)
pointerHashReaders = append(pointerHashReaders, nextReader)
prevHashReader = nextReader
}
storeNodesReader := pipeline.NewReader(prevHashReader)
pointerHashReader := pipeline.NewReader(prevHashReader)
storeNodesReader := pipeline.NewReader(pointerHashReader)

spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
var lastCommitCh chan<- error
Expand Down Expand Up @@ -182,11 +177,9 @@ func (db *DB) Run(ctx context.Context) error {
return db.updateDataHashes(ctx, reader, uint64(i))
})
}
for i, reader := range pointerHashReaders {
spawn(fmt.Sprintf("pointerhash-%02d", i), parallel.Fail, func(ctx context.Context) error {
return db.updatePointerHashes(ctx, reader, uint64(i))
})
}
spawn("pointerhash", parallel.Fail, func(ctx context.Context) error {
return db.updatePointerHashes(ctx, pointerHashReader)
})
spawn("storeNodes", parallel.Fail, func(ctx context.Context) error {
return db.storeNodes(ctx, storeNodesReader)
})
Expand Down Expand Up @@ -681,12 +674,9 @@ func (r *reader) Acknowledge(count uint64, req *pipeline.TransactionRequest, com
r.pipeReader.Acknowledge(count, req)
}

// FIXME (wojciech): Fix data races:
// - pointer present later in a pipeline must be included in later slot.
func (db *DB) updatePointerHashes(
ctx context.Context,
pipeReader *pipeline.Reader,
mod uint64,
) error {
r := &reader{
pipeReader: pipeReader,
Expand All @@ -702,74 +692,78 @@ func (db *DB) updatePointerHashes(
var hashes [16]*byte
hashesP := &hashes[0]

n2 := 0
for {
var mask uint16
n := n2
n2 = 0
for _, req := range slots[:n] {
if req.PointerIndex == 0 {
continue
}

minReq := request{
Count: math.MaxUint64,
}
req.PointerIndex--
if req.StoreRequest.Store[req.PointerIndex].Pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) {
// Pointers are processed from the data node up t the root node. If at any level
// revision test fails, it doesn't make sense to process parent nodes because revision
// test will fail there for sure.
continue
}

var nilSlots int
slots[n2] = req
n2++
}

riLoop:
for ri := range slots {
req := slots[ri]

var volatileAddress types.VolatileAddress
var hash *types.Hash
for ri := n2; ri < len(slots); ri++ {
for {
if req.PointerIndex == 0 {
var err error
req, err = r.Read(ctx)
if err != nil {
return err
}

if req.TxRequest.Type == pipeline.Commit {
commitReq = req
}

if req.StoreRequest == nil {
nilSlots++
slots[ri] = req
continue riLoop
}
req, err := r.Read(ctx)
if err != nil {
return err
}

req.PointerIndex--
volatileAddress = req.StoreRequest.Store[req.PointerIndex].VolatileAddress
if req.TxRequest.Type == pipeline.Commit {
commitReq = req
}

if uint64(volatileAddress)&1 != mod {
continue
if req.StoreRequest == nil {
break riLoop
}

pointer := req.StoreRequest.Store[req.PointerIndex].Pointer
if pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) {
// Pointers are processed from the data node up t the root node. If at any level
req.PointerIndex--
if req.StoreRequest.Store[req.PointerIndex].Pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) {
// Pointers are processed from the data node up to the root node. If at any level
// revision test fails, it doesn't make sense to process parent nodes because revision
// test will fail there for sure.
req.PointerIndex = 0
continue
}

if req.Count < minReq.Count {
minReq = req
}
hash = req.StoreRequest.Store[req.PointerIndex].Hash
slots[n2] = req
n2++

break
}

slots[ri] = req
mask |= 1 << ri
matrix[ri] = (*byte)(db.config.State.Node(volatileAddress))
hashes[ri] = &hash[0]
}

if nilSlots == len(slots) {
if n2 == 0 {
r.Acknowledge(commitReq.Count, commitReq.TxRequest, true)
commitReq = request{}
} else {
var mask uint16

minReq := request{
Count: math.MaxUint64,
}

for ri, req := range slots[:n2] {
if req.Count < minReq.Count {
minReq = req
}

mask |= 1 << ri
matrix[ri] = (*byte)(db.config.State.Node(req.StoreRequest.Store[req.PointerIndex].VolatileAddress))
hashes[ri] = &req.StoreRequest.Store[req.PointerIndex].Hash[0]
}

hash.Blake32048(matrixP, hashesP, mask)
r.Acknowledge(minReq.Count-1, minReq.TxRequest, false)
}
Expand Down

0 comments on commit 292dfc7

Please sign in to comment.