From 5a93d02515a20be8a38c1744bf4e58611f387e54 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 30 Apr 2021 18:02:16 +0800 Subject: [PATCH 1/7] save chunk checkpoint timely --- pkg/lightning/backend/backend.go | 8 +- pkg/lightning/backend/backend_test.go | 16 ++-- pkg/lightning/backend/importer/importer.go | 4 +- .../backend/importer/importer_test.go | 3 +- pkg/lightning/backend/local/local.go | 84 ++++++++++++++----- pkg/lightning/backend/local/local_test.go | 5 +- pkg/lightning/backend/noop/noop.go | 4 +- pkg/lightning/backend/tidb/tidb.go | 4 +- pkg/lightning/backend/tidb/tidb_test.go | 8 +- pkg/lightning/restore/restore.go | 44 +++++++++- pkg/mock/backend.go | 7 +- 11 files changed, 138 insertions(+), 49 deletions(-) diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go index e28d94793..b3ad44465 100644 --- a/pkg/lightning/backend/backend.go +++ b/pkg/lightning/backend/backend.go @@ -375,7 +375,7 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows) } -func (w *LocalEngineWriter) Close(ctx context.Context) error { +func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) { return w.writer.Close(ctx) } @@ -442,6 +442,10 @@ func (engine *ClosedEngine) Logger() log.Logger { return engine.logger } +type ChunkFlushStatus interface { + Flushed() bool +} + type EngineWriter interface { AppendRows( ctx context.Context, @@ -450,5 +454,5 @@ type EngineWriter interface { commitTS uint64, rows kv.Rows, ) error - Close(ctx context.Context) error + Close(ctx context.Context) (ChunkFlushStatus, error) } diff --git a/pkg/lightning/backend/backend_test.go b/pkg/lightning/backend/backend_test.go index 48fd65d78..ed5c1a31a 100644 --- a/pkg/lightning/backend/backend_test.go +++ b/pkg/lightning/backend/backend_test.go @@ -140,7 +140,7 @@ func (s *backendSuite) TestWriteEngine(c *C) { mockWriter.EXPECT(). AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1). Return(nil) - mockWriter.EXPECT().Close(ctx).Return(nil).AnyTimes() + mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes() mockWriter.EXPECT(). AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2). Return(nil) @@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteEngine(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2) c.Assert(err, IsNil) - err = writer.Close(ctx) + _, err = writer.Close(ctx) c.Assert(err, IsNil) } @@ -167,7 +167,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) { s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil) mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil) - mockWriter.EXPECT().Close(ctx).Return(nil) + mockWriter.EXPECT().Close(ctx).Return(nil, nil) s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil) engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts) @@ -176,7 +176,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, nil, emptyRows) c.Assert(err, IsNil) - err = writer.Close(ctx) + _, err = writer.Close(ctx) c.Assert(err, IsNil) } @@ -207,7 +207,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) { mockWriter.EXPECT(). AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows). 
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error")) - mockWriter.EXPECT().Close(ctx).Return(nil) + mockWriter.EXPECT().Close(ctx).Return(nil, nil) engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) @@ -215,7 +215,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, nil, rows) c.Assert(err, ErrorMatches, "fake unrecoverable write error.*") - err = writer.Close(ctx) + _, err = writer.Close(ctx) c.Assert(err, IsNil) } @@ -233,7 +233,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) { mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows). Return(errors.New("fake recoverable write batch error")). MinTimes(1) - mockWriter.EXPECT().Close(ctx).Return(nil).MinTimes(1) + mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1) engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) @@ -241,7 +241,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, nil, rows) c.Assert(err, ErrorMatches, ".*fake recoverable write batch error") - err = writer.Close(ctx) + _, err = writer.Close(ctx) c.Assert(err, IsNil) } diff --git a/pkg/lightning/backend/importer/importer.go b/pkg/lightning/backend/importer/importer.go index 227ed739c..f15d8e77d 100644 --- a/pkg/lightning/backend/importer/importer.go +++ b/pkg/lightning/backend/importer/importer.go @@ -335,8 +335,8 @@ type Writer struct { engineUUID uuid.UUID } -func (w *Writer) Close(ctx context.Context) error { - return nil +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + return nil, nil } func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error { diff --git a/pkg/lightning/backend/importer/importer_test.go b/pkg/lightning/backend/importer/importer_test.go index e86df4f5d..0b4cf99de 100644 --- a/pkg/lightning/backend/importer/importer_test.go +++ b/pkg/lightning/backend/importer/importer_test.go @@ -115,8 +115,9 @@ func (s *importerSuite) TestWriteRows(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(s.ctx, nil, s.kvPairs) c.Assert(err, IsNil) - err = writer.Close(s.ctx) + st, err := writer.Close(s.ctx) c.Assert(err, IsNil) + c.Assert(st, IsNil) } func (s *importerSuite) TestWriteHeadSendFailed(c *C) { diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 791eda83b..0645ecf38 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -165,6 +165,13 @@ type File struct { sstIngester sstIngester finishedRanges syncedRanges + // sst seq lock + seqLock sync.Mutex + // seq number for incoming sst meta + nextSeq atomic.Int32 + // max seq of sst metas ingested into pebble + finishedMetaSeq atomic.Int32 + config backend.LocalEngineConfig // total size of SST files waiting to be ingested @@ -326,27 +333,32 @@ func (e *File) unlock() { e.mutex.Unlock() } -type intHeap struct { - arr []int32 +type metaSeq struct { + flushSeq int32 + metaSeq int32 +} + +type metaSeqHeap struct { + arr []metaSeq } -func (h *intHeap) Len() int { +func (h *metaSeqHeap) Len() int { return len(h.arr) } -func (h *intHeap) Less(i, j int) bool { - return h.arr[i] < h.arr[j] +func (h *metaSeqHeap) Less(i, j int) bool { + return h.arr[i].flushSeq < h.arr[j].flushSeq } -func (h *intHeap) Swap(i, j int) { +func (h 
*metaSeqHeap) Swap(i, j int) { h.arr[i], h.arr[j] = h.arr[j], h.arr[i] } -func (h *intHeap) Push(x interface{}) { - h.arr = append(h.arr, x.(int32)) +func (h *metaSeqHeap) Push(x interface{}) { + h.arr = append(h.arr, x.(metaSeq)) } -func (h *intHeap) Pop() interface{} { +func (h *metaSeqHeap) Pop() interface{} { item := h.arr[len(h.arr)-1] h.arr = h.arr[:len(h.arr)-1] return item @@ -367,7 +379,7 @@ func (e *File) ingestSSTLoop() { flushQueue := make([]flushSeq, 0) // inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1` // this mean there are still at lease one compaction task with a lower seq unfinished. - inSyncSeqs := &intHeap{arr: make([]int32, 0)} + inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)} type metaAndSeq struct { metas []*sstMeta @@ -420,9 +432,11 @@ func (e *File) ingestSSTLoop() { finSeq := finishedSeq.Load() if metas.seq == finSeq+1 { finSeq = metas.seq + finMetaSeq := ingestMetas[len(ingestMetas)-1].seq for len(inSyncSeqs.arr) > 0 { - if inSyncSeqs.arr[0] == finSeq+1 { + if inSyncSeqs.arr[0].flushSeq == finSeq+1 { finSeq++ + finMetaSeq = inSyncSeqs.arr[0].metaSeq heap.Remove(inSyncSeqs, 0) } else { break @@ -439,12 +453,13 @@ func (e *File) ingestSSTLoop() { } flushQueue = flushQueue[len(flushChans):] finishedSeq.Store(finSeq) + e.finishedMetaSeq.Store(finMetaSeq) seqLock.Unlock() for _, c := range flushChans { c <- struct{}{} } } else { - heap.Push(inSyncSeqs, metas.seq) + heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: ingestMetas[len(ingestMetas)-1].seq}) seqLock.Unlock() } } @@ -555,16 +570,21 @@ readMetaLoop: } } -func (e *File) addSST(ctx context.Context, m *sstMeta) error { +func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) { // set pending size after SST file is generated e.pendingFileSize.Add(m.fileSize) + // make sure sstMeta is sent into the chan in order + e.seqLock.Lock() + defer e.seqLock.Unlock() + seq := e.nextSeq.Add(1) + m.seq = seq select { case e.sstMetasChan <- metaOrFlush{meta: m}: case <-ctx.Done(): - return ctx.Err() + return 0, ctx.Err() case <-e.ctx.Done(): } - return e.ingestErr.Get() + return seq, e.ingestErr.Get() } func (e *File) batchIngestSSTs(metas []*sstMeta) error { @@ -2403,6 +2423,7 @@ type sstMeta struct { totalCount int64 // used for calculate disk-quota fileSize int64 + seq int32 } type Writer struct { @@ -2420,6 +2441,8 @@ type Writer struct { kvBuffer *bytesBuffer writer *sstWriter + + lastMetaSeq int32 } func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { @@ -2520,14 +2543,23 @@ func (w *Writer) flush(ctx context.Context) error { } w.writer = nil if meta != nil && meta.totalSize > 0 { - return w.local.addSST(ctx, meta) + return w.addSST(ctx, meta) } } return nil } -func (w *Writer) Close(ctx context.Context) error { +type flushStatus struct { + local *File + seq int32 +} + +func (f flushStatus) Flushed() bool { + return f.seq <= f.local.finishedMetaSeq.Load() +} + +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { defer w.kvBuffer.destroy() defer w.local.localWriters.Delete(w) err := w.flush(ctx) @@ -2535,7 +2567,7 @@ func (w *Writer) Close(ctx context.Context) error { // this can resolve the memory consistently increasing issue. // maybe this is a bug related to go GC mechanism. 
w.writeBatch = nil - return err + return flushStatus{local: w.local, seq: w.lastMetaSeq}, err } func (w *Writer) flushKVs(ctx context.Context) error { @@ -2555,7 +2587,7 @@ func (w *Writer) flushKVs(ctx context.Context) error { if err != nil { return errors.Trace(err) } - err = w.local.addSST(ctx, meta) + err = w.addSST(ctx, meta) if err != nil { return errors.Trace(err) } @@ -2567,6 +2599,15 @@ func (w *Writer) flushKVs(ctx context.Context) error { return nil } +func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error { + seq, err := w.local.addSST(ctx, meta) + if err != nil { + return err + } + w.lastMetaSeq = seq + return nil +} + func (w *Writer) createSSTWriter() (*sstWriter, error) { path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst") writer, err := newSSTWriter(path) @@ -2731,10 +2772,13 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) } start := time.Now() - newMeta := &sstMeta{} + newMeta := &sstMeta{ + seq: metas[len(metas)-1].seq, + } mergeIter := &sstIterHeap{ iters: make([]*sstIter, 0, len(metas)), } + for _, p := range metas { f, err := os.Open(p.path) if err != nil { diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index d607e31c5..59e60c084 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -389,9 +389,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) { c.Assert(err, IsNil) err = w.AppendRows(ctx, "", []string{}, 1, kv.MakeRowsFromKvPairs(rows3)) c.Assert(err, IsNil) - err = w.Close(context.Background()) + flushStatus, err := w.Close(context.Background()) c.Assert(err, IsNil) c.Assert(f.flushEngineWithoutLock(ctx), IsNil) + c.Assert(flushStatus.Flushed(), IsTrue) o := &pebble.IterOptions{} it := db.NewIter(o) @@ -574,7 +575,7 @@ func (s *localSuite) TestLocalIngestLoop(c *C) { size := int64(rand.Int31n(50) + 1) m := &sstMeta{totalSize: size, totalCount: 1} atomic.AddInt64(&totalSize, size) - err := f.addSST(engineCtx, m) + _, err := f.addSST(engineCtx, m) c.Assert(err, IsNil) if int32(i) >= flushCnt { f.mutex.RLock() diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go index 03f1fb297..650abde90 100644 --- a/pkg/lightning/backend/noop/noop.go +++ b/pkg/lightning/backend/noop/noop.go @@ -163,6 +163,6 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row return nil } -func (w noopWriter) Close(context.Context) error { - return nil +func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) { + return nil, nil } diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index e4513b5c2..22727cd6a 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -596,8 +596,8 @@ type Writer struct { engineUUID uuid.UUID } -func (w *Writer) Close(ctx context.Context) error { - return nil +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + return nil, nil } func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error { diff --git a/pkg/lightning/backend/tidb/tidb_test.go b/pkg/lightning/backend/tidb/tidb_test.go index e4021941f..6ecda132a 100644 --- a/pkg/lightning/backend/tidb/tidb_test.go +++ b/pkg/lightning/backend/tidb/tidb_test.go @@ -124,8 +124,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"a", "b", "c", 
"d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) c.Assert(err, IsNil) - err = writer.Close(ctx) + st, err := writer.Close(ctx) c.Assert(err, IsNil) + c.Assert(st, IsNil) } func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { @@ -157,7 +158,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"a"}, dataRows) c.Assert(err, IsNil) - err = writer.Close(ctx) + _, err = writer.Close(ctx) c.Assert(err, IsNil) // test encode rows with _tidb_rowid @@ -202,8 +203,9 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"a"}, dataRows) c.Assert(err, IsNil) - err = writer.Close(ctx) + st, err := writer.Close(ctx) c.Assert(err, IsNil) + c.Assert(st, IsNil) } // TODO: temporarily disable this test before we fix strict mode diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 06057ea22..1f40966f3 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -1566,9 +1566,9 @@ func (tr *TableRestore) restoreEngine( // if the key are ordered, LocalWrite can optimize the writing. // table has auto-incremented _tidb_rowid must satisfy following restrictions: // - clustered index disable and primary key is not number - // - no auto random bits (auto random or shard rowid) + // - no auto random bits (auto random or shard row id) // - no partition table - // - no explicit _tidb_rowid field (A this time we can't determine if the soure file contains _tidb_rowid field, + // - no explicit _tidb_rowid field (A this time we can't determine if the source files contain _tidb_rowid field, // so we will do this check in LocalWriter when the first row is received.) hasAutoIncrementAutoID := common.TableHasAutoRowID(tr.tableInfo.Core) && tr.tableInfo.Core.AutoRandomBits == 0 && tr.tableInfo.Core.ShardRowIDBits == 0 && @@ -1586,12 +1586,37 @@ func (tr *TableRestore) restoreEngine( var wg sync.WaitGroup var chunkErr common.OnceError + type chunkFlushStatus struct { + dataStatus backend.ChunkFlushStatus + indexStatus backend.ChunkFlushStatus + chunkCp *checkpoints.ChunkCheckpoint + } + + // chunks that are finished writing, but checkpoints are not finished due to flush not finished. 
+ var checkFlushLock sync.Mutex + flushPendingChunks := make([]chunkFlushStatus, 0, 16) + // Restore table data for chunkIndex, chunk := range cp.Chunks { if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { continue } + checkFlushLock.Lock() + finished := 0 + for _, c := range flushPendingChunks { + if c.indexStatus.Flushed() && c.dataStatus.Flushed() { + saveCheckpoint(rc, tr, engineID, c.chunkCp) + finished++ + } else { + break + } + } + if finished > 0 { + flushPendingChunks = flushPendingChunks[finished:] + } + checkFlushLock.Unlock() + select { case <-ctx.Done(): return nil, ctx.Err() @@ -1639,15 +1664,26 @@ func (tr *TableRestore) restoreEngine( }() metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt) err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc) + var dataFlushStatus, indexFlushStaus backend.ChunkFlushStatus if err == nil { - err = dataWriter.Close(ctx) + dataFlushStatus, err = dataWriter.Close(ctx) } if err == nil { - err = indexWriter.Close(ctx) + indexFlushStaus, err = indexWriter.Close(ctx) } if err == nil { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt) metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize())) + if dataFlushStatus != nil && !dataFlushStatus.Flushed() && + indexFlushStaus != nil && !indexFlushStaus.Flushed() { + checkFlushLock.Lock() + flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{ + dataStatus: dataFlushStatus, + indexStatus: indexFlushStaus, + chunkCp: cr.chunk, + }) + checkFlushLock.Unlock() + } } else { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt) chunkErr.Set(err) diff --git a/pkg/mock/backend.go b/pkg/mock/backend.go index 7351cd5e9..01484eb47 100644 --- a/pkg/mock/backend.go +++ b/pkg/mock/backend.go @@ -305,11 +305,12 @@ func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2, arg3, arg4 } // Close mocks base method. -func (m *MockEngineWriter) Close(arg0 context.Context) error { +func (m *MockEngineWriter) Close(arg0 context.Context) (backend.ChunkFlushStatus, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close", arg0) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(backend.ChunkFlushStatus) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Close indicates an expected call of Close. From 29b593e342f268835e23781ecb170ed5afdd3b7b Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 6 May 2021 13:12:45 +0800 Subject: [PATCH 2/7] try also save chunk checkpoint timely --- pkg/lightning/backend/backend.go | 5 ++ pkg/lightning/backend/importer/importer.go | 4 ++ pkg/lightning/backend/local/local.go | 4 ++ pkg/lightning/backend/noop/noop.go | 4 ++ pkg/lightning/backend/tidb/tidb.go | 4 ++ pkg/lightning/restore/restore.go | 78 ++++++++++++++++------ pkg/lightning/restore/restore_test.go | 1 + pkg/mock/backend.go | 14 ++++ 8 files changed, 92 insertions(+), 22 deletions(-) diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go index b3ad44465..77836a314 100644 --- a/pkg/lightning/backend/backend.go +++ b/pkg/lightning/backend/backend.go @@ -379,6 +379,10 @@ func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) return w.writer.Close(ctx) } +func (w *LocalEngineWriter) IsSynced() bool { + return w.writer.IsSynced() +} + // UnsafeCloseEngine closes the engine without first opening it. // This method is "unsafe" as it does not follow the normal operation sequence // (Open -> Write -> Close -> Import). 
This method should only be used when one @@ -454,5 +458,6 @@ type EngineWriter interface { commitTS uint64, rows kv.Rows, ) error + IsSynced() bool Close(ctx context.Context) (ChunkFlushStatus, error) } diff --git a/pkg/lightning/backend/importer/importer.go b/pkg/lightning/backend/importer/importer.go index f15d8e77d..8489d4256 100644 --- a/pkg/lightning/backend/importer/importer.go +++ b/pkg/lightning/backend/importer/importer.go @@ -342,3 +342,7 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error { return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows) } + +func (w *Writer) IsSynced() bool { + return true +} diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 0645ecf38..1f0f533cd 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -2570,6 +2570,10 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { return flushStatus{local: w.local, seq: w.lastMetaSeq}, err } +func (w *Writer) IsSynced() bool { + return w.batchCount == 0 && w.lastMetaSeq <= w.local.finishedMetaSeq.Load() +} + func (w *Writer) flushKVs(ctx context.Context) error { writer, err := w.createSSTWriter() if err != nil { diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go index 650abde90..1997de40e 100644 --- a/pkg/lightning/backend/noop/noop.go +++ b/pkg/lightning/backend/noop/noop.go @@ -163,6 +163,10 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row return nil } +func (w noopWriter) IsSynced() bool { + return true +} + func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) { return nil, nil } diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index 22727cd6a..146008665 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -603,3 +603,7 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error { return w.be.WriteRows(ctx, w.engineUUID, tableName, columnNames, arg1, rows) } + +func (w *Writer) IsSynced() bool { + return true +} diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 1f40966f3..7185f655f 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -1596,6 +1596,21 @@ func (tr *TableRestore) restoreEngine( var checkFlushLock sync.Mutex flushPendingChunks := make([]chunkFlushStatus, 0, 16) + chunkCpChan := make(chan *checkpoints.ChunkCheckpoint, 16) + go func() { + for { + select { + case cp, ok := <-chunkCpChan: + if !ok { + return + } + saveCheckpoint(rc, tr, engineID, cp) + case <-ctx.Done(): + return + } + } + }() + // Restore table data for chunkIndex, chunk := range cp.Chunks { if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { @@ -1606,7 +1621,7 @@ func (tr *TableRestore) restoreEngine( finished := 0 for _, c := range flushPendingChunks { if c.indexStatus.Flushed() && c.dataStatus.Flushed() { - saveCheckpoint(rc, tr, engineID, c.chunkCp) + chunkCpChan <- c.chunkCp finished++ } else { break @@ -1708,14 +1723,19 @@ func (tr *TableRestore) restoreEngine( zap.Uint64("written", totalKVSize), ) - flushAndSaveAllChunks := func(flushCtx context.Context) error { - if 
err = indexEngine.Flush(flushCtx); err != nil { - return errors.Trace(err) - } - // Currently we write all the checkpoints after data&index engine are flushed. - for _, chunk := range cp.Chunks { - saveCheckpoint(rc, tr, engineID, chunk) + trySavePendingChunks := func(flushCtx context.Context) error { + checkFlushLock.Lock() + cnt := 0 + for _, chunk := range flushPendingChunks { + if chunk.dataStatus.Flushed() && chunk.indexStatus.Flushed() { + saveCheckpoint(rc, tr, engineID, chunk.chunkCp) + cnt++ + } else { + break + } } + flushPendingChunks = flushPendingChunks[cnt:] + checkFlushLock.Unlock() return nil } @@ -1733,7 +1753,7 @@ func (tr *TableRestore) restoreEngine( log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2)) return nil, errors.Trace(err) } - if err2 := flushAndSaveAllChunks(context.Background()); err2 != nil { + if err2 := trySavePendingChunks(context.Background()); err2 != nil { log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2)) } } @@ -1744,13 +1764,11 @@ func (tr *TableRestore) restoreEngine( // For local backend, if checkpoint is enabled, we must flush index engine to avoid data loss. // this flush action impact up to 10% of the performance, so we only do it if necessary. if err == nil && rc.cfg.Checkpoint.Enable && rc.isLocalBackend() { - if err = flushAndSaveAllChunks(ctx); err != nil { + if err = indexEngine.Flush(ctx); err != nil { return nil, errors.Trace(err) } - - // Currently we write all the checkpoints after data&index engine are flushed. - for _, chunk := range cp.Chunks { - saveCheckpoint(rc, tr, engineID, chunk) + if err = trySavePendingChunks(ctx); err != nil { + return nil, errors.Trace(err) } } rc.saveStatusCheckpoint(tr.tableName, engineID, err, checkpoints.CheckpointStatusClosed) @@ -2461,6 +2479,7 @@ func (cr *chunkRestore) deliverLoop( dataKVs := rc.backend.MakeEmptyRows() indexKVs := rc.backend.MakeEmptyRows() + dataSynced := true for !channelClosed { var dataChecksum, indexChecksum verify.KVChecksum var columns []string @@ -2490,16 +2509,15 @@ func (cr *chunkRestore) deliverLoop( } } - // we are allowed to save checkpoint when the disk quota state moved to "importing" - // since all engines are flushed. - if atomic.LoadInt32(&rc.diskQuotaState) == diskQuotaStateImporting { - saveCheckpoint(rc, t, engineID, cr.chunk) - } - err = func() error { rc.diskQuotaLock.RLock() defer rc.diskQuotaLock.RUnlock() + // try to update chunk checkpoint, this can help save checkpoint after importing when disk-quota is triggered + if !dataSynced { + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) + } + // Write KVs into the engine start := time.Now() @@ -2529,6 +2547,7 @@ func (cr *chunkRestore) deliverLoop( if err != nil { return } + dataSynced = false dataKVs = dataKVs.Clear() indexKVs = indexKVs.Clear() @@ -2541,9 +2560,10 @@ func (cr *chunkRestore) deliverLoop( cr.chunk.Checksum.Add(&indexChecksum) cr.chunk.Chunk.Offset = offset cr.chunk.Chunk.PrevRowIDMax = rowID - if !rc.isLocalBackend() && (dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0) { + + if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 { // No need to save checkpoint if nothing was delivered. 
- saveCheckpoint(rc, t, engineID, cr.chunk) + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) } failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") @@ -2564,6 +2584,20 @@ func (cr *chunkRestore) deliverLoop( return } +func (cr *chunkRestore) maybeSaveCheckpoint( + rc *Controller, + t *TableRestore, + engineID int32, + chunk *checkpoints.ChunkCheckpoint, + data, index *backend.LocalEngineWriter, +) bool { + if data.IsSynced() && index.IsSynced() { + saveCheckpoint(rc, t, engineID, chunk) + return true + } + return false +} + func saveCheckpoint(rc *Controller, t *TableRestore, engineID int32, chunk *checkpoints.ChunkCheckpoint) { // We need to update the AllocBase every time we've finished a file. // The AllocBase is determined by the maximum of the "handle" (_tidb_rowid diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index 46df2b881..620ebef88 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -1008,6 +1008,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) { mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() mockWriter := mock.NewMockEngineWriter(controller) mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes() + mockWriter.EXPECT().IsSynced().Return(true).AnyTimes() dataEngine, err := importer.OpenEngine(ctx, &backend.EngineConfig{}, s.tr.tableName, 0, 0) c.Assert(err, IsNil) diff --git a/pkg/mock/backend.go b/pkg/mock/backend.go index 01484eb47..d40fd82d1 100644 --- a/pkg/mock/backend.go +++ b/pkg/mock/backend.go @@ -318,3 +318,17 @@ func (mr *MockEngineWriterMockRecorder) Close(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEngineWriter)(nil).Close), arg0) } + +// IsSynced mocks base method. +func (m *MockEngineWriter) IsSynced() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsSynced") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsSynced indicates an expected call of IsSynced. 
+func (mr *MockEngineWriterMockRecorder) IsSynced() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSynced", reflect.TypeOf((*MockEngineWriter)(nil).IsSynced)) +} From 02745716cadcbec8d58723868daa917e46c912f5 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 6 May 2021 13:27:05 +0800 Subject: [PATCH 3/7] fix a bug --- pkg/lightning/restore/restore.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 7185f655f..c4708372e 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -1689,15 +1689,18 @@ func (tr *TableRestore) restoreEngine( if err == nil { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt) metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize())) - if dataFlushStatus != nil && !dataFlushStatus.Flushed() && - indexFlushStaus != nil && !indexFlushStaus.Flushed() { - checkFlushLock.Lock() - flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{ - dataStatus: dataFlushStatus, - indexStatus: indexFlushStaus, - chunkCp: cr.chunk, - }) - checkFlushLock.Unlock() + if dataFlushStatus != nil && indexFlushStaus != nil { + if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() { + saveCheckpoint(rc, tr, engineID, cr.chunk) + } else { + checkFlushLock.Lock() + flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{ + dataStatus: dataFlushStatus, + indexStatus: indexFlushStaus, + chunkCp: cr.chunk, + }) + checkFlushLock.Unlock() + } } } else { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt) From d911fa3d7d1e7f652cd6d710fde1613dd91a676f Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 6 May 2021 16:02:12 +0800 Subject: [PATCH 4/7] fix batch count at flush --- pkg/lightning/backend/local/local.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 1f0f533cd..69d2124cf 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -2542,6 +2542,7 @@ func (w *Writer) flush(ctx context.Context) error { return errors.Trace(err) } w.writer = nil + w.batchCount = 0 if meta != nil && meta.totalSize > 0 { return w.addSST(ctx, meta) } From 0375f6ae04b496a1ce8c263b3a37b7dccc6b5730 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 7 May 2021 11:20:01 +0800 Subject: [PATCH 5/7] make next_seq unatomic --- pkg/lightning/backend/local/local.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 69d2124cf..06f93ee6b 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -168,7 +168,7 @@ type File struct { // sst seq lock seqLock sync.Mutex // seq number for incoming sst meta - nextSeq atomic.Int32 + nextSeq int32 // max seq of sst metas ingested into pebble finishedMetaSeq atomic.Int32 @@ -576,7 +576,8 @@ func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) { // make sure sstMeta is sent into the chan in order e.seqLock.Lock() defer e.seqLock.Unlock() - seq := e.nextSeq.Add(1) + e.nextSeq++ + seq := e.nextSeq m.seq = seq select { case e.sstMetasChan <- metaOrFlush{meta: m}: From c1d85b953a78fc30d730d620eadb9ba5db6d629e Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 7 May 2021 19:53:48 +0800 Subject: [PATCH 6/7] add some 
comments --- pkg/lightning/backend/local/local.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 06f93ee6b..ca63ffdc3 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -334,8 +334,12 @@ func (e *File) unlock() { } type metaSeq struct { + // the sequence for this flush message, a flush call can return only if + // all the other flush will lower `flushSeq` are done flushSeq int32 - metaSeq int32 + // the max sstMeta sequence number in this flush, after the flush is done (all SSTs are ingested), + // we can save chunks will a lower meta sequence number safely. + metaSeq int32 } type metaSeqHeap struct { From 44c7e3535828a4c42ee92b7a084b19b8d47cc7a7 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 12 May 2021 15:08:50 +0800 Subject: [PATCH 7/7] fix a meta seq is not in order bug --- pkg/lightning/backend/local/local.go | 7 +++++-- pkg/lightning/backend/local/local_test.go | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index ca63ffdc3..1edd8c2db 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -427,6 +427,8 @@ func (e *File) ingestSSTLoop() { } ingestMetas = []*sstMeta{newMeta} } + // batchIngestSSTs will change ingestMetas' order, so we record the max seq here + metasMaxSeq := ingestMetas[len(ingestMetas)-1].seq if err := e.batchIngestSSTs(ingestMetas); err != nil { e.setError(err) @@ -436,7 +438,7 @@ func (e *File) ingestSSTLoop() { finSeq := finishedSeq.Load() if metas.seq == finSeq+1 { finSeq = metas.seq - finMetaSeq := ingestMetas[len(ingestMetas)-1].seq + finMetaSeq := metasMaxSeq for len(inSyncSeqs.arr) > 0 { if inSyncSeqs.arr[0].flushSeq == finSeq+1 { finSeq++ @@ -463,7 +465,7 @@ func (e *File) ingestSSTLoop() { c <- struct{}{} } } else { - heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: ingestMetas[len(ingestMetas)-1].seq}) + heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: metasMaxSeq}) seqLock.Unlock() } } @@ -639,6 +641,7 @@ func (e *File) ingestSSTs(metas []*sstMeta) error { log.L().Info("write data to local DB", zap.Int64("size", totalSize), zap.Int64("kvs", totalCount), + zap.Int("files", len(metas)), zap.Int64("sstFileSize", fileSize), zap.String("file", metas[0].path), logutil.Key("firstKey", metas[0].minKey), diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index 59e60c084..24fa94a2b 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -513,8 +513,13 @@ func (i testIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) } else if len(metas) == 1 { return metas[0], nil } + if metas[len(metas)-1].seq-metas[0].seq != int32(len(metas)-1) { + panic("metas is not add in order") + } - newMeta := &sstMeta{} + newMeta := &sstMeta{ + seq: metas[len(metas)-1].seq, + } for _, m := range metas { newMeta.totalSize += m.totalSize newMeta.totalCount += m.totalCount @@ -567,15 +572,18 @@ func (s *localSuite) TestLocalIngestLoop(c *C) { totalSize := int64(0) concurrency := 4 count := 500 + var metaSeqLock sync.Mutex + maxMetaSeq := int32(0) for i := 0; i < concurrency; i++ { go func() { defer wg.Done() flushCnt := rand.Int31n(10) + 1 + seq := int32(0) for i := 0; i < count; i++ { size := int64(rand.Int31n(50) + 1) m := &sstMeta{totalSize: size, 
totalCount: 1} atomic.AddInt64(&totalSize, size) - _, err := f.addSST(engineCtx, m) + metaSeq, err := f.addSST(engineCtx, m) c.Assert(err, IsNil) if int32(i) >= flushCnt { f.mutex.RLock() @@ -584,7 +592,13 @@ func (s *localSuite) TestLocalIngestLoop(c *C) { f.mutex.RUnlock() flushCnt += rand.Int31n(10) + 1 } + seq = metaSeq + } + metaSeqLock.Lock() + if atomic.LoadInt32(&maxMetaSeq) < seq { + atomic.StoreInt32(&maxMetaSeq, seq) } + metaSeqLock.Unlock() }() } wg.Wait() @@ -599,6 +613,7 @@ func (s *localSuite) TestLocalIngestLoop(c *C) { c.Assert(f.ingestErr.Get(), IsNil) c.Assert(totalSize, Equals, f.TotalSize.Load()) c.Assert(f.Length.Load(), Equals, int64(concurrency*count)) + c.Assert(f.finishedMetaSeq.Load(), Equals, atomic.LoadInt32(&maxMetaSeq)) } func (s *localSuite) TestCheckRequirementsTiFlash(c *C) {
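The mechanism at the heart of this series is a per-SST sequence number: addSST hands out monotonically increasing sequence numbers under seqLock, the ingest loop records the highest sequence fully ingested into Pebble in finishedMetaSeq, and a chunk writer only remembers the last sequence it produced (lastMetaSeq). Both flushStatus.Flushed() and Writer.IsSynced() then reduce to a single comparison. Below is a minimal, self-contained sketch of that idea; the engine/writer types and field names are simplified placeholders, not the actual Lightning types.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// engine mimics the bookkeeping the patches add to local.File: every SST
// meta gets a sequence number handed out under a lock, and finishedSeq
// records the highest sequence already ingested.
type engine struct {
    seqLock     sync.Mutex
    nextSeq     int32
    finishedSeq int32 // accessed atomically; stand-in for finishedMetaSeq
}

// addSST assigns the next sequence number; holding the lock while the meta
// is queued is what keeps metas ordered by sequence.
func (e *engine) addSST() int32 {
    e.seqLock.Lock()
    defer e.seqLock.Unlock()
    e.nextSeq++
    return e.nextSeq
}

// markIngested is called once everything up to seq is in Pebble.
func (e *engine) markIngested(seq int32) {
    atomic.StoreInt32(&e.finishedSeq, seq)
}

// writer mimics the chunk writer: it only remembers the last sequence it
// produced, so "is my data durable?" is a single comparison.
type writer struct {
    e           *engine
    lastMetaSeq int32
}

func (w *writer) write() {
    w.lastMetaSeq = w.e.addSST()
}

func (w *writer) synced() bool {
    return w.lastMetaSeq <= atomic.LoadInt32(&w.e.finishedSeq)
}

func main() {
    e := &engine{}
    w := &writer{e: e}
    w.write()               // produces SST with seq 1
    w.write()               // produces SST with seq 2
    fmt.Println(w.synced()) // false: nothing ingested yet
    e.markIngested(1)
    fmt.Println(w.synced()) // false: seq 2 is still pending
    e.markIngested(2)
    fmt.Println(w.synced()) // true: this writer's data is durable in the engine
}

Holding seqLock while the meta is queued is what guarantees metas arrive on sstMetasChan in sequence order, which in turn is what makes the single watermark comparison sound.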
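Because SST batches are compacted and ingested concurrently, flushes can complete out of order; patch 1 replaces the plain intHeap with metaSeqHeap so the loop can advance both the flush watermark and the meta-sequence watermark only across a contiguous prefix of completed flushes. A rough sketch of that bookkeeping using container/heap follows — pair, minHeap and complete are illustrative names, and the real loop additionally wakes up the callers waiting on flushChans.

package main

import (
    "container/heap"
    "fmt"
)

// pair mirrors the patch's metaSeq: the flush task's own sequence plus the
// highest SST-meta sequence that flush covered.
type pair struct{ flushSeq, metaSeq int32 }

// minHeap parks flushes that completed out of order, keyed by flushSeq,
// like the metaSeqHeap the patch introduces.
type minHeap []pair

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].flushSeq < h[j].flushSeq }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(pair)) }
func (h *minHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

func main() {
    pending := &minHeap{}
    heap.Init(pending)
    finishedFlushSeq, finishedMetaSeq := int32(0), int32(0)

    // complete advances the watermarks only across a contiguous prefix of
    // flush sequences; anything else waits in the heap.
    complete := func(p pair) {
        if p.flushSeq != finishedFlushSeq+1 {
            heap.Push(pending, p)
            return
        }
        finishedFlushSeq, finishedMetaSeq = p.flushSeq, p.metaSeq
        for pending.Len() > 0 && (*pending)[0].flushSeq == finishedFlushSeq+1 {
            q := heap.Pop(pending).(pair)
            finishedFlushSeq, finishedMetaSeq = q.flushSeq, q.metaSeq
        }
    }

    complete(pair{2, 20})                          // out of order: parked in the heap
    fmt.Println(finishedFlushSeq, finishedMetaSeq) // 0 0
    complete(pair{1, 10})                          // fills the gap, so 1 and 2 are both released
    fmt.Println(finishedFlushSeq, finishedMetaSeq) // 2 20
}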
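On the restore side, a finished chunk whose data or index writes are not yet flushed is parked in flushPendingChunks, and checkpoints are persisted only for the leading run of entries whose data and index statuses are both flushed; stopping at the first unflushed entry keeps chunk checkpoints from getting ahead of the data. A simplified sketch of that prefix rule is below — status, pendingChunk and savePrefix are stand-ins for backend.ChunkFlushStatus, the chunkFlushStatus struct and trySavePendingChunks, not the actual implementation.

package main

import "fmt"

// status mirrors backend.ChunkFlushStatus: can this chunk's KVs be
// considered durable in the engine yet?
type status interface{ Flushed() bool }

type always bool

func (a always) Flushed() bool { return bool(a) }

// pendingChunk mirrors the per-chunk bookkeeping the patch keeps in
// flushPendingChunks.
type pendingChunk struct {
    data, index status
    name        string // stand-in for *checkpoints.ChunkCheckpoint
}

// savePrefix persists checkpoints for the leading run of fully flushed
// chunks and returns the remainder; breaking at the first unflushed entry,
// rather than skipping it, keeps checkpoints in write order.
func savePrefix(pending []pendingChunk) []pendingChunk {
    saved := 0
    for _, c := range pending {
        if !(c.data.Flushed() && c.index.Flushed()) {
            break
        }
        fmt.Println("save checkpoint for", c.name)
        saved++
    }
    return pending[saved:]
}

func main() {
    pending := []pendingChunk{
        {always(true), always(true), "chunk-0"},
        {always(false), always(true), "chunk-1"}, // index flushed, data not
        {always(true), always(true), "chunk-2"},  // must wait for chunk-1
    }
    pending = savePrefix(pending)
    fmt.Println(len(pending), "chunks still pending") // 2
}

This is also why both the per-chunk loop in restoreEngine and trySavePendingChunks break out of the scan at the first pending entry instead of continuing past it.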