From dec3e20e52535640a305a6cd3b3a127e93b7e151 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Tue, 26 Apr 2022 15:20:52 +0800 Subject: [PATCH] sink(ticdc): use slice in txnsWithTheSameCommitTs to support splitting transaction (#5203) ref pingcap/tiflow#5231 --- cdc/model/mounter.go | 23 ++ cdc/sink/mysql/txn_cache.go | 22 +- cdc/sink/mysql/txn_cache_test.go | 346 ++++++++++++++++--------- cdc/sorter/memory/entry_sorter.go | 11 +- cdc/sorter/memory/entry_sorter_test.go | 20 +- cdc/sorter/unified/heap.go | 10 +- 6 files changed, 277 insertions(+), 155 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 2e3578ad58a..ab2a3d3bec4 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -48,3 +48,26 @@ func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *Polymorphi func (e *PolymorphicEvent) RegionID() uint64 { return e.RawKV.RegionID } + +// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order. +// It returns true if and only if i should precede j. +func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { + if i.CRTs == j.CRTs { + if i.RawKV.OpType == OpTypeResolved { + return false + } else if j.RawKV.OpType == OpTypeResolved { + return true + } + + if i.StartTs > j.StartTs { + return false + } else if i.StartTs < j.StartTs { + return true + } + + if i.RawKV.OpType == OpTypeDelete && j.RawKV.OpType != OpTypeDelete { + return true + } + } + return i.CRTs < j.CRTs +} diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 856e0671212..8a6cc128ddb 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -24,7 +24,7 @@ import ( ) type txnsWithTheSameCommitTs struct { - txns map[model.Ts]*model.SingleTableTxn + txns []*model.SingleTableTxn commitTs model.Ts } @@ -34,18 +34,22 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { zap.Uint64("commitTs of txn", t.commitTs), zap.Any("row", row)) } - if t.txns == nil { - t.txns = make(map[model.Ts]*model.SingleTableTxn) - } - txn, exist := t.txns[row.StartTs] - if !exist { + + var txn *model.SingleTableTxn + if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs { txn = &model.SingleTableTxn{ StartTs: row.StartTs, CommitTs: row.CommitTs, Table: row.Table, ReplicaID: row.ReplicaID, } - t.txns[row.StartTs] = txn + t.txns = append(t.txns, txn) + } else if t.txns[len(t.txns)-1].StartTs == row.StartTs { + txn = t.txns[len(t.txns)-1] + } else { + log.Panic("Row changed event received by the sink module should be ordered", + zap.Any("previousTxn", t.txns[len(t.txns)-1]), + zap.Any("currentRow", row)) } txn.Append(row) } @@ -149,9 +153,7 @@ func splitResolvedTxn( } resolvedTxns := make([]*model.SingleTableTxn, 0, txnsLength) for _, txns := range resolvedTxnsWithTheSameCommitTs { - for _, txn := range txns.txns { - resolvedTxns = append(resolvedTxns, txn) - } + resolvedTxns = append(resolvedTxns, txns.txns...) } resolvedRowsMap[tableID] = resolvedTxns flushedResolvedTsMap[tableID] = resolvedTs diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index 97f3f8f56b7..a5ca01e5088 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -31,131 +31,233 @@ func TestSplitResolvedTxn(test *testing.T) { input []*model.RowChangedEvent resolvedTsMap map[model.TableID]uint64 expected map[model.TableID][]*model.SingleTableTxn - }{{{ // Testing basic transaction collocation, no txns with the same commitTs - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, + }{ + { // Testing basic transaction collocation, no txns with the same commitTs + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: {{ + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 5, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + }, + }}, + 2: {{ + Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + }, + }}, + }, + }, { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: { + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + }, + }, + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 11, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, + }, + }, + }, + 2: { + { + Table: &model.TableName{TableID: 2}, + StartTs: 1, CommitTs: 12, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, + }, + }, + }, + 3: { + { + Table: &model.TableName{TableID: 3}, + StartTs: 1, + CommitTs: 7, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, + }, + }, + { + Table: &model.TableName{TableID: 3}, + StartTs: 1, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, + }, + }, + }, + }, + }, }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(6), - 2: uint64(6), + { // Testing the short circuit path + { + input: []*model.RowChangedEvent{}, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + }, + expected: nil, + }, + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + }, + expected: map[model.TableID][]*model.SingleTableTxn{}, + }, }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - }}}, + { // Testing the txns with the same commitTs + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: { + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 5, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + }, + }, + }, + 2: { + { + Table: &model.TableName{TableID: 2}, + StartTs: 1, + CommitTs: 6, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + }, + }, { + Table: &model.TableName{TableID: 2}, + StartTs: 2, + CommitTs: 6, + Rows: []*model.RowChangedEvent{ + {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + }, + }, + }, + }, + }, + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: { + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + }, + }, + { + Table: &model.TableName{TableID: 1}, + StartTs: 2, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + }, + }, + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 9, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, + }, + }, + }, + 2: { + { + Table: &model.TableName{TableID: 2}, + StartTs: 1, + CommitTs: 7, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + }, + }, { + Table: &model.TableName{TableID: 2}, + StartTs: 2, + CommitTs: 7, + Rows: []*model.RowChangedEvent{ + {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + }, + }, + }, + }, + }, }, - }, { - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(13), - 2: uint64(13), - 3: uint64(13), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - }}, {Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 11, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 12, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, - }}}, - 3: {{Table: &model.TableName{TableID: 3}, StartTs: 1, CommitTs: 7, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, - }}, {Table: &model.TableName{TableID: 3}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, - }}}, - }, - }}, {{ // Testing the short circuit path - input: []*model.RowChangedEvent{}, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(13), - 2: uint64(13), - 3: uint64(13), - }, - expected: nil, - }, { - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(6), - 2: uint64(6), - }, - expected: map[model.TableID][]*model.SingleTableTxn{}, - }}, {{ // Testing the txns with the same commitTs - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(6), - 2: uint64(6), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - }}, {Table: &model.TableName{TableID: 2}, StartTs: 2, CommitTs: 6, Rows: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - }}}, - }, - }, { - input: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(13), - 2: uint64(13), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - }}, {Table: &model.TableName{TableID: 1}, StartTs: 2, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - }}, {Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 9, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 7, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - }}, {Table: &model.TableName{TableID: 2}, StartTs: 2, CommitTs: 7, Rows: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - }}}, - }, - }}} + } for _, tc := range testCases { cache := newUnresolvedTxnCache() for _, t := range tc { diff --git a/cdc/sorter/memory/entry_sorter.go b/cdc/sorter/memory/entry_sorter.go index 36a589ff901..2561a9d8e23 100644 --- a/cdc/sorter/memory/entry_sorter.go +++ b/cdc/sorter/memory/entry_sorter.go @@ -159,16 +159,7 @@ func (es *EntrySorter) Output() <-chan *model.PolymorphicEvent { } func eventLess(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool { - if i.CRTs == j.CRTs { - if i.RawKV.OpType == model.OpTypeDelete { - return true - } - - if j.RawKV.OpType == model.OpTypeResolved { - return true - } - } - return i.CRTs < j.CRTs + return model.ComparePolymorphicEvents(i, j) } func mergeEvents(kvsA []*model.PolymorphicEvent, kvsB []*model.PolymorphicEvent, output func(*model.PolymorphicEvent)) { diff --git a/cdc/sorter/memory/entry_sorter_test.go b/cdc/sorter/memory/entry_sorter_test.go index 58d5ecbad13..90b95f99e8a 100644 --- a/cdc/sorter/memory/entry_sorter_test.go +++ b/cdc/sorter/memory/entry_sorter_test.go @@ -289,20 +289,29 @@ func TestEntrySorterRandomly(t *testing.T) { func TestEventLess(t *testing.T) { t.Parallel() testCases := []struct { + order int i *model.PolymorphicEvent j *model.PolymorphicEvent expected bool }{ { + 0, &model.PolymorphicEvent{ CRTs: 1, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypePut, + }, }, &model.PolymorphicEvent{ CRTs: 2, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypePut, + }, }, true, }, { + 1, &model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{ @@ -315,9 +324,10 @@ func TestEventLess(t *testing.T) { OpType: model.OpTypeDelete, }, }, - true, + false, }, { + 2, &model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{ @@ -330,9 +340,10 @@ func TestEventLess(t *testing.T) { OpType: model.OpTypeResolved, }, }, - true, + false, }, { + 3, &model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{ @@ -348,6 +359,7 @@ func TestEventLess(t *testing.T) { false, }, { + 4, &model.PolymorphicEvent{ CRTs: 3, RawKV: &model.RawKVEntry{ @@ -364,8 +376,8 @@ func TestEventLess(t *testing.T) { }, } - for _, tc := range testCases { - require.Equal(t, tc.expected, eventLess(tc.i, tc.j)) + for i, tc := range testCases { + require.Equal(t, tc.expected, eventLess(tc.i, tc.j), "case %d", i) } } diff --git a/cdc/sorter/unified/heap.go b/cdc/sorter/unified/heap.go index a8e1baec64f..9145556c740 100644 --- a/cdc/sorter/unified/heap.go +++ b/cdc/sorter/unified/heap.go @@ -24,15 +24,7 @@ type sortHeap []*sortItem func (h sortHeap) Len() int { return len(h) } func (h sortHeap) Less(i, j int) bool { - if h[i].entry.CRTs == h[j].entry.CRTs { - if h[j].entry.RawKV.OpType == model.OpTypeResolved && h[i].entry.RawKV.OpType != model.OpTypeResolved { - return true - } - if h[i].entry.RawKV.OpType == model.OpTypeDelete && h[j].entry.RawKV.OpType != model.OpTypeDelete { - return true - } - } - return h[i].entry.CRTs < h[j].entry.CRTs + return model.ComparePolymorphicEvents(h[i].entry, h[j].entry) } func (h sortHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *sortHeap) Push(x interface{}) {