diff --git a/db.go b/db.go index 903aea5..a7f7251 100644 --- a/db.go +++ b/db.go @@ -136,13 +136,8 @@ func (db *DB) Run(ctx context.Context) error { dataHashReaders = append(dataHashReaders, nextReader) prevHashReader = nextReader } - pointerHashReaders := make([]*pipeline.Reader, 0, 2) - for range cap(pointerHashReaders) { - nextReader := pipeline.NewReader(prevHashReader) - pointerHashReaders = append(pointerHashReaders, nextReader) - prevHashReader = nextReader - } - storeNodesReader := pipeline.NewReader(prevHashReader) + pointerHashReader := pipeline.NewReader(prevHashReader) + storeNodesReader := pipeline.NewReader(pointerHashReader) spawn("supervisor", parallel.Exit, func(ctx context.Context) error { var lastCommitCh chan<- error @@ -182,11 +177,9 @@ func (db *DB) Run(ctx context.Context) error { return db.updateDataHashes(ctx, reader, uint64(i)) }) } - for i, reader := range pointerHashReaders { - spawn(fmt.Sprintf("pointerhash-%02d", i), parallel.Fail, func(ctx context.Context) error { - return db.updatePointerHashes(ctx, reader, uint64(i)) - }) - } + spawn("pointerhash", parallel.Fail, func(ctx context.Context) error { + return db.updatePointerHashes(ctx, pointerHashReader) + }) spawn("storeNodes", parallel.Fail, func(ctx context.Context) error { return db.storeNodes(ctx, storeNodesReader) }) @@ -681,12 +674,9 @@ func (r *reader) Acknowledge(count uint64, req *pipeline.TransactionRequest, com r.pipeReader.Acknowledge(count, req) } -// FIXME (wojciech): Fix data races: -// - pointer present later in a pipeline must be included in later slot. func (db *DB) updatePointerHashes( ctx context.Context, pipeReader *pipeline.Reader, - mod uint64, ) error { r := &reader{ pipeReader: pipeReader, @@ -702,74 +692,78 @@ func (db *DB) updatePointerHashes( var hashes [16]*byte hashesP := &hashes[0] + n2 := 0 for { - var mask uint16 + n := n2 + n2 = 0 + for _, req := range slots[:n] { + if req.PointerIndex == 0 { + continue + } - minReq := request{ - Count: math.MaxUint64, - } + req.PointerIndex-- + if req.StoreRequest.Store[req.PointerIndex].Pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) { + // Pointers are processed from the data node up t the root node. If at any level + // revision test fails, it doesn't make sense to process parent nodes because revision + // test will fail there for sure. + continue + } - var nilSlots int + slots[n2] = req + n2++ + } riLoop: - for ri := range slots { - req := slots[ri] - - var volatileAddress types.VolatileAddress - var hash *types.Hash + for ri := n2; ri < len(slots); ri++ { for { - if req.PointerIndex == 0 { - var err error - req, err = r.Read(ctx) - if err != nil { - return err - } - - if req.TxRequest.Type == pipeline.Commit { - commitReq = req - } - - if req.StoreRequest == nil { - nilSlots++ - slots[ri] = req - continue riLoop - } + req, err := r.Read(ctx) + if err != nil { + return err } - req.PointerIndex-- - volatileAddress = req.StoreRequest.Store[req.PointerIndex].VolatileAddress + if req.TxRequest.Type == pipeline.Commit { + commitReq = req + } - if uint64(volatileAddress)&1 != mod { - continue + if req.StoreRequest == nil { + break riLoop } - pointer := req.StoreRequest.Store[req.PointerIndex].Pointer - if pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) { - // Pointers are processed from the data node up t the root node. If at any level + req.PointerIndex-- + if req.StoreRequest.Store[req.PointerIndex].Pointer.Revision != uintptr(unsafe.Pointer(req.StoreRequest)) { + // Pointers are processed from the data node up to the root node. If at any level // revision test fails, it doesn't make sense to process parent nodes because revision // test will fail there for sure. - req.PointerIndex = 0 continue } - if req.Count < minReq.Count { - minReq = req - } - hash = req.StoreRequest.Store[req.PointerIndex].Hash + slots[n2] = req + n2++ break } - - slots[ri] = req - mask |= 1 << ri - matrix[ri] = (*byte)(db.config.State.Node(volatileAddress)) - hashes[ri] = &hash[0] } - if nilSlots == len(slots) { + if n2 == 0 { r.Acknowledge(commitReq.Count, commitReq.TxRequest, true) commitReq = request{} } else { + var mask uint16 + + minReq := request{ + Count: math.MaxUint64, + } + + for ri, req := range slots[:n2] { + if req.Count < minReq.Count { + minReq = req + } + + mask |= 1 << ri + matrix[ri] = (*byte)(db.config.State.Node(req.StoreRequest.Store[req.PointerIndex].VolatileAddress)) + hashes[ri] = &req.StoreRequest.Store[req.PointerIndex].Hash[0] + } + hash.Blake32048(matrixP, hashesP, mask) r.Acknowledge(minReq.Count-1, minReq.TxRequest, false) }