From 5a93d02515a20be8a38c1744bf4e58611f387e54 Mon Sep 17 00:00:00 2001
From: glorv
Date: Fri, 30 Apr 2021 18:02:16 +0800
Subject: [PATCH] save chunk checkpoint timely

---
 pkg/lightning/backend/backend.go           |  8 +-
 pkg/lightning/backend/backend_test.go      | 16 ++--
 pkg/lightning/backend/importer/importer.go |  4 +-
 .../backend/importer/importer_test.go      |  3 +-
 pkg/lightning/backend/local/local.go       | 84 ++++++++++++++-----
 pkg/lightning/backend/local/local_test.go  |  5 +-
 pkg/lightning/backend/noop/noop.go         |  4 +-
 pkg/lightning/backend/tidb/tidb.go         |  4 +-
 pkg/lightning/backend/tidb/tidb_test.go    |  8 +-
 pkg/lightning/restore/restore.go           | 44 +++++++++-
 pkg/mock/backend.go                        |  7 +-
 11 files changed, 138 insertions(+), 49 deletions(-)

diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go
index e28d94793..b3ad44465 100644
--- a/pkg/lightning/backend/backend.go
+++ b/pkg/lightning/backend/backend.go
@@ -375,7 +375,7 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
 	return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
 }
 
-func (w *LocalEngineWriter) Close(ctx context.Context) error {
+func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
 	return w.writer.Close(ctx)
 }
 
@@ -442,6 +442,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
 	return engine.logger
 }
 
+type ChunkFlushStatus interface {
+	Flushed() bool
+}
+
 type EngineWriter interface {
 	AppendRows(
 		ctx context.Context,
@@ -450,5 +454,5 @@ type EngineWriter interface {
 		commitTS uint64,
 		rows kv.Rows,
 	) error
-	Close(ctx context.Context) error
+	Close(ctx context.Context) (ChunkFlushStatus, error)
 }
diff --git a/pkg/lightning/backend/backend_test.go b/pkg/lightning/backend/backend_test.go
index 48fd65d78..ed5c1a31a 100644
--- a/pkg/lightning/backend/backend_test.go
+++ b/pkg/lightning/backend/backend_test.go
@@ -140,7 +140,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
 	mockWriter.EXPECT().
 		AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
 		Return(nil)
-	mockWriter.EXPECT().Close(ctx).Return(nil).AnyTimes()
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
 	mockWriter.EXPECT().
 		AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
 		Return(nil)
@@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
@@ -167,7 +167,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
 
 	s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil)
 	mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
-	mockWriter.EXPECT().Close(ctx).Return(nil)
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil)
 	s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)
 
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
@@ -176,7 +176,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, nil, emptyRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
@@ -207,7 +207,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
 	mockWriter.EXPECT().
 		AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
 		Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
-	mockWriter.EXPECT().Close(ctx).Return(nil)
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil)
 
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
 	c.Assert(err, IsNil)
@@ -215,7 +215,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, nil, rows)
 	c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
@@ -233,7 +233,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
 	mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
 		Return(errors.New("fake recoverable write batch error")).
 		MinTimes(1)
-	mockWriter.EXPECT().Close(ctx).Return(nil).MinTimes(1)
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)
 
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
 	c.Assert(err, IsNil)
@@ -241,7 +241,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, nil, rows)
 	c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 }
 
diff --git a/pkg/lightning/backend/importer/importer.go b/pkg/lightning/backend/importer/importer.go
index 227ed739c..f15d8e77d 100644
--- a/pkg/lightning/backend/importer/importer.go
+++ b/pkg/lightning/backend/importer/importer.go
@@ -335,8 +335,8 @@ type Writer struct {
 	engineUUID uuid.UUID
 }
 
-func (w *Writer) Close(ctx context.Context) error {
-	return nil
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
 }
 
 func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
diff --git a/pkg/lightning/backend/importer/importer_test.go b/pkg/lightning/backend/importer/importer_test.go
index e86df4f5d..0b4cf99de 100644
--- a/pkg/lightning/backend/importer/importer_test.go
+++ b/pkg/lightning/backend/importer/importer_test.go
@@ -115,8 +115,9 @@ func (s *importerSuite) TestWriteRows(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(s.ctx, nil, s.kvPairs)
 	c.Assert(err, IsNil)
-	err = writer.Close(s.ctx)
+	st, err := writer.Close(s.ctx)
 	c.Assert(err, IsNil)
+	c.Assert(st, IsNil)
 }
 
 func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go
index 791eda83b..0645ecf38 100644
--- a/pkg/lightning/backend/local/local.go
+++ b/pkg/lightning/backend/local/local.go
@@ -165,6 +165,13 @@ type File struct {
 	sstIngester    sstIngester
 	finishedRanges syncedRanges
 
+	// sst seq lock
+	seqLock sync.Mutex
+	// seq number for incoming sst meta
+	nextSeq atomic.Int32
+	// max seq of sst metas ingested into pebble
+	finishedMetaSeq atomic.Int32
+
 	config backend.LocalEngineConfig
 
 	// total size of SST files waiting to be ingested
@@ -326,27 +333,32 @@ func (e *File) unlock() {
 	e.mutex.Unlock()
 }
 
-type intHeap struct {
-	arr []int32
+type metaSeq struct {
+	flushSeq int32
+	metaSeq  int32
+}
+
+type metaSeqHeap struct {
+	arr []metaSeq
 }
 
-func (h *intHeap) Len() int {
+func (h *metaSeqHeap) Len() int {
 	return len(h.arr)
 }
 
-func (h *intHeap) Less(i, j int) bool {
-	return h.arr[i] < h.arr[j]
+func (h *metaSeqHeap) Less(i, j int) bool {
+	return h.arr[i].flushSeq < h.arr[j].flushSeq
 }
 
-func (h *intHeap) Swap(i, j int) {
+func (h *metaSeqHeap) Swap(i, j int) {
 	h.arr[i], h.arr[j] = h.arr[j], h.arr[i]
 }
 
-func (h *intHeap) Push(x interface{}) {
-	h.arr = append(h.arr, x.(int32))
+func (h *metaSeqHeap) Push(x interface{}) {
+	h.arr = append(h.arr, x.(metaSeq))
 }
 
-func (h *intHeap) Pop() interface{} {
+func (h *metaSeqHeap) Pop() interface{} {
 	item := h.arr[len(h.arr)-1]
 	h.arr = h.arr[:len(h.arr)-1]
 	return item
@@ -367,7 +379,7 @@ func (e *File) ingestSSTLoop() {
 	flushQueue := make([]flushSeq, 0)
 	// inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1`
 	// this mean there are still at lease one compaction task with a lower seq unfinished.
-	inSyncSeqs := &intHeap{arr: make([]int32, 0)}
+	inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)}
 
 	type metaAndSeq struct {
 		metas []*sstMeta
@@ -420,9 +432,11 @@ func (e *File) ingestSSTLoop() {
 					finSeq := finishedSeq.Load()
 					if metas.seq == finSeq+1 {
 						finSeq = metas.seq
+						finMetaSeq := ingestMetas[len(ingestMetas)-1].seq
 						for len(inSyncSeqs.arr) > 0 {
-							if inSyncSeqs.arr[0] == finSeq+1 {
+							if inSyncSeqs.arr[0].flushSeq == finSeq+1 {
 								finSeq++
+								finMetaSeq = inSyncSeqs.arr[0].metaSeq
 								heap.Remove(inSyncSeqs, 0)
 							} else {
 								break
@@ -439,12 +453,13 @@ func (e *File) ingestSSTLoop() {
 						}
 						flushQueue = flushQueue[len(flushChans):]
 						finishedSeq.Store(finSeq)
+						e.finishedMetaSeq.Store(finMetaSeq)
 						seqLock.Unlock()
 						for _, c := range flushChans {
 							c <- struct{}{}
 						}
 					} else {
-						heap.Push(inSyncSeqs, metas.seq)
+						heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: ingestMetas[len(ingestMetas)-1].seq})
 						seqLock.Unlock()
 					}
 				}
@@ -555,16 +570,21 @@ readMetaLoop:
 	}
 }
 
-func (e *File) addSST(ctx context.Context, m *sstMeta) error {
+func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) {
 	// set pending size after SST file is generated
 	e.pendingFileSize.Add(m.fileSize)
+	// make sure sstMeta is sent into the chan in order
+	e.seqLock.Lock()
+	defer e.seqLock.Unlock()
+	seq := e.nextSeq.Add(1)
+	m.seq = seq
 	select {
 	case e.sstMetasChan <- metaOrFlush{meta: m}:
 	case <-ctx.Done():
-		return ctx.Err()
+		return 0, ctx.Err()
 	case <-e.ctx.Done():
 	}
-	return e.ingestErr.Get()
+	return seq, e.ingestErr.Get()
 }
 
 func (e *File) batchIngestSSTs(metas []*sstMeta) error {
@@ -2403,6 +2423,7 @@ type sstMeta struct {
 	totalCount int64
 	// used for calculate disk-quota
 	fileSize int64
+	seq      int32
 }
 
 type Writer struct {
@@ -2420,6 +2441,8 @@ type Writer struct {
 
 	kvBuffer *bytesBuffer
 	writer   *sstWriter
+
+	lastMetaSeq int32
 }
 
 func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -2520,14 +2543,23 @@ func (w *Writer) flush(ctx context.Context) error {
 		}
 		w.writer = nil
 		if meta != nil && meta.totalSize > 0 {
-			return w.local.addSST(ctx, meta)
+			return w.addSST(ctx, meta)
 		}
 	}
 
 	return nil
 }
 
-func (w *Writer) Close(ctx context.Context) error {
+type flushStatus struct {
+	local *File
+	seq   int32
+}
+
+func (f flushStatus) Flushed() bool {
+	return f.seq <= f.local.finishedMetaSeq.Load()
+}
+
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
 	defer w.kvBuffer.destroy()
 	defer w.local.localWriters.Delete(w)
 	err := w.flush(ctx)
@@ -2535,7 +2567,7 @@ func (w *Writer) Close(ctx context.Context) error {
 	// this can resolve the memory consistently increasing issue.
 	// maybe this is a bug related to go GC mechanism.
 	w.writeBatch = nil
-	return err
+	return flushStatus{local: w.local, seq: w.lastMetaSeq}, err
 }
 
 func (w *Writer) flushKVs(ctx context.Context) error {
@@ -2555,7 +2587,7 @@ func (w *Writer) flushKVs(ctx context.Context) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-	err = w.local.addSST(ctx, meta)
+	err = w.addSST(ctx, meta)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -2567,6 +2599,15 @@ func (w *Writer) flushKVs(ctx context.Context) error {
 	return nil
 }
 
+func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {
+	seq, err := w.local.addSST(ctx, meta)
+	if err != nil {
+		return err
+	}
+	w.lastMetaSeq = seq
+	return nil
+}
+
 func (w *Writer) createSSTWriter() (*sstWriter, error) {
 	path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst")
 	writer, err := newSSTWriter(path)
@@ -2731,10 +2772,13 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
 	}
 
 	start := time.Now()
-	newMeta := &sstMeta{}
+	newMeta := &sstMeta{
+		seq: metas[len(metas)-1].seq,
+	}
 	mergeIter := &sstIterHeap{
 		iters: make([]*sstIter, 0, len(metas)),
 	}
+
 	for _, p := range metas {
 		f, err := os.Open(p.path)
 		if err != nil {
diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go
index d607e31c5..59e60c084 100644
--- a/pkg/lightning/backend/local/local_test.go
+++ b/pkg/lightning/backend/local/local_test.go
@@ -389,9 +389,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
 	c.Assert(err, IsNil)
 	err = w.AppendRows(ctx, "", []string{}, 1, kv.MakeRowsFromKvPairs(rows3))
 	c.Assert(err, IsNil)
-	err = w.Close(context.Background())
+	flushStatus, err := w.Close(context.Background())
 	c.Assert(err, IsNil)
 	c.Assert(f.flushEngineWithoutLock(ctx), IsNil)
+	c.Assert(flushStatus.Flushed(), IsTrue)
 
 	o := &pebble.IterOptions{}
 	it := db.NewIter(o)
@@ -574,7 +575,7 @@ func (s *localSuite) TestLocalIngestLoop(c *C) {
 			size := int64(rand.Int31n(50) + 1)
 			m := &sstMeta{totalSize: size, totalCount: 1}
 			atomic.AddInt64(&totalSize, size)
-			err := f.addSST(engineCtx, m)
+			_, err := f.addSST(engineCtx, m)
 			c.Assert(err, IsNil)
 			if int32(i) >= flushCnt {
 				f.mutex.RLock()
diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go
index 03f1fb297..650abde90 100644
--- a/pkg/lightning/backend/noop/noop.go
+++ b/pkg/lightning/backend/noop/noop.go
@@ -163,6 +163,6 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row
 	return nil
 }
 
-func (w noopWriter) Close(context.Context) error {
-	return nil
+func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
 }
diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go
index e4513b5c2..22727cd6a 100644
--- a/pkg/lightning/backend/tidb/tidb.go
+++ b/pkg/lightning/backend/tidb/tidb.go
@@ -596,8 +596,8 @@ type Writer struct {
 	engineUUID uuid.UUID
 }
 
-func (w *Writer) Close(ctx context.Context) error {
-	return nil
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
 }
 
 func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error {
diff --git a/pkg/lightning/backend/tidb/tidb_test.go b/pkg/lightning/backend/tidb/tidb_test.go
index e4021941f..6ecda132a 100644
--- a/pkg/lightning/backend/tidb/tidb_test.go
+++ b/pkg/lightning/backend/tidb/tidb_test.go
@@ -124,8 +124,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	st, err := writer.Close(ctx)
 	c.Assert(err, IsNil)
+	c.Assert(st, IsNil)
 }
 
 func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
@@ -157,7 +158,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"a"}, dataRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	_, err = writer.Close(ctx)
 	c.Assert(err, IsNil)
 
 	// test encode rows with _tidb_rowid
@@ -202,8 +203,9 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
 	c.Assert(err, IsNil)
 	err = writer.WriteRows(ctx, []string{"a"}, dataRows)
 	c.Assert(err, IsNil)
-	err = writer.Close(ctx)
+	st, err := writer.Close(ctx)
 	c.Assert(err, IsNil)
+	c.Assert(st, IsNil)
 }
 
 // TODO: temporarily disable this test before we fix strict mode
diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go
index 06057ea22..1f40966f3 100644
--- a/pkg/lightning/restore/restore.go
+++ b/pkg/lightning/restore/restore.go
@@ -1566,9 +1566,9 @@ func (tr *TableRestore) restoreEngine(
 	// if the key are ordered, LocalWrite can optimize the writing.
 	// table has auto-incremented _tidb_rowid must satisfy following restrictions:
 	// - clustered index disable and primary key is not number
-	// - no auto random bits (auto random or shard rowid)
+	// - no auto random bits (auto random or shard row id)
 	// - no partition table
-	// - no explicit _tidb_rowid field (A this time we can't determine if the soure file contains _tidb_rowid field,
+	// - no explicit _tidb_rowid field (At this time we can't determine if the source files contain a _tidb_rowid field,
 	// so we will do this check in LocalWriter when the first row is received.)
 	hasAutoIncrementAutoID := common.TableHasAutoRowID(tr.tableInfo.Core) &&
 		tr.tableInfo.Core.AutoRandomBits == 0 && tr.tableInfo.Core.ShardRowIDBits == 0 &&
@@ -1586,12 +1586,37 @@ func (tr *TableRestore) restoreEngine(
 	var wg sync.WaitGroup
 	var chunkErr common.OnceError
 
+	type chunkFlushStatus struct {
+		dataStatus  backend.ChunkFlushStatus
+		indexStatus backend.ChunkFlushStatus
+		chunkCp     *checkpoints.ChunkCheckpoint
+	}
+
+	// chunks that have finished writing, but whose checkpoints are not saved yet because the flush is not finished.
+	var checkFlushLock sync.Mutex
+	flushPendingChunks := make([]chunkFlushStatus, 0, 16)
+
 	// Restore table data
 	for chunkIndex, chunk := range cp.Chunks {
 		if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
 			continue
 		}
 
+		checkFlushLock.Lock()
+		finished := 0
+		for _, c := range flushPendingChunks {
+			if c.indexStatus.Flushed() && c.dataStatus.Flushed() {
+				saveCheckpoint(rc, tr, engineID, c.chunkCp)
+				finished++
+			} else {
+				break
+			}
+		}
+		if finished > 0 {
+			flushPendingChunks = flushPendingChunks[finished:]
+		}
+		checkFlushLock.Unlock()
+
 		select {
 		case <-ctx.Done():
 			return nil, ctx.Err()
@@ -1639,15 +1664,26 @@ func (tr *TableRestore) restoreEngine(
 		}()
 		metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt)
 		err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc)
+		var dataFlushStatus, indexFlushStatus backend.ChunkFlushStatus
 		if err == nil {
-			err = dataWriter.Close(ctx)
+			dataFlushStatus, err = dataWriter.Close(ctx)
 		}
 		if err == nil {
-			err = indexWriter.Close(ctx)
+			indexFlushStatus, err = indexWriter.Close(ctx)
 		}
 		if err == nil {
 			metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
 			metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize()))
+			if dataFlushStatus != nil && !dataFlushStatus.Flushed() &&
+				indexFlushStatus != nil && !indexFlushStatus.Flushed() {
+				checkFlushLock.Lock()
+				flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{
+					dataStatus:  dataFlushStatus,
+					indexStatus: indexFlushStatus,
+					chunkCp:     cr.chunk,
+				})
+				checkFlushLock.Unlock()
+			}
 		} else {
 			metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt)
 			chunkErr.Set(err)
diff --git a/pkg/mock/backend.go b/pkg/mock/backend.go
index 7351cd5e9..01484eb47 100644
--- a/pkg/mock/backend.go
+++ b/pkg/mock/backend.go
@@ -305,11 +305,12 @@ func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2, arg3, arg4
 }
 
 // Close mocks base method.
-func (m *MockEngineWriter) Close(arg0 context.Context) error {
+func (m *MockEngineWriter) Close(arg0 context.Context) (backend.ChunkFlushStatus, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Close", arg0)
-	ret0, _ := ret[0].(error)
-	return ret0
+	ret0, _ := ret[0].(backend.ChunkFlushStatus)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
 }
 
 // Close indicates an expected call of Close.
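Reviewer note (not part of the patch): the restore.go hunk above defers saving a chunk's checkpoint until both its data and index writers report that what they wrote has actually been flushed. Below is a minimal, self-contained sketch of just that ordering logic; `FlushStatus`, `pendingChunk`, `drainFlushed`, and `fakeStatus` are illustrative stand-ins for `backend.ChunkFlushStatus`, the `chunkFlushStatus` struct, and Lightning's real checkpoint saving, not code from this repository.

```go
package main

import "fmt"

// FlushStatus is a simplified stand-in for backend.ChunkFlushStatus:
// Flushed reports whether the data written for a chunk has already been
// ingested into the local engine.
type FlushStatus interface {
	Flushed() bool
}

type pendingChunk struct {
	dataStatus, indexStatus FlushStatus
	chunkID                 int
}

// drainFlushed mirrors the loop added at the top of the chunk loop in
// restoreEngine: checkpoints are saved strictly in order, and only for the
// leading chunks whose data and index writes are both flushed.
func drainFlushed(pending []pendingChunk, saveCheckpoint func(chunkID int)) []pendingChunk {
	finished := 0
	for _, c := range pending {
		if c.dataStatus.Flushed() && c.indexStatus.Flushed() {
			saveCheckpoint(c.chunkID)
			finished++
		} else {
			break // keep order: later chunks wait for earlier ones
		}
	}
	return pending[finished:]
}

type fakeStatus bool

func (s fakeStatus) Flushed() bool { return bool(s) }

func main() {
	pending := []pendingChunk{
		{dataStatus: fakeStatus(true), indexStatus: fakeStatus(true), chunkID: 0},
		{dataStatus: fakeStatus(false), indexStatus: fakeStatus(true), chunkID: 1},
	}
	pending = drainFlushed(pending, func(id int) {
		fmt.Println("save checkpoint for chunk", id)
	})
	// chunk 1 stays queued until its data is flushed on a later pass
	fmt.Println("still pending:", len(pending))
}
```

In the patch itself, the boolean comes from sequence numbers rather than a flag: `File.addSST` assigns each SST meta a `seq` under `seqLock`, the ingest loop advances `finishedMetaSeq` only once every earlier SST has been ingested into pebble, and `flushStatus.Flushed()` compares the writer's `lastMetaSeq` against it, so a pending chunk is checkpointed only after everything it wrote is in the local engine.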