From 546ae0827bea4be564e6e432c6eaedc5257aaf52 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 10 Jun 2021 15:34:30 +0800
Subject: [PATCH] lightning: save chunk checkpoint timely (#1080) (#1183)

---
 pkg/lightning/backend/backend.go           |  13 +-
 pkg/lightning/backend/backend_test.go      |  16 +--
 pkg/lightning/backend/importer/importer.go |   8 +-
 .../backend/importer/importer_test.go      |   3 +-
 pkg/lightning/backend/local/local.go       |  97 +++++++++++---
 pkg/lightning/backend/local/local_test.go  |  22 +++-
 pkg/lightning/backend/noop/noop.go         |   8 +-
 pkg/lightning/backend/tidb/tidb.go         |   8 +-
 pkg/lightning/backend/tidb/tidb_test.go    |   8 +-
 pkg/lightning/restore/restore.go           | 123 ++++++++++++++----
 pkg/lightning/restore/restore_test.go      |   1 +
 pkg/mock/backend.go                        |  21 ++-
 12 files changed, 257 insertions(+), 71 deletions(-)

diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go
index e28d94793..77836a314 100644
--- a/pkg/lightning/backend/backend.go
+++ b/pkg/lightning/backend/backend.go
@@ -375,10 +375,14 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
 	return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
 }
 
-func (w *LocalEngineWriter) Close(ctx context.Context) error {
+func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
 	return w.writer.Close(ctx)
 }
 
+func (w *LocalEngineWriter) IsSynced() bool {
+	return w.writer.IsSynced()
+}
+
 // UnsafeCloseEngine closes the engine without first opening it.
 // This method is "unsafe" as it does not follow the normal operation sequence
 // (Open -> Write -> Close -> Import). This method should only be used when one
@@ -442,6 +446,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
 	return engine.logger
 }
 
+type ChunkFlushStatus interface {
+	Flushed() bool
+}
+
 type EngineWriter interface {
 	AppendRows(
 		ctx context.Context,
@@ -450,5 +458,6 @@ type EngineWriter interface {
 		commitTS uint64,
 		rows kv.Rows,
 	) error
-	Close(ctx context.Context) error
+	IsSynced() bool
+	Close(ctx context.Context) (ChunkFlushStatus, error)
 }
diff --git a/pkg/lightning/backend/backend_test.go b/pkg/lightning/backend/backend_test.go
index 48fd65d78..ed5c1a31a 100644
--- a/pkg/lightning/backend/backend_test.go
+++ b/pkg/lightning/backend/backend_test.go
@@ -140,7 +140,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
 	mockWriter.EXPECT().
 		AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
 		Return(nil)
-	mockWriter.EXPECT().Close(ctx).Return(nil).AnyTimes()
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
 	mockWriter.EXPECT().
 		AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
 		Return(nil)
@@ -153,7 +153,7 @@
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
@@ -167,7 +167,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
 	s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil)
 	mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
-	mockWriter.EXPECT().Close(ctx).Return(nil)
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil)
 	s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)
 
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
@@ -176,7 +176,7 @@
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, nil, emptyRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
@@ -207,7 +207,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
 	mockWriter.EXPECT().
 		AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
 		Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
-	mockWriter.EXPECT().Close(ctx).Return(nil)
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil)
 
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
 	c.Assert(err, IsNil)
@@ -215,7 +215,7 @@
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, nil, rows)
 	c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
@@ -233,7 +233,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
 	mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
 		Return(errors.New("fake recoverable write batch error")).
 		MinTimes(1)
-	mockWriter.EXPECT().Close(ctx).Return(nil).MinTimes(1)
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)
 
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
 	c.Assert(err, IsNil)
@@ -241,7 +241,7 @@
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, nil, rows)
 	c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
diff --git a/pkg/lightning/backend/importer/importer.go b/pkg/lightning/backend/importer/importer.go
index 227ed739c..8489d4256 100644
--- a/pkg/lightning/backend/importer/importer.go
+++ b/pkg/lightning/backend/importer/importer.go
@@ -335,10 +335,14 @@ type Writer struct {
 	engineUUID uuid.UUID
 }
 
-func (w *Writer) Close(ctx context.Context) error {
-	return nil
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
 }
 
 func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
 	return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows)
 }
+
+func (w *Writer) IsSynced() bool {
+	return true
+}
diff --git a/pkg/lightning/backend/importer/importer_test.go b/pkg/lightning/backend/importer/importer_test.go
index e86df4f5d..0b4cf99de 100644
--- a/pkg/lightning/backend/importer/importer_test.go
+++ b/pkg/lightning/backend/importer/importer_test.go
@@ -115,8 +115,9 @@ func (s *importerSuite) TestWriteRows(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(s.ctx, nil, s.kvPairs)
 	c.Assert(err, IsNil)
-	err = writer.Close(s.ctx)
+	st, err := writer.Close(s.ctx)
 	c.Assert(err, IsNil)
+	c.Assert(st, IsNil)
 }
 
 func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go
index 874fc1349..2be3361d5 100644
--- a/pkg/lightning/backend/local/local.go
+++ b/pkg/lightning/backend/local/local.go
@@ -171,6 +171,13 @@ type File struct {
 	sstIngester    sstIngester
 	finishedRanges syncedRanges
 
+	// sst seq lock
+	seqLock sync.Mutex
+	// seq number for incoming sst meta
+	nextSeq int32
+	// max seq of sst metas ingested into pebble
+	finishedMetaSeq atomic.Int32
+
 	config backend.LocalEngineConfig
 
 	// total size of SST files waiting to be ingested
@@ -332,27 +339,36 @@ func (e *File) unlock() {
 	e.mutex.Unlock()
 }
 
-type intHeap struct {
-	arr []int32
+type metaSeq struct {
+	// the sequence for this flush message; a flush call can return only if
+	// all the other flushes with a lower `flushSeq` are done
+	flushSeq int32
+	// the max sstMeta sequence number in this flush; after the flush is done (all SSTs are ingested),
+	// we can save chunks with a lower meta sequence number safely.
+	metaSeq int32
+}
+
+type metaSeqHeap struct {
+	arr []metaSeq
 }
 
-func (h *intHeap) Len() int {
+func (h *metaSeqHeap) Len() int {
 	return len(h.arr)
 }
 
-func (h *intHeap) Less(i, j int) bool {
-	return h.arr[i] < h.arr[j]
+func (h *metaSeqHeap) Less(i, j int) bool {
+	return h.arr[i].flushSeq < h.arr[j].flushSeq
 }
 
-func (h *intHeap) Swap(i, j int) {
+func (h *metaSeqHeap) Swap(i, j int) {
 	h.arr[i], h.arr[j] = h.arr[j], h.arr[i]
 }
 
-func (h *intHeap) Push(x interface{}) {
-	h.arr = append(h.arr, x.(int32))
+func (h *metaSeqHeap) Push(x interface{}) {
+	h.arr = append(h.arr, x.(metaSeq))
 }
 
-func (h *intHeap) Pop() interface{} {
+func (h *metaSeqHeap) Pop() interface{} {
 	item := h.arr[len(h.arr)-1]
 	h.arr = h.arr[:len(h.arr)-1]
 	return item
@@ -373,7 +389,7 @@ func (e *File) ingestSSTLoop() {
 	flushQueue := make([]flushSeq, 0)
 	// inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1`
 	// this means there is still at least one compaction task with a lower seq unfinished.
-	inSyncSeqs := &intHeap{arr: make([]int32, 0)}
+	inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)}
 
 	type metaAndSeq struct {
 		metas []*sstMeta
@@ -417,6 +433,8 @@ func (e *File) ingestSSTLoop() {
 					}
 					ingestMetas = []*sstMeta{newMeta}
 				}
+				// batchIngestSSTs will change ingestMetas' order, so we record the max seq here
+				metasMaxSeq := ingestMetas[len(ingestMetas)-1].seq
 
 				if err := e.batchIngestSSTs(ingestMetas); err != nil {
 					e.setError(err)
@@ -426,9 +444,11 @@
 				finSeq := finishedSeq.Load()
 				if metas.seq == finSeq+1 {
 					finSeq = metas.seq
+					finMetaSeq := metasMaxSeq
 					for len(inSyncSeqs.arr) > 0 {
-						if inSyncSeqs.arr[0] == finSeq+1 {
+						if inSyncSeqs.arr[0].flushSeq == finSeq+1 {
 							finSeq++
+							finMetaSeq = inSyncSeqs.arr[0].metaSeq
 							heap.Remove(inSyncSeqs, 0)
 						} else {
 							break
@@ -445,12 +465,13 @@
 					}
 					flushQueue = flushQueue[len(flushChans):]
 					finishedSeq.Store(finSeq)
+					e.finishedMetaSeq.Store(finMetaSeq)
 					seqLock.Unlock()
 					for _, c := range flushChans {
 						c <- struct{}{}
 					}
 				} else {
-					heap.Push(inSyncSeqs, metas.seq)
+					heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: metasMaxSeq})
 					seqLock.Unlock()
 				}
 			}
@@ -561,16 +582,22 @@ readMetaLoop:
 	}
 }
 
-func (e *File) addSST(ctx context.Context, m *sstMeta) error {
+func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) {
 	// set pending size after SST file is generated
 	e.pendingFileSize.Add(m.fileSize)
+	// make sure sstMeta is sent into the chan in order
+	e.seqLock.Lock()
+	defer e.seqLock.Unlock()
+	e.nextSeq++
+	seq := e.nextSeq
+	m.seq = seq
 	select {
 	case e.sstMetasChan <- metaOrFlush{meta: m}:
 	case <-ctx.Done():
-		return ctx.Err()
+		return 0, ctx.Err()
 	case <-e.ctx.Done():
 	}
-	return e.ingestErr.Get()
+	return seq, e.ingestErr.Get()
 }
 
 func (e *File) batchIngestSSTs(metas []*sstMeta) error {
@@ -623,6 +650,7 @@ func (e *File) ingestSSTs(metas []*sstMeta) error {
 	log.L().Info("write data to local DB",
 		zap.Int64("size", totalSize),
 		zap.Int64("kvs", totalCount),
+		zap.Int("files", len(metas)),
 		zap.Int64("sstFileSize", fileSize),
 		zap.String("file", metas[0].path),
 		logutil.Key("firstKey", metas[0].minKey),
@@ -2490,6 +2518,7 @@ type sstMeta struct {
 	totalCount int64
 	// used for calculate disk-quota
 	fileSize int64
+	seq      int32
 }
 
 type Writer struct {
@@ -2507,6 +2536,8 @@ type Writer struct {
 
 	kvBuffer *bytesBuffer
 	writer   *sstWriter
+
+	lastMetaSeq int32
 }
 
 func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -2610,15 +2641,25 @@ func (w *Writer) flush(ctx context.Context) error {
 			return errors.Trace(err)
 		}
 		w.writer = nil
+		w.batchCount = 0
 		if meta != nil && meta.totalSize > 0 {
-			return w.local.addSST(ctx, meta)
+			return w.addSST(ctx, meta)
 		}
 	}
 
 	return nil
 }
 
-func (w *Writer) Close(ctx context.Context) error {
+type flushStatus struct {
+	local *File
+	seq   int32
+}
+
+func (f flushStatus) Flushed() bool {
+	return f.seq <= f.local.finishedMetaSeq.Load()
+}
+
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
 	defer w.kvBuffer.destroy()
 	defer w.local.localWriters.Delete(w)
 	err := w.flush(ctx)
@@ -2626,7 +2667,11 @@ func (w *Writer) Close(ctx context.Context) error {
 	// this can resolve the memory consistently increasing issue.
 	// maybe this is a bug related to go GC mechanism.
 	w.writeBatch = nil
-	return err
+	return flushStatus{local: w.local, seq: w.lastMetaSeq}, err
+}
+
+func (w *Writer) IsSynced() bool {
+	return w.batchCount == 0 && w.lastMetaSeq <= w.local.finishedMetaSeq.Load()
 }
 
 func (w *Writer) flushKVs(ctx context.Context) error {
@@ -2646,7 +2691,7 @@
 	if err != nil {
 		return errors.Trace(err)
 	}
-	err = w.local.addSST(ctx, meta)
+	err = w.addSST(ctx, meta)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -2658,6 +2703,15 @@
 	return nil
 }
 
+func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {
+	seq, err := w.local.addSST(ctx, meta)
+	if err != nil {
+		return err
+	}
+	w.lastMetaSeq = seq
+	return nil
+}
+
 func (w *Writer) createSSTWriter() (*sstWriter, error) {
 	path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst")
 	writer, err := newSSTWriter(path)
@@ -2822,10 +2876,13 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
 	}
 
 	start := time.Now()
-	newMeta := &sstMeta{}
+	newMeta := &sstMeta{
+		seq: metas[len(metas)-1].seq,
+	}
 	mergeIter := &sstIterHeap{
 		iters: make([]*sstIter, 0, len(metas)),
 	}
+
 	for _, p := range metas {
 		f, err := os.Open(p.path)
 		if err != nil {
diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go
index 0525ec31d..50eb68b76 100644
--- a/pkg/lightning/backend/local/local_test.go
+++ b/pkg/lightning/backend/local/local_test.go
@@ -389,9 +389,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
 	c.Assert(err, IsNil)
 	err = w.AppendRows(ctx, "", []string{}, 1, kv.MakeRowsFromKvPairs(rows3))
 	c.Assert(err, IsNil)
-	err = w.Close(context.Background())
+	flushStatus, err := w.Close(context.Background())
 	c.Assert(err, IsNil)
 	c.Assert(f.flushEngineWithoutLock(ctx), IsNil)
+	c.Assert(flushStatus.Flushed(), IsTrue)
 
 	o := &pebble.IterOptions{}
 	it := db.NewIter(o)
@@ -520,8 +521,13 @@ func (i testIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
 	} else if len(metas) == 1 {
 		return metas[0], nil
 	}
+	if metas[len(metas)-1].seq-metas[0].seq != int32(len(metas)-1) {
+		panic("metas are not added in order")
+	}
 
-	newMeta := &sstMeta{}
+	newMeta := &sstMeta{
+		seq: metas[len(metas)-1].seq,
+	}
 	for _, m := range metas {
 		newMeta.totalSize += m.totalSize
 		newMeta.totalCount += m.totalCount
@@ -574,15 +580,18 @@ func (s *localSuite) TestLocalIngestLoop(c *C) {
 	totalSize := int64(0)
 	concurrency := 4
 	count := 500
+	var metaSeqLock sync.Mutex
+	maxMetaSeq := int32(0)
 	for i := 0; i < concurrency; i++ {
 		go func() {
 			defer wg.Done()
 			flushCnt := rand.Int31n(10) + 1
+			seq := int32(0)
 			for i := 0; i < count; i++ {
 				size := int64(rand.Int31n(50) + 1)
 				m := &sstMeta{totalSize: size, totalCount: 1}
 				atomic.AddInt64(&totalSize, size)
-				err := f.addSST(engineCtx, m)
+				metaSeq, err := f.addSST(engineCtx, m)
 				c.Assert(err, IsNil)
 				if int32(i) >= flushCnt {
 					f.mutex.RLock()
@@ -591,7 +600,13 @@
 					f.mutex.RUnlock()
 					flushCnt += rand.Int31n(10) + 1
 				}
+				seq = metaSeq
+			}
+			metaSeqLock.Lock()
+			if atomic.LoadInt32(&maxMetaSeq) < seq {
+				atomic.StoreInt32(&maxMetaSeq, seq)
 			}
+			metaSeqLock.Unlock()
 		}()
 	}
 	wg.Wait()
@@ -606,6 +621,7 @@
 	c.Assert(f.ingestErr.Get(), IsNil)
 	c.Assert(totalSize, Equals, f.TotalSize.Load())
 	c.Assert(f.Length.Load(), Equals, int64(concurrency*count))
+	c.Assert(f.finishedMetaSeq.Load(), Equals, atomic.LoadInt32(&maxMetaSeq))
 }
 
 func (s *localSuite) TestCheckRequirementsTiFlash(c *C) {
diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go
index 930214686..42c40cada 100644
--- a/pkg/lightning/backend/noop/noop.go
+++ b/pkg/lightning/backend/noop/noop.go
@@ -167,6 +167,10 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row
 	return nil
 }
 
-func (w noopWriter) Close(context.Context) error {
-	return nil
+func (w noopWriter) IsSynced() bool {
+	return true
+}
+
+func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
 }
diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go
index b39a96f34..317a5e6b7 100644
--- a/pkg/lightning/backend/tidb/tidb.go
+++ b/pkg/lightning/backend/tidb/tidb.go
@@ -568,14 +568,18 @@ type Writer struct {
 	engineUUID uuid.UUID
 }
 
-func (w *Writer) Close(ctx context.Context) error {
-	return nil
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
 }
 
 func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error {
 	return w.be.WriteRows(ctx, w.engineUUID, tableName, columnNames, arg1, rows)
 }
 
+func (w *Writer) IsSynced() bool {
+	return true
+}
+
 type TableAutoIDInfo struct {
 	Column string
 	NextID int64
diff --git a/pkg/lightning/backend/tidb/tidb_test.go b/pkg/lightning/backend/tidb/tidb_test.go
index e4021941f..6ecda132a 100644
--- a/pkg/lightning/backend/tidb/tidb_test.go
+++ b/pkg/lightning/backend/tidb/tidb_test.go
@@ -124,8 +124,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	st, err := writer.Close(ctx)
 	c.Assert(err, IsNil)
+	c.Assert(st, IsNil)
 }
 
 func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
@@ -157,7 +158,7 @@
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"a"}, dataRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 
 	// test encode rows with _tidb_rowid
@@ -202,8 +203,9 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"a"}, dataRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	st, err := writer.Close(ctx)
 	c.Assert(err, IsNil)
+	c.Assert(st, IsNil)
 }
 
 // TODO: temporarily disable this test before we fix strict mode
diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go
index b83963f62..9f5d79ab0 100644
--- a/pkg/lightning/restore/restore.go
+++ b/pkg/lightning/restore/restore.go
@@ -1731,9 +1731,9 @@ func (tr *TableRestore) restoreEngine(
 	// if the keys are ordered, LocalWriter can optimize the writing.
 	// a table that has an auto-incremented _tidb_rowid must satisfy the following restrictions:
 	// - clustered index is disabled and the primary key is not a number
-	// - no auto random bits (auto random or shard rowid)
+	// - no auto random bits (auto random or shard row id)
 	// - no partition table
-	// - no explicit _tidb_rowid field (A this time we can't determine if the soure file contains _tidb_rowid field,
+	// - no explicit _tidb_rowid field (At this time we can't determine if the source files contain a _tidb_rowid field,
 	// so we will do this check in LocalWriter when the first row is received.)
 	hasAutoIncrementAutoID := common.TableHasAutoRowID(tr.tableInfo.Core) &&
 		tr.tableInfo.Core.AutoRandomBits == 0 && tr.tableInfo.Core.ShardRowIDBits == 0 &&
@@ -1751,12 +1751,52 @@
 	var wg sync.WaitGroup
 	var chunkErr common.OnceError
 
+	type chunkFlushStatus struct {
+		dataStatus  backend.ChunkFlushStatus
+		indexStatus backend.ChunkFlushStatus
+		chunkCp     *checkpoints.ChunkCheckpoint
+	}
+
+	// chunks that have finished writing, but whose checkpoints are not yet saved because the engine flush has not finished.
+	var checkFlushLock sync.Mutex
+	flushPendingChunks := make([]chunkFlushStatus, 0, 16)
+
+	chunkCpChan := make(chan *checkpoints.ChunkCheckpoint, 16)
+	go func() {
+		for {
+			select {
+			case cp, ok := <-chunkCpChan:
+				if !ok {
+					return
+				}
+				saveCheckpoint(rc, tr, engineID, cp)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
 	// Restore table data
 	for chunkIndex, chunk := range cp.Chunks {
 		if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
 			continue
 		}
 
+		checkFlushLock.Lock()
+		finished := 0
+		for _, c := range flushPendingChunks {
+			if c.indexStatus.Flushed() && c.dataStatus.Flushed() {
+				chunkCpChan <- c.chunkCp
+				finished++
+			} else {
+				break
+			}
+		}
+		if finished > 0 {
+			flushPendingChunks = flushPendingChunks[finished:]
+		}
+		checkFlushLock.Unlock()
+
 		select {
 		case <-pCtx.Done():
 			return nil, pCtx.Err()
@@ -1804,15 +1844,29 @@
 			}()
 			metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt)
 			err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc)
+			var dataFlushStatus, indexFlushStatus backend.ChunkFlushStatus
 			if err == nil {
-				err = dataWriter.Close(ctx)
+				dataFlushStatus, err = dataWriter.Close(ctx)
 			}
 			if err == nil {
-				err = indexWriter.Close(ctx)
+				indexFlushStatus, err = indexWriter.Close(ctx)
 			}
 			if err == nil {
 				metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
 				metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize()))
+				if dataFlushStatus != nil && indexFlushStatus != nil {
+					if dataFlushStatus.Flushed() && indexFlushStatus.Flushed() {
+						saveCheckpoint(rc, tr, engineID, cr.chunk)
+					} else {
+						checkFlushLock.Lock()
+						flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{
+							dataStatus:  dataFlushStatus,
+							indexStatus: indexFlushStatus,
+							chunkCp:     cr.chunk,
+						})
+						checkFlushLock.Unlock()
+					}
+				}
 			} else {
 				metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt)
 				chunkErr.Set(err)
@@ -1837,14 +1891,19 @@
 		zap.Uint64("written", totalKVSize),
 	)
 
-	flushAndSaveAllChunks := func(flushCtx context.Context) error {
-		if err = indexEngine.Flush(flushCtx); err != nil {
-			return errors.Trace(err)
-		}
-		// Currently we write all the checkpoints after data&index engine are flushed.
-	for _, chunk := range cp.Chunks {
-		saveCheckpoint(rc, tr, engineID, chunk)
+	trySavePendingChunks := func(flushCtx context.Context) error {
+		checkFlushLock.Lock()
+		cnt := 0
+		for _, chunk := range flushPendingChunks {
+			if chunk.dataStatus.Flushed() && chunk.indexStatus.Flushed() {
+				saveCheckpoint(rc, tr, engineID, chunk.chunkCp)
+				cnt++
+			} else {
+				break
+			}
 		}
+		flushPendingChunks = flushPendingChunks[cnt:]
+		checkFlushLock.Unlock()
 		return nil
 	}
@@ -1862,7 +1921,7 @@
 			log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2))
 			return nil, errors.Trace(err)
 		}
-		if err2 := flushAndSaveAllChunks(context.Background()); err2 != nil {
+		if err2 := trySavePendingChunks(context.Background()); err2 != nil {
 			log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2))
 		}
 	}
@@ -1873,13 +1932,11 @@
 	// For local backend, if checkpoint is enabled, we must flush index engine to avoid data loss.
 	// this flush action impact up to 10% of the performance, so we only do it if necessary.
 	if err == nil && rc.cfg.Checkpoint.Enable && rc.isLocalBackend() {
-		if err = flushAndSaveAllChunks(ctx); err != nil {
+		if err = indexEngine.Flush(ctx); err != nil {
 			return nil, errors.Trace(err)
 		}
-
-		// Currently we write all the checkpoints after data&index engine are flushed.
-		for _, chunk := range cp.Chunks {
-			saveCheckpoint(rc, tr, engineID, chunk)
+		if err = trySavePendingChunks(ctx); err != nil {
+			return nil, errors.Trace(err)
 		}
 	}
 	rc.saveStatusCheckpoint(tr.tableName, engineID, err, checkpoints.CheckpointStatusClosed)
@@ -2622,6 +2679,7 @@ func (cr *chunkRestore) deliverLoop(
 	dataKVs := rc.backend.MakeEmptyRows()
 	indexKVs := rc.backend.MakeEmptyRows()
 
+	dataSynced := true
 	for !channelClosed {
 		var dataChecksum, indexChecksum verify.KVChecksum
 		var columns []string
@@ -2651,16 +2709,15 @@
 			}
 		}
 
-		// we are allowed to save checkpoint when the disk quota state moved to "importing"
-		// since all engines are flushed.
-		if atomic.LoadInt32(&rc.diskQuotaState) == diskQuotaStateImporting {
-			saveCheckpoint(rc, t, engineID, cr.chunk)
-		}
-
 		err = func() error {
 			rc.diskQuotaLock.RLock()
 			defer rc.diskQuotaLock.RUnlock()
 
+			// if the last checkpoint save was skipped, retry it here; this helps persist
+			// the checkpoint after a disk-quota-triggered import has flushed the engines
+			if !dataSynced {
+				dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
+			}
+
 			// Write KVs into the engine
 			start := time.Now()
 
@@ -2690,6 +2747,7 @@
 		if err != nil {
 			return
 		}
+		dataSynced = false
 
 		dataKVs = dataKVs.Clear()
 		indexKVs = indexKVs.Clear()
@@ -2702,9 +2760,10 @@
 		cr.chunk.Checksum.Add(&indexChecksum)
 		cr.chunk.Chunk.Offset = offset
 		cr.chunk.Chunk.PrevRowIDMax = rowID
-		if !rc.isLocalBackend() && (dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0) {
+
+		if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 {
 			// No need to save checkpoint if nothing was delivered.
-			saveCheckpoint(rc, t, engineID, cr.chunk)
+			dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
 		}
 		failpoint.Inject("SlowDownWriteRows", func() {
 			deliverLogger.Warn("Slowed down write rows")
@@ -2725,6 +2784,20 @@
 	return
 }
 
+func (cr *chunkRestore) maybeSaveCheckpoint(
+	rc *Controller,
+	t *TableRestore,
+	engineID int32,
+	chunk *checkpoints.ChunkCheckpoint,
+	data, index *backend.LocalEngineWriter,
+) bool {
+	if data.IsSynced() && index.IsSynced() {
+		saveCheckpoint(rc, t, engineID, chunk)
+		return true
+	}
+	return false
+}
+
 func saveCheckpoint(rc *Controller, t *TableRestore, engineID int32, chunk *checkpoints.ChunkCheckpoint) {
 	// We need to update the AllocBase every time we've finished a file.
 	// The AllocBase is determined by the maximum of the "handle" (_tidb_rowid
diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go
index 49e3a1d4a..715da3ce3 100644
--- a/pkg/lightning/restore/restore_test.go
+++ b/pkg/lightning/restore/restore_test.go
@@ -1014,6 +1014,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {
 	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes()
 	mockWriter := mock.NewMockEngineWriter(controller)
 	mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes()
+	mockWriter.EXPECT().IsSynced().Return(true).AnyTimes()
 
 	dataEngine, err := importer.OpenEngine(ctx, &backend.EngineConfig{}, s.tr.tableName, 0, 0)
 	c.Assert(err, IsNil)
diff --git a/pkg/mock/backend.go b/pkg/mock/backend.go
index 7351cd5e9..d40fd82d1 100644
--- a/pkg/mock/backend.go
+++ b/pkg/mock/backend.go
@@ -305,11 +305,12 @@ func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2, arg3, arg4
 }
 
 // Close mocks base method.
-func (m *MockEngineWriter) Close(arg0 context.Context) error {
+func (m *MockEngineWriter) Close(arg0 context.Context) (backend.ChunkFlushStatus, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Close", arg0)
-	ret0, _ := ret[0].(error)
-	return ret0
+	ret0, _ := ret[0].(backend.ChunkFlushStatus)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
 }
 
 // Close indicates an expected call of Close.
@@ -317,3 +318,17 @@ func (mr *MockEngineWriterMockRecorder) Close(arg0 interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEngineWriter)(nil).Close), arg0)
 }
+
+// IsSynced mocks base method.
+func (m *MockEngineWriter) IsSynced() bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "IsSynced")
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// IsSynced indicates an expected call of IsSynced.
+func (mr *MockEngineWriterMockRecorder) IsSynced() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSynced", reflect.TypeOf((*MockEngineWriter)(nil).IsSynced))
+}
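
The sketches below are standalone Go illustrations of the mechanisms this patch combines; none of the identifiers in them come from the patch itself. The first shows the sequence-watermark idea behind File.nextSeq, File.finishedMetaSeq, Writer.lastMetaSeq, and flushStatus.Flushed(): every SST meta takes a monotonically increasing sequence number under a lock, the ingest loop publishes the highest sequence whose predecessors are all ingested, and a chunk may be checkpointed once that watermark passes the last sequence its writer produced. (The patch's atomic.Int32 field presumably comes from go.uber.org/atomic; the sketch uses the stdlib equivalent from Go 1.19+.)

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// engine hands out SST sequence numbers and publishes a watermark:
// the highest sequence such that it and everything before it is ingested.
type engine struct {
	seqLock     sync.Mutex
	nextSeq     int32
	finishedSeq atomic.Int32 // the watermark, advanced by the ingest loop
}

// addSST assigns the next sequence number; the lock keeps the numbers
// in the same order as the metas sent to the ingest loop.
func (e *engine) addSST() int32 {
	e.seqLock.Lock()
	defer e.seqLock.Unlock()
	e.nextSeq++
	return e.nextSeq
}

// flushStatus remembers the last sequence a writer produced; the chunk it
// wrote is durable once the watermark has passed that sequence.
type flushStatus struct {
	e   *engine
	seq int32
}

func (f flushStatus) Flushed() bool { return f.seq <= f.e.finishedSeq.Load() }

func main() {
	e := &engine{}
	first := flushStatus{e: e, seq: e.addSST()}  // seq 1
	second := flushStatus{e: e, seq: e.addSST()} // seq 2
	e.finishedSeq.Store(1)                       // ingest loop has finished seq 1 only
	fmt.Println(first.Flushed(), second.Flushed()) // true false
}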
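
The second sketch is the reason metaSeqHeap exists: ingestion batches can finish out of order, but the watermark may only advance across a contiguous prefix of sequence numbers, so a batch that finishes early is parked in a min-heap until the gap before it closes. This mirrors the inSyncSeqs bookkeeping in ingestSSTLoop, reduced to bare int32 sequences instead of the patch's metaSeq pairs.

package main

import (
	"container/heap"
	"fmt"
)

// seqHeap is a min-heap of finished-but-not-yet-contiguous sequences.
type seqHeap []int32

func (h seqHeap) Len() int            { return len(h) }
func (h seqHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h seqHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *seqHeap) Push(x interface{}) { *h = append(*h, x.(int32)) }
func (h *seqHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	finished := int32(0) // watermark: all seqs <= finished are done
	pending := &seqHeap{}
	// completion order 2, 3, 1: the watermark must not move until 1 lands
	for _, seq := range []int32{2, 3, 1} {
		if seq == finished+1 {
			finished = seq
			// drain any parked successors that are now contiguous
			for pending.Len() > 0 && (*pending)[0] == finished+1 {
				finished++
				heap.Pop(pending)
			}
		} else {
			heap.Push(pending, seq)
		}
		fmt.Printf("done=%d watermark=%d\n", seq, finished)
	}
	// the watermark stays at 0 until seq 1 completes, then jumps to 3
}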
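
Last, the consuming side: restoreEngine appends a chunk whose writers closed before their SSTs were ingested to an ordered pending queue, and trySavePendingChunks persists only the longest fully flushed prefix, so a later chunk's checkpoint can never land ahead of an earlier, still-unflushed one. Below is a compressed model of that gating, with a printed stand-in for saveCheckpoint and a canned status instead of the real watermark check.

package main

import "fmt"

// ChunkFlushStatus matches the interface the patch adds to the backend package.
type ChunkFlushStatus interface{ Flushed() bool }

// fixed is a canned status for the example; the real one consults the watermark.
type fixed bool

func (f fixed) Flushed() bool { return bool(f) }

type pendingChunk struct {
	data, index ChunkFlushStatus
	name        string // stands in for *checkpoints.ChunkCheckpoint
}

// trySavePendingChunks saves the longest flushed prefix and keeps the rest,
// preserving chunk order like the helper of the same name in restore.go.
func trySavePendingChunks(pending []pendingChunk) []pendingChunk {
	cnt := 0
	for _, c := range pending {
		if !c.data.Flushed() || !c.index.Flushed() {
			break // everything after this chunk must wait too
		}
		fmt.Println("saveCheckpoint:", c.name)
		cnt++
	}
	return pending[cnt:]
}

func main() {
	pending := []pendingChunk{
		{fixed(true), fixed(true), "chunk-0"},
		{fixed(false), fixed(true), "chunk-1"}, // index flushed, data not
		{fixed(true), fixed(true), "chunk-2"},  // blocked by chunk-1
	}
	pending = trySavePendingChunks(pending) // saves only chunk-0
	fmt.Println("still pending:", len(pending)) // 2
}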