diff --git a/mvcc/index.go b/mvcc/index.go
index 397098a7ba75..7227f5977dd5 100644
--- a/mvcc/index.go
+++ b/mvcc/index.go
@@ -29,7 +29,9 @@ type index interface {
 	RangeSince(key, end []byte, rev int64) []revision
 	Compact(rev int64) map[revision]struct{}
 	Equal(b index) bool
+	Insert(ki *keyIndex)
+	KeyIndex(ki *keyIndex) *keyIndex
 }
 
 type treeIndex struct {
 	sync.RWMutex
@@ -60,16 +62,19 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
 }
 
 func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
 	keyi := &keyIndex{key: key}
+	if keyi = ti.KeyIndex(keyi); keyi == nil {
+		return revision{}, revision{}, 0, ErrRevisionNotFound
+	}
+	return keyi.get(atRev)
+}
+func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
 	ti.RLock()
 	defer ti.RUnlock()
-	item := ti.tree.Get(keyi)
-	if item == nil {
-		return revision{}, revision{}, 0, ErrRevisionNotFound
+	if item := ti.tree.Get(keyi); item != nil {
+		return item.(*keyIndex)
 	}
-
-	keyi = item.(*keyIndex)
-	return keyi.get(atRev)
+	return nil
 }
 
 func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 9e75abc37ae6..03322fec8c3c 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -275,23 +275,15 @@ func (s *store) restore() error {
 	}
 
 	// index keys concurrently as they're loaded in from tx
-	unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
-	go func() {
-		defer close(donec)
-		for unordered := range unorderedc {
-			// restore the tree index from the unordered index.
-			for _, v := range unordered {
-				s.kvindex.Insert(v)
-			}
-		}
-	}()
+	rkvc, revc := restoreIntoIndex(s.kvindex)
 	for {
 		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
 		if len(keys) == 0 {
 			break
 		}
-		// unbuffered so keys don't pile up in memory
-		unorderedc <- s.restoreChunk(keys, vals, keyToLease)
+		// rkvc blocks if the total number of pending keys exceeds the
+		// restore chunk size, keeping keys from consuming too much memory.
+		restoreChunk(rkvc, keys, vals, keyToLease)
 		if len(keys) < restoreChunkKeys {
 			// partial set implies final set
 			break
@@ -301,8 +293,8 @@ func (s *store) restore() error {
 		newMin.sub++
 		revToBytes(newMin, min)
 	}
-	close(unorderedc)
-	<-donec
+	close(rkvc)
+	s.currentRev = <-revc
 
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -334,38 +326,73 @@ func (s *store) restore() error {
 	return nil
 }
 
-func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
-	// assume half of keys are overwrites
-	unordered := make(map[string]*keyIndex, len(keys)/2)
+type revKeyValue struct {
+	key  []byte
+	kv   mvccpb.KeyValue
+	kstr string
+}
+
+func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
+	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
+	go func() {
+		currentRev := int64(1)
+		defer func() { revc <- currentRev }()
+		// restore the tree index by streaming the unordered key index.
+		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
+		for rkv := range rkvc {
+			ki, ok := kiCache[rkv.kstr]
+			// purge the cache if it is full and the key is missing
+			if !ok && len(kiCache) >= restoreChunkKeys {
+				i := 10
+				for k := range kiCache {
+					delete(kiCache, k)
+					if i--; i == 0 {
+						break
+					}
+				}
+			}
+			// cache miss; fetch from the tree index if the key is already there
+			if !ok {
+				ki = &keyIndex{key: rkv.kv.Key}
+				if idxKey := idx.KeyIndex(ki); idxKey != nil {
+					kiCache[rkv.kstr], ki = idxKey, idxKey
+					ok = true
+				}
+			}
+			rev := bytesToRev(rkv.key)
+			currentRev = rev.main
+			if ok {
+				if isTombstone(rkv.key) {
+					ki.tombstone(rev.main, rev.sub)
+					continue
+				}
+				ki.put(rev.main, rev.sub)
+			} else if !isTombstone(rkv.key) {
+				ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
+				idx.Insert(ki)
+				kiCache[rkv.kstr] = ki
+			}
+		}
+	}()
+	return rkvc, revc
+}
+
+func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
 	for i, key := range keys {
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vals[i]); err != nil {
+		rkv := revKeyValue{key: key}
+		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
 			plog.Fatalf("cannot unmarshal event: %v", err)
 		}
-		rev := bytesToRev(key[:revBytesLen])
-		s.currentRev = rev.main
-		kstr := string(kv.Key)
+		rkv.kstr = string(rkv.kv.Key)
 		if isTombstone(key) {
-			if ki, ok := unordered[kstr]; ok {
-				ki.tombstone(rev.main, rev.sub)
-			}
-			delete(keyToLease, kstr)
-			continue
-		}
-		if ki, ok := unordered[kstr]; ok {
-			ki.put(rev.main, rev.sub)
-		} else {
-			ki = &keyIndex{key: kv.Key}
-			ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
-			unordered[kstr] = ki
-		}
-		if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
-			keyToLease[kstr] = lid
+			delete(keyToLease, rkv.kstr)
+		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
+			keyToLease[rkv.kstr] = lid
 		} else {
-			delete(keyToLease, kstr)
+			delete(keyToLease, rkv.kstr)
 		}
+		kvc <- rkv
 	}
-	return unordered
 }
 
 func (s *store) Close() error {
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index c98172bb7976..ab6b23639f6b 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -403,6 +403,7 @@ func TestStoreRestore(t *testing.T) {
 	}
 	ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
 	wact = []testutil.Action{
+		{"keyIndex", []interface{}{ki}},
 		{"insert", []interface{}{ki}},
 	}
 	if g := fi.Action(); !reflect.DeepEqual(g, wact) {
@@ -698,6 +699,11 @@ func (i *fakeIndex) Insert(ki *keyIndex) {
 	i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
 }
 
+func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
+	i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
+	return nil
+}
+
 func createBytesSlice(bytesN, sliceN int) [][]byte {
 	rs := [][]byte{}
 	for len(rs) != sliceN {
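For reviewers, a minimal sketch of the `Get`/`KeyIndex` split in mvcc/index.go; it is not etcd code. The lookup is factored into a helper that reports absence as a plain nil/miss result instead of an error, which is what lets the restore path probe for an existing key index without round-tripping through `ErrRevisionNotFound`. `toyIndex`, `keyIndex`, and the `map[string]int64` here are hypothetical stand-ins for `treeIndex`, `KeyIndex`, and the B-tree.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("revision not found")

// toyIndex stands in for treeIndex; a plain map replaces the B-tree.
type toyIndex struct {
	sync.RWMutex
	m map[string]int64
}

// keyIndex plays the role of treeIndex.KeyIndex: a shared lookup that
// reports "not there" without an error, so both Get and a restore-style
// caller can reuse it.
func (t *toyIndex) keyIndex(key string) (int64, bool) {
	t.RLock()
	defer t.RUnlock()
	rev, ok := t.m[key]
	return rev, ok
}

// Get keeps its error-returning contract but delegates the lookup,
// mirroring how the patched treeIndex.Get calls KeyIndex.
func (t *toyIndex) Get(key string) (int64, error) {
	rev, ok := t.keyIndex(key)
	if !ok {
		return 0, errNotFound
	}
	return rev, nil
}

func main() {
	idx := &toyIndex{m: map[string]int64{"foo": 5}}
	fmt.Println(idx.Get("foo")) // 5 <nil>
	fmt.Println(idx.Get("bar")) // 0 revision not found
}
```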
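A second sketch models the restore pipeline from mvcc/kvstore.go under the same caveat: `revKV`, `chunkSize`, and `startConsumer` are illustrative stand-ins for `revKeyValue`, `restoreChunkKeys`, and `restoreIntoIndex`, and a plain map replaces the tree index. Only the mechanics carry over: a channel buffered to the chunk size throttles the producer, a single consumer goroutine owns a bounded cache that it partially purges on a miss, and the final revision is handed back over a one-slot channel after the producer closes the stream.

```go
package main

import "fmt"

const chunkSize = 4 // stand-in for restoreChunkKeys

type revKV struct {
	rev int64
	key string
}

// startConsumer plays the role of restoreIntoIndex: one goroutine drains
// the buffered channel, keeps a bounded cache of recently seen keys, and
// reports the last revision it saw once the channel is closed.
func startConsumer(index map[string]int64) (chan<- revKV, <-chan int64) {
	kvc, revc := make(chan revKV, chunkSize), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		cache := make(map[string]int64, chunkSize)
		for kv := range kvc {
			if _, ok := cache[kv.key]; !ok && len(cache) >= chunkSize {
				// cache miss while full: evict a couple of arbitrary
				// entries, like the patch's partial purge of kiCache
				i := 2
				for k := range cache {
					delete(cache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			cache[kv.key] = kv.rev
			index[kv.key] = kv.rev // later revisions overwrite earlier ones
			currentRev = kv.rev
		}
	}()
	return kvc, revc
}

func main() {
	index := make(map[string]int64)
	kvc, revc := startConsumer(index)
	// producer side, like restoreChunk: once chunkSize sends are pending,
	// the buffered channel blocks the producer, bounding memory use
	for rev := int64(1); rev <= 10; rev++ {
		kvc <- revKV{rev: rev, key: fmt.Sprintf("key-%d", rev%3)}
	}
	close(kvc)
	fmt.Println("restored up to revision", <-revc) // prints 10
	fmt.Println("distinct keys indexed:", len(index))
}
```

Because only the consumer goroutine touches the cache and performs inserts, the pipeline needs no locking beyond what `treeIndex.KeyIndex` already takes; the bounded channel is what the in-diff comment means when it says rkvc blocks once too many keys are pending.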