diff --git a/x/merkledb/db.go b/x/merkledb/db.go index e4406e742cb..a001494fee5 100644 --- a/x/merkledb/db.go +++ b/x/merkledb/db.go @@ -43,7 +43,7 @@ var ( Codec, Version = newCodec() - rootKey = []byte{} + rootKey []byte nodePrefix = []byte("node") metadataPrefix = []byte("metadata") cleanShutdownKey = []byte("cleanShutdown") @@ -65,7 +65,7 @@ type Config struct { Tracer trace.Tracer } -// Can only be edited by committing changes from a trieView. +// Database can only be edited by committing changes from a trieView. type Database struct { // Must be held when reading/writing fields. lock sync.RWMutex @@ -225,7 +225,7 @@ func New(ctx context.Context, db database.Database, config Config) (*Database, e return newDatabase(ctx, db, config, metrics) } -// Commits the key/value pairs within the [proof] to the db. +// CommitChangeProof commits the key/value pairs within the [proof] to the db. func (db *Database) CommitChangeProof(ctx context.Context, proof *ChangeProof) error { db.commitLock.Lock() defer db.commitLock.Unlock() @@ -241,7 +241,7 @@ func (db *Database) CommitChangeProof(ctx context.Context, proof *ChangeProof) e return view.commitToDB(ctx) } -// Commits the key/value pairs within the [proof] to the db. +// CommitRangeProof commits the key/value pairs within the [proof] to the db. // [start] is the smallest key in the range this [proof] covers. func (db *Database) CommitRangeProof(ctx context.Context, start []byte, proof *RangeProof) error { db.commitLock.Lock() @@ -354,7 +354,7 @@ func (db *Database) getValueCopy(key path, lock bool) ([]byte, error) { // getValue returns the value for the given [key]. // Returns database.ErrNotFound if it doesn't exist. // If [lock], [db.lock]'s read lock is acquired. -// Otherwise assumes [db.lock] is already held. +// Otherwise, assumes [db.lock] is already held. func (db *Database) getValue(key path, lock bool) ([]byte, error) { if lock { db.lock.RLock() @@ -375,7 +375,7 @@ func (db *Database) getValue(key path, lock bool) ([]byte, error) { return n.value.value, nil } -// Returns the ID of the root node of the merkle trie. +// GetMerkleRoot returns the ID of the root node of the merkle trie. func (db *Database) GetMerkleRoot(ctx context.Context) (ids.ID, error) { _, span := db.tracer.Start(ctx, "MerkleDB.GetMerkleRoot") defer span.End() @@ -396,7 +396,7 @@ func (db *Database) getMerkleRoot() ids.ID { return db.root.id } -// Returns a proof of the existence/non-existence of [key] in this trie. +// GetProof returns a proof of the existence/non-existence of [key] in this trie. func (db *Database) GetProof(ctx context.Context, key []byte) (*Proof, error) { db.commitLock.RLock() defer db.commitLock.RUnlock() @@ -419,7 +419,7 @@ func (db *Database) getProof(ctx context.Context, key []byte) (*Proof, error) { return view.getProof(ctx, key) } -// Returns a proof for the key/value pairs in this trie within the range +// GetRangeProof returns a proof for the key/value pairs in this trie within the range // [start, end]. func (db *Database) GetRangeProof( ctx context.Context, @@ -433,7 +433,7 @@ func (db *Database) GetRangeProof( return db.getRangeProofAtRoot(ctx, db.getMerkleRoot(), start, end, maxLength) } -// Returns a proof for the key/value pairs in this trie within the range +// GetRangeProofAtRoot returns a proof for the key/value pairs in this trie within the range // [start, end] when the root of the trie was [rootID]. func (db *Database) GetRangeProofAtRoot( ctx context.Context, @@ -470,7 +470,7 @@ func (db *Database) getRangeProofAtRoot( return historicalView.GetRangeProof(ctx, start, end, maxLength) } -// Returns a proof for a subset of the key/value changes in key range +// GetChangeProof returns a proof for a subset of the key/value changes in key range // [start, end] that occurred between [startRootID] and [endRootID]. // Returns at most [maxLength] key/value pairs. func (db *Database) GetChangeProof( @@ -577,7 +577,7 @@ func (db *Database) GetChangeProof( return result, nil } -// Returns a new view on top of this trie. +// NewView returns a new view on top of this trie. // Changes made to the view will only be reflected in the original trie if Commit is called. // Assumes [db.lock] isn't held. func (db *Database) NewView() (TrieView, error) { @@ -591,7 +591,7 @@ func (db *Database) newUntrackedView(estimatedSize int) (*trieView, error) { return newTrieView(db, db, db.root.clone(), estimatedSize) } -// Returns a new view preallocated to hold at least [estimatedSize] value changes at a time. +// NewPreallocatedView returns a new view with memory allocated to hold at least [estimatedSize] value changes at a time. // If more changes are made, additional memory will be allocated. // The returned view is added to [db.childViews]. // Assumes [db.lock] isn't held. @@ -721,7 +721,7 @@ func (db *Database) onEviction(node *node) error { return nil } -// Inserts the key/value pair into the db. +// Put upserts the key/value pair into the db. func (db *Database) Put(k, v []byte) error { return db.Insert(context.Background(), k, v) } diff --git a/x/merkledb/db_test.go b/x/merkledb/db_test.go index 940b1f55ee4..8ff11b91646 100644 --- a/x/merkledb/db_test.go +++ b/x/merkledb/db_test.go @@ -710,10 +710,8 @@ func applyOperations(t *Database, ops []*testOperation) (Trie, error) { if err := view.Remove(context.Background(), op.key); err != nil { return nil, err } - } else { - if err := view.Insert(context.Background(), op.key, op.value); err != nil { - return nil, err - } + } else if err := view.Insert(context.Background(), op.key, op.value); err != nil { + return nil, err } } return view, nil diff --git a/x/merkledb/history.go b/x/merkledb/history.go index 78433e0a5e9..b9d43894129 100644 --- a/x/merkledb/history.go +++ b/x/merkledb/history.go @@ -158,31 +158,33 @@ func (th *trieHistory) getValueChanges(startRoot, endRoot ids.ID, start, end []b // Add the changes from this commit to [combinedChanges]. for key, valueChange := range item.values { - if (len(startPath) == 0 || key.Compare(startPath) >= 0) && - (len(endPath) == 0 || key.Compare(endPath) <= 0) { - // The key is in the range [start, end]. - if existing, ok := combinedChanges.values[key]; ok { - // A change to this key already exists in [combinedChanges]. - existing.after = valueChange.after - if existing.before.hasValue == existing.after.hasValue && - bytes.Equal(existing.before.value, existing.after.value) { - // The change to this key is a no-op, so remove it from [combinedChanges]. - delete(combinedChanges.values, key) - sortedKeys.Delete(key) - } - } else { - combinedChanges.values[key] = &change[Maybe[[]byte]]{ - before: valueChange.before, - after: valueChange.after, - } - sortedKeys.ReplaceOrInsert(key) + // The key is outside the range [start, end]. + if (len(startPath) > 0 && key.Compare(startPath) < 0) || + (len(endPath) > 0 && key.Compare(endPath) > 0) { + continue + } + + // A change to this key already exists in [combinedChanges] + // so update its before value with the earlier before value + if existing, ok := combinedChanges.values[key]; ok { + existing.after = valueChange.after + if existing.before.hasValue == existing.after.hasValue && + bytes.Equal(existing.before.value, existing.after.value) { + // The change to this key is a no-op, so remove it from [combinedChanges]. + delete(combinedChanges.values, key) + sortedKeys.Delete(key) + } + } else { + combinedChanges.values[key] = &change[Maybe[[]byte]]{ + before: valueChange.before, + after: valueChange.after, } + sortedKeys.ReplaceOrInsert(key) } } - + // continue to next change list return true - }, - ) + }) // Keep only the smallest [maxLength] items in [combinedChanges.values]. for sortedKeys.Len() > maxLength { diff --git a/x/merkledb/trieview.go b/x/merkledb/trieview.go index 56ffb16e8b6..fec31a719b6 100644 --- a/x/merkledb/trieview.go +++ b/x/merkledb/trieview.go @@ -997,10 +997,8 @@ func (t *trieView) applyChangedValuesToTrie(ctx context.Context) error { if err := t.removeFromTrie(key); err != nil { return err } - } else { - if _, err := t.insertIntoTrie(key, change); err != nil { - return err - } + } else if _, err := t.insertIntoTrie(key, change); err != nil { + return err } } return nil @@ -1228,7 +1226,7 @@ func (t *trieView) insertIntoTrie( existingChildKey := key[:closestNodeKeyLength+1] + existingChildEntry.compressedPath - // the existing child's key is of length: len(closestNodekey) + 1 for the child index + len(existing child's compressed key) + // the existing child's key is of length: len(closestNodeKey) + 1 for the child index + len(existing child's compressed key) // if that length is less than or equal to the branch node's key that implies that the existing child's key matched the key to be inserted // since it matched the key to be inserted, it should have been returned by GetPathTo if len(existingChildKey) <= len(branchNode.key) { diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 5fb24503368..c7a43c1b2ec 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -119,7 +119,7 @@ func TestGetRangeProof(t *testing.T) { smallTrieRoot, err := smallTrieDB.GetMerkleRoot(context.Background()) require.NoError(t, err) - largeTrieKeyCount := 10_000 + largeTrieKeyCount := 3 * defaultRequestKeyLimit largeTrieDB, largeTrieKeys, err := generateTrieWithMinKeyLen(t, r, largeTrieKeyCount, 1) require.NoError(t, err) largeTrieRoot, err := largeTrieDB.GetMerkleRoot(context.Background()) @@ -232,7 +232,7 @@ func TestGetRangeProof(t *testing.T) { modifyResponse: func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[:len(response.KeyValues)-2] }, - expectedErr: merkledb.ErrInvalidProof, + expectedErr: merkledb.ErrProofNodeNotForKey, }, "removed key from middle of response": { db: largeTrieDB, diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 2d2fb33bac3..74db943a4a8 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -548,8 +548,8 @@ func TestFindNextKeyRandom(t *testing.T) { require.NoError(err) var ( - numProofsToTest = 500 - numKeyValues = 500 + numProofsToTest = 250 + numKeyValues = 250 maxKeyLen = 256 maxValLen = 256 maxRangeStartLen = 8 @@ -737,7 +737,7 @@ func TestFindNextKeyRandom(t *testing.T) { func Test_Sync_Result_Correct_Root(t *testing.T) { for i := 0; i < 3; i++ { r := rand.New(rand.NewSource(int64(i))) // #nosec G404 - dbToSync, err := generateTrie(t, r, 5000) + dbToSync, err := generateTrie(t, r, 1000) require.NoError(t, err) syncRoot, err := dbToSync.GetMerkleRoot(context.Background()) require.NoError(t, err) @@ -774,57 +774,32 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { require.Equal(t, syncRoot, newRoot) // make sure they stay in sync - for x := 0; x < 50; x++ { - addkey := make([]byte, r.Intn(50)) - _, err = r.Read(addkey) - require.NoError(t, err) - val := make([]byte, r.Intn(50)) - _, err = r.Read(val) - require.NoError(t, err) - - err = db.Put(addkey, val) - require.NoError(t, err) - - err = dbToSync.Put(addkey, val) - require.NoError(t, err) - - addNilkey := make([]byte, r.Intn(50)) - _, err = r.Read(addNilkey) - require.NoError(t, err) - err = db.Put(addNilkey, nil) - require.NoError(t, err) - - err = dbToSync.Put(addNilkey, nil) - require.NoError(t, err) + addkey := make([]byte, r.Intn(50)) + _, err = r.Read(addkey) + require.NoError(t, err) + val := make([]byte, r.Intn(50)) + _, err = r.Read(val) + require.NoError(t, err) - deleteKeyStart := make([]byte, r.Intn(50)) - _, err = r.Read(deleteKeyStart) - require.NoError(t, err) + err = db.Put(addkey, val) + require.NoError(t, err) - it := dbToSync.NewIteratorWithStart(deleteKeyStart) - if it.Next() { - err = dbToSync.Delete(it.Key()) - require.NoError(t, err) - err = db.Delete(it.Key()) - require.NoError(t, err) - } - require.NoError(t, it.Error()) - it.Release() + err = dbToSync.Put(addkey, val) + require.NoError(t, err) - syncRoot, err = dbToSync.GetMerkleRoot(context.Background()) - require.NoError(t, err) + syncRoot, err = dbToSync.GetMerkleRoot(context.Background()) + require.NoError(t, err) - newRoot, err = db.GetMerkleRoot(context.Background()) - require.NoError(t, err) - require.Equal(t, syncRoot, newRoot) - } + newRoot, err = db.GetMerkleRoot(context.Background()) + require.NoError(t, err) + require.Equal(t, syncRoot, newRoot) } } func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { - for i := 0; i < 5; i++ { + for i := 0; i < 3; i++ { r := rand.New(rand.NewSource(int64(i))) // #nosec G404 - dbToSync, err := generateTrie(t, r, 5000) + dbToSync, err := generateTrie(t, r, 3*maxKeyValuesLimit) require.NoError(t, err) syncRoot, err := dbToSync.GetMerkleRoot(context.Background()) require.NoError(t, err) @@ -876,11 +851,11 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { }) require.NoError(t, err) require.NotNil(t, newSyncer) - err = newSyncer.StartSyncing(context.Background()) - require.NoError(t, err) + + require.NoError(t, newSyncer.StartSyncing(context.Background())) require.NoError(t, newSyncer.Error()) - err = newSyncer.Wait(context.Background()) - require.NoError(t, err) + require.NoError(t, newSyncer.Wait(context.Background())) + newRoot, err := db.GetMerkleRoot(context.Background()) require.NoError(t, err) require.Equal(t, syncRoot, newRoot) @@ -948,13 +923,41 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - for i := 0; i < 4; i++ { + for i := 0; i < 3; i++ { r := rand.New(rand.NewSource(int64(i))) // #nosec G404 - dbToSync, err := generateTrie(t, r, 10000) + dbToSync, err := generateTrie(t, r, 3*maxKeyValuesLimit) require.NoError(err) - syncRoot, err := dbToSync.GetMerkleRoot(context.Background()) + firstSyncRoot, err := dbToSync.GetMerkleRoot(context.Background()) + require.NoError(err) + + for x := 0; x < 100; x++ { + key := make([]byte, r.Intn(50)) + _, err = r.Read(key) + require.NoError(err) + + val := make([]byte, r.Intn(50)) + _, err = r.Read(val) + require.NoError(err) + + err = dbToSync.Put(key, val) + require.NoError(err) + + deleteKeyStart := make([]byte, r.Intn(50)) + _, err = r.Read(deleteKeyStart) + require.NoError(err) + + it := dbToSync.NewIteratorWithStart(deleteKeyStart) + if it.Next() { + err = dbToSync.Delete(it.Key()) + require.NoError(err) + } + require.NoError(it.Error()) + it.Release() + } + + secondSyncRoot, err := dbToSync.GetMerkleRoot(context.Background()) require.NoError(err) db, err := merkledb.New( @@ -971,6 +974,7 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { // Only let one response go through until we update the root. updatedRootChan := make(chan struct{}, 1) updatedRootChan <- struct{}{} + client := NewMockClient(ctrl) client.EXPECT().GetRangeProof(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, request *syncpb.RangeProofRequest) (*merkledb.RangeProof, error) { @@ -994,39 +998,12 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { syncer, err := NewStateSyncManager(StateSyncConfig{ SyncDB: db, Client: client, - TargetRoot: syncRoot, + TargetRoot: firstSyncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, }) require.NoError(err) require.NotNil(t, syncer) - for x := 0; x < 50; x++ { - key := make([]byte, r.Intn(50)) - _, err = r.Read(key) - require.NoError(err) - - val := make([]byte, r.Intn(50)) - _, err = r.Read(val) - require.NoError(err) - - err = dbToSync.Put(key, val) - require.NoError(err) - - deleteKeyStart := make([]byte, r.Intn(50)) - _, err = r.Read(deleteKeyStart) - require.NoError(err) - - it := dbToSync.NewIteratorWithStart(deleteKeyStart) - if it.Next() { - err = dbToSync.Delete(it.Key()) - require.NoError(err) - } - require.NoError(it.Error()) - it.Release() - } - - syncRoot, err = dbToSync.GetMerkleRoot(context.Background()) - require.NoError(err) err = syncer.StartSyncing(context.Background()) require.NoError(err) @@ -1040,20 +1017,18 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { return syncer.processedWork.Len() > 0 }, - 3*time.Second, + 5*time.Second, 10*time.Millisecond, ) - err = syncer.UpdateSyncTarget(syncRoot) - require.NoError(err) + require.NoError(syncer.UpdateSyncTarget(secondSyncRoot)) close(updatedRootChan) - err = syncer.Wait(context.Background()) - require.NoError(err) + require.NoError(syncer.Wait(context.Background())) require.NoError(syncer.Error()) newRoot, err := db.GetMerkleRoot(context.Background()) require.NoError(err) - require.Equal(syncRoot, newRoot) + require.Equal(secondSyncRoot, newRoot) } } diff --git a/x/sync/syncmanager.go b/x/sync/syncmanager.go index 6077a35b4f7..d661a3e11b3 100644 --- a/x/sync/syncmanager.go +++ b/x/sync/syncmanager.go @@ -155,8 +155,8 @@ func (m *StateSyncManager) StartSyncing(ctx context.Context) error { return nil } -// Repeatedly awaits signal on [m.unprocessedWorkCond] that there -// is work to do or we're done, and dispatches a goroutine to do +// sync awaits signal on [m.unprocessedWorkCond], which indicates that there +// is work to do or syncing completes. If there is work, sync will dispatch a goroutine to do // the work. func (m *StateSyncManager) sync(ctx context.Context) { defer func() { @@ -259,7 +259,7 @@ func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem return } - changeproof, err := m.config.Client.GetChangeProof( + changeProof, err := m.config.Client.GetChangeProof( ctx, &syncpb.ChangeProofRequest{ StartRoot: workItem.LocalRootID[:], @@ -286,7 +286,7 @@ func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem // The start or end root IDs are not present in other nodes' history. // Add this range as a fresh uncompleted work item to the work heap. // TODO danlaine send range proof instead of failure notification - if !changeproof.HadRootsInHistory { + if !changeProof.HadRootsInHistory { workItem.LocalRootID = ids.Empty m.enqueueWork(workItem) return @@ -294,15 +294,15 @@ func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem largestHandledKey := workItem.end // if the proof wasn't empty, apply changes to the sync DB - if len(changeproof.KeyChanges) > 0 { - if err := m.config.SyncDB.CommitChangeProof(ctx, changeproof); err != nil { + if len(changeProof.KeyChanges) > 0 { + if err := m.config.SyncDB.CommitChangeProof(ctx, changeProof); err != nil { m.setError(err) return } - largestHandledKey = changeproof.KeyChanges[len(changeproof.KeyChanges)-1].Key + largestHandledKey = changeProof.KeyChanges[len(changeProof.KeyChanges)-1].Key } - m.completeWorkItem(ctx, workItem, largestHandledKey, rootID, changeproof.EndProof) + m.completeWorkItem(ctx, workItem, largestHandledKey, rootID, changeProof.EndProof) } // Fetch and apply the range proof given by [workItem]. @@ -493,7 +493,7 @@ func (m *StateSyncManager) Error() error { return m.fatalError } -// Blocks until either: +// Wait blocks until one of the following occurs: // - sync is complete. // - sync fatally errored. // - [ctx] is canceled. @@ -698,15 +698,11 @@ func midPoint(start, end []byte) []byte { if total >= 256 { total -= 256 index := i - 1 - for index >= 0 { - if midpoint[index] != 255 { - midpoint[index]++ - break - } - + for index > 0 && midpoint[index] == 255 { midpoint[index] = 0 index-- } + midpoint[index]++ } midpoint[i] = byte(total) } diff --git a/x/sync/syncworkheap.go b/x/sync/syncworkheap.go index 7189ef7a5ef..e44afb9ad94 100644 --- a/x/sync/syncworkheap.go +++ b/x/sync/syncworkheap.go @@ -7,6 +7,8 @@ import ( "bytes" "container/heap" + "github.com/ava-labs/avalanchego/utils/math" + "github.com/google/btree" ) @@ -78,22 +80,25 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { return } - var mergedRange *heapItem + var mergedBefore, mergedAfter *heapItem + searchItem := &heapItem{ + workItem: &syncWorkItem{ + start: item.start, + }, + } // Find the item with the greatest start range which is less than [item.start]. // Note that the iterator function will run at most once, since it always returns false. wh.sortedItems.DescendLessOrEqual( - &heapItem{ - workItem: &syncWorkItem{ - start: item.start, - }, - }, + searchItem, func(beforeItem *heapItem) bool { if item.LocalRootID == beforeItem.workItem.LocalRootID && bytes.Equal(beforeItem.workItem.end, item.start) { // [beforeItem.start, beforeItem.end] and [item.start, item.end] are // merged into [beforeItem.start, item.end] beforeItem.workItem.end = item.end - mergedRange = beforeItem + beforeItem.workItem.priority = math.Max(item.priority, beforeItem.workItem.priority) + heap.Fix(wh, beforeItem.heapIndex) + mergedBefore = beforeItem } return false }) @@ -101,42 +106,33 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { // Find the item with the smallest start range which is greater than [item.start]. // Note that the iterator function will run at most once, since it always returns false. wh.sortedItems.AscendGreaterOrEqual( - &heapItem{ - workItem: &syncWorkItem{ - start: item.start, - }, - }, + searchItem, func(afterItem *heapItem) bool { if item.LocalRootID == afterItem.workItem.LocalRootID && bytes.Equal(afterItem.workItem.start, item.end) { - if mergedRange != nil { - // [beforeItem.start, item.end] and [afterItem.start, afterItem.end] are merged - // into [beforeItem.start, afterItem.end]. - // Modify [mergedRange] and remove [afterItem] since [mergedRange] now contains the entire - // range that was covered by [afterItem]. - wh.remove(afterItem) - mergedRange.workItem.end = afterItem.workItem.end - if afterItem.workItem.priority > mergedRange.workItem.priority { - mergedRange.workItem.priority = afterItem.workItem.priority - heap.Fix(wh, mergedRange.heapIndex) - } - } else { - // [item.start, item.end] and [afterItem.start, afterItem.end] are merged into - // [item.start, afterItem.end]. - afterItem.workItem.start = item.start - mergedRange = afterItem - } + // [item.start, item.end] and [afterItem.start, afterItem.end] are merged into + // [item.start, afterItem.end]. + afterItem.workItem.start = item.start + afterItem.workItem.priority = math.Max(item.priority, afterItem.workItem.priority) + heap.Fix(wh, afterItem.heapIndex) + mergedAfter = afterItem } return false }) - if mergedRange != nil { - // We merged [item] with at least one existing item. - if item.priority > mergedRange.workItem.priority { - mergedRange.workItem.priority = item.priority - // Priority was updated; fix position in the heap. - heap.Fix(wh, mergedRange.heapIndex) - } - } else { + // if the new item should be merged with both the item before and the item after, + // we can combine the before item with the after item + if mergedBefore != nil && mergedAfter != nil { + // combine the two ranges + mergedBefore.workItem.end = mergedAfter.workItem.end + // remove the second range since it is now covered by the first + wh.remove(mergedAfter) + // update the priority + mergedBefore.workItem.priority = math.Max(mergedBefore.workItem.priority, mergedAfter.workItem.priority) + heap.Fix(wh, mergedBefore.heapIndex) + } + + // nothing was merged, so add new item to the heap + if mergedBefore == nil && mergedAfter == nil { // We didn't merge [item] with an existing one; put it in the heap. heap.Push(wh, &heapItem{workItem: item}) }