Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pointer node hash computation #309

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 53 additions & 59 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,8 @@ func (db *DB) Run(ctx context.Context) error {
dataHashReaders = append(dataHashReaders, nextReader)
prevHashReader = nextReader
}
pointerHashReaders := make([]*pipeline.Reader, 0, 2)
for range cap(pointerHashReaders) {
nextReader := pipeline.NewReader(prevHashReader)
pointerHashReaders = append(pointerHashReaders, nextReader)
prevHashReader = nextReader
}
storeNodesReader := pipeline.NewReader(prevHashReader)
pointerHashReader := pipeline.NewReader(prevHashReader)
storeNodesReader := pipeline.NewReader(pointerHashReader)

spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
var lastCommitCh chan<- error
Expand Down Expand Up @@ -182,11 +177,9 @@ func (db *DB) Run(ctx context.Context) error {
return db.updateDataHashes(ctx, reader, uint64(i))
})
}
for i, reader := range pointerHashReaders {
spawn(fmt.Sprintf("pointerhash-%02d", i), parallel.Fail, func(ctx context.Context) error {
return db.updatePointerHashes(ctx, reader, uint64(i))
})
}
spawn("pointerhash", parallel.Fail, func(ctx context.Context) error {
return db.updatePointerHashes(ctx, pointerHashReader)
})
spawn("storeNodes", parallel.Fail, func(ctx context.Context) error {
return db.storeNodes(ctx, storeNodesReader)
})
Expand Down Expand Up @@ -681,12 +674,9 @@ func (r *reader) Acknowledge(count uint64, req *pipeline.TransactionRequest, com
r.pipeReader.Acknowledge(count, req)
}

// FIXME (wojciech): Fix data races:
// - pointer present later in a pipeline must be included in later slot.
func (db *DB) updatePointerHashes(
ctx context.Context,
pipeReader *pipeline.Reader,
mod uint64,
) error {
r := &reader{
pipeReader: pipeReader,
Expand All @@ -702,74 +692,78 @@ func (db *DB) updatePointerHashes(
var hashes [16]*byte
hashesP := &hashes[0]

n2 := 0
for {
var mask uint16
n := n2
n2 = 0
for _, req := range slots[:n] {
if req.PointerIndex == 0 {
continue
}

minReq := request{
Count: math.MaxUint64,
}
req.PointerIndex--
if req.StoreRequest.Store[req.PointerIndex].Pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) {
// Pointers are processed from the data node up t the root node. If at any level
// revision test fails, it doesn't make sense to process parent nodes because revision
// test will fail there for sure.
continue
}

var nilSlots int
slots[n2] = req
n2++
}

riLoop:
for ri := range slots {
req := slots[ri]

var volatileAddress types.VolatileAddress
var hash *types.Hash
for ri := n2; ri < len(slots); ri++ {
for {
if req.PointerIndex == 0 {
var err error
req, err = r.Read(ctx)
if err != nil {
return err
}

if req.TxRequest.Type == pipeline.Commit {
commitReq = req
}

if req.StoreRequest == nil {
nilSlots++
slots[ri] = req
continue riLoop
}
req, err := r.Read(ctx)
if err != nil {
return err
}

req.PointerIndex--
volatileAddress = req.StoreRequest.Store[req.PointerIndex].VolatileAddress
if req.TxRequest.Type == pipeline.Commit {
commitReq = req
}

if uint64(volatileAddress)&1 != mod {
continue
if req.StoreRequest == nil {
break riLoop
}

pointer := req.StoreRequest.Store[req.PointerIndex].Pointer
if pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) {
// Pointers are processed from the data node up t the root node. If at any level
req.PointerIndex--
if req.StoreRequest.Store[req.PointerIndex].Pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) {
// Pointers are processed from the data node up to the root node. If at any level
// revision test fails, it doesn't make sense to process parent nodes because revision
// test will fail there for sure.
req.PointerIndex = 0
continue
}

if req.Count < minReq.Count {
minReq = req
}
hash = req.StoreRequest.Store[req.PointerIndex].Hash
slots[n2] = req
n2++

break
}

slots[ri] = req
mask |= 1 << ri
matrix[ri] = (*byte)(db.config.State.Node(volatileAddress))
hashes[ri] = &hash[0]
}

if nilSlots == len(slots) {
if n2 == 0 {
r.Acknowledge(commitReq.Count, commitReq.TxRequest, true)
commitReq = request{}
} else {
var mask uint16

minReq := request{
Count: math.MaxUint64,
}

for ri, req := range slots[:n2] {
if req.Count < minReq.Count {
minReq = req
}

mask |= 1 << ri
matrix[ri] = (*byte)(db.config.State.Node(req.StoreRequest.Store[req.PointerIndex].VolatileAddress))
hashes[ri] = &req.StoreRequest.Store[req.PointerIndex].Hash[0]
}

hash.Blake32048(matrixP, hashesP, mask)
r.Acknowledge(minReq.Count-1, minReq.TxRequest, false)
}
Expand Down
Loading