From ee6de9cc7e30e58a31d910f45114afdcbc24c4eb Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 7 Jun 2021 22:34:28 +0800
Subject: [PATCH] This is an automated cherry-pick of #1080

Signed-off-by: ti-chi-bot
---
 pkg/lightning/backend/backend.go           |  18 +
 pkg/lightning/backend/backend_test.go      |  33 ++
 pkg/lightning/backend/importer/importer.go |   9 +
 .../backend/importer/importer_test.go      |   5 +
 pkg/lightning/backend/local/local.go       | 534 ++++++++++++++++++
 pkg/lightning/backend/local/local_test.go  | 121 ++++
 pkg/lightning/backend/noop/noop.go         |   9 +
 pkg/lightning/backend/tidb/tidb.go         |  68 +++
 pkg/lightning/backend/tidb/tidb_test.go    |  14 +
 pkg/lightning/restore/restore.go           | 139 ++++-
 pkg/lightning/restore/restore_test.go      |   5 +
 pkg/mock/backend.go                        |  23 +
 12 files changed, 966 insertions(+), 12 deletions(-)

diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go
index 32378d406..f2f17cded 100644
--- a/pkg/lightning/backend/backend.go
+++ b/pkg/lightning/backend/backend.go
@@ -353,8 +353,17 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
 	return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
 }
 
+<<<<<<< HEAD
 func (w *LocalEngineWriter) Close() error {
 	return w.writer.Close()
+=======
+func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
+	return w.writer.Close(ctx)
+>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
+}
+
+func (w *LocalEngineWriter) IsSynced() bool {
+	return w.writer.IsSynced()
 }
 
 // UnsafeCloseEngine closes the engine without first opening it.
@@ -420,6 +429,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
 	return engine.logger
 }
 
+type ChunkFlushStatus interface {
+	Flushed() bool
+}
+
 type EngineWriter interface {
 	AppendRows(
 		ctx context.Context,
@@ -428,5 +441,10 @@ type EngineWriter interface {
 		commitTS uint64,
 		rows kv.Rows,
 	) error
+<<<<<<< HEAD
 	Close() error
+=======
+	IsSynced() bool
+	Close(ctx context.Context) (ChunkFlushStatus, error)
+>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
 }
diff --git a/pkg/lightning/backend/backend_test.go b/pkg/lightning/backend/backend_test.go
index 0a09a12a6..42c585e23 100644
--- a/pkg/lightning/backend/backend_test.go
+++ b/pkg/lightning/backend/backend_test.go
@@ -138,6 +138,10 @@ func (s *backendSuite) TestWriteEngine(c *C) {
 	mockWriter.EXPECT().
 		AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
 		Return(nil)
+<<<<<<< HEAD
+=======
+	mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
+>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
 	mockWriter.EXPECT().
 		AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil) @@ -151,7 +155,11 @@ func (s *backendSuite) TestWriteEngine(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2) c.Assert(err, IsNil) +<<<<<<< HEAD err = writer.Close() +======= + _, err = writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) } @@ -165,8 +173,13 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) { s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil) mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil) +<<<<<<< HEAD mockWriter.EXPECT().Close().Return(nil) s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil) +======= + mockWriter.EXPECT().Close(ctx).Return(nil, nil) + s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) @@ -174,7 +187,11 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, nil, emptyRows) c.Assert(err, IsNil) +<<<<<<< HEAD err = writer.Close() +======= + _, err = writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) } @@ -204,7 +221,11 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) { mockWriter.EXPECT(). AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows). Return(errors.Annotate(context.Canceled, "fake unrecoverable write error")) +<<<<<<< HEAD mockWriter.EXPECT().Close().Return(nil) +======= + mockWriter.EXPECT().Close(ctx).Return(nil, nil) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) @@ -212,7 +233,11 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, nil, rows) c.Assert(err, ErrorMatches, "fake unrecoverable write error.*") +<<<<<<< HEAD err = writer.Close() +======= + _, err = writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) } @@ -229,7 +254,11 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) { mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows). Return(errors.New("fake recoverable write batch error")). 
MinTimes(1) +<<<<<<< HEAD mockWriter.EXPECT().Close().Return(nil).MinTimes(1) +======= + mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) @@ -237,7 +266,11 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, nil, rows) c.Assert(err, ErrorMatches, ".*fake recoverable write batch error") +<<<<<<< HEAD err = writer.Close() +======= + _, err = writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) } diff --git a/pkg/lightning/backend/importer/importer.go b/pkg/lightning/backend/importer/importer.go index 0222b3694..79038a268 100644 --- a/pkg/lightning/backend/importer/importer.go +++ b/pkg/lightning/backend/importer/importer.go @@ -335,10 +335,19 @@ type Writer struct { engineUUID uuid.UUID } +<<<<<<< HEAD func (w *Writer) Close() error { return nil +======= +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + return nil, nil +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error { return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows) } + +func (w *Writer) IsSynced() bool { + return true +} diff --git a/pkg/lightning/backend/importer/importer_test.go b/pkg/lightning/backend/importer/importer_test.go index aaf0f5262..7e94bcdbc 100644 --- a/pkg/lightning/backend/importer/importer_test.go +++ b/pkg/lightning/backend/importer/importer_test.go @@ -115,8 +115,13 @@ func (s *importerSuite) TestWriteRows(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(s.ctx, nil, s.kvPairs) c.Assert(err, IsNil) +<<<<<<< HEAD err = writer.Close() +======= + st, err := writer.Close(s.ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) + c.Assert(st, IsNil) } func (s *importerSuite) TestWriteHeadSendFailed(c *C) { diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 0d6f48146..70220996c 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -152,7 +152,44 @@ type File struct { // isImportingAtomic is an atomic variable indicating whether this engine is importing. // This should not be used as a "spin lock" indicator. isImportingAtomic atomic.Uint32 +<<<<<<< HEAD mutex sync.Mutex +======= + // flush and ingest sst hold the rlock, other operation hold the wlock. + mutex sync.RWMutex + + ctx context.Context + cancel context.CancelFunc + sstDir string + sstMetasChan chan metaOrFlush + ingestErr common.OnceError + wg sync.WaitGroup + sstIngester sstIngester + finishedRanges syncedRanges + + // sst seq lock + seqLock sync.Mutex + // seq number for incoming sst meta + nextSeq int32 + // max seq of sst metas ingested into pebble + finishedMetaSeq atomic.Int32 + + config backend.LocalEngineConfig + + // total size of SST files waiting to be ingested + pendingFileSize atomic.Int64 + + // statistics for pebble kv iter. 
+ importedKVSize atomic.Int64 + importedKVCount atomic.Int64 +} + +func (e *File) setError(err error) { + if err != nil { + e.ingestErr.Set(err) + e.cancel() + } +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } func (e *File) Close() error { @@ -257,6 +294,339 @@ func (e *File) unlock() { e.mutex.Unlock() } +<<<<<<< HEAD +======= +type metaSeq struct { + // the sequence for this flush message, a flush call can return only if + // all the other flush will lower `flushSeq` are done + flushSeq int32 + // the max sstMeta sequence number in this flush, after the flush is done (all SSTs are ingested), + // we can save chunks will a lower meta sequence number safely. + metaSeq int32 +} + +type metaSeqHeap struct { + arr []metaSeq +} + +func (h *metaSeqHeap) Len() int { + return len(h.arr) +} + +func (h *metaSeqHeap) Less(i, j int) bool { + return h.arr[i].flushSeq < h.arr[j].flushSeq +} + +func (h *metaSeqHeap) Swap(i, j int) { + h.arr[i], h.arr[j] = h.arr[j], h.arr[i] +} + +func (h *metaSeqHeap) Push(x interface{}) { + h.arr = append(h.arr, x.(metaSeq)) +} + +func (h *metaSeqHeap) Pop() interface{} { + item := h.arr[len(h.arr)-1] + h.arr = h.arr[:len(h.arr)-1] + return item +} + +func (e *File) ingestSSTLoop() { + defer e.wg.Done() + + type flushSeq struct { + seq int32 + ch chan struct{} + } + + seq := atomic.NewInt32(0) + finishedSeq := atomic.NewInt32(0) + var seqLock sync.Mutex + // a flush is finished iff all the compaction&ingest tasks with a lower seq number are finished. + flushQueue := make([]flushSeq, 0) + // inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1` + // this mean there are still at lease one compaction task with a lower seq unfinished. + inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)} + + type metaAndSeq struct { + metas []*sstMeta + seq int32 + } + + concurrency := e.config.CompactConcurrency + // when compaction is disabled, ingest is an serial action, so 1 routine is enough + if !e.config.Compact { + concurrency = 1 + } + metaChan := make(chan metaAndSeq, concurrency) + for i := 0; i < concurrency; i++ { + e.wg.Add(1) + go func() { + defer e.wg.Done() + defer func() { + if e.ingestErr.Get() != nil { + seqLock.Lock() + for _, f := range flushQueue { + f.ch <- struct{}{} + } + flushQueue = flushQueue[:0] + seqLock.Unlock() + } + }() + for { + select { + case <-e.ctx.Done(): + return + case metas, ok := <-metaChan: + if !ok { + return + } + ingestMetas := metas.metas + if e.config.Compact { + newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir) + if err != nil { + e.setError(err) + return + } + ingestMetas = []*sstMeta{newMeta} + } + // batchIngestSSTs will change ingestMetas' order, so we record the max seq here + metasMaxSeq := ingestMetas[len(ingestMetas)-1].seq + + if err := e.batchIngestSSTs(ingestMetas); err != nil { + e.setError(err) + return + } + seqLock.Lock() + finSeq := finishedSeq.Load() + if metas.seq == finSeq+1 { + finSeq = metas.seq + finMetaSeq := metasMaxSeq + for len(inSyncSeqs.arr) > 0 { + if inSyncSeqs.arr[0].flushSeq == finSeq+1 { + finSeq++ + finMetaSeq = inSyncSeqs.arr[0].metaSeq + heap.Remove(inSyncSeqs, 0) + } else { + break + } + } + + var flushChans []chan struct{} + for _, seq := range flushQueue { + if seq.seq <= finSeq { + flushChans = append(flushChans, seq.ch) + } else { + break + } + } + flushQueue = flushQueue[len(flushChans):] + finishedSeq.Store(finSeq) + e.finishedMetaSeq.Store(finMetaSeq) + seqLock.Unlock() + for _, c := range flushChans { + c 
<- struct{}{} + } + } else { + heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: metasMaxSeq}) + seqLock.Unlock() + } + } + } + }() + } + + compactAndIngestSSTs := func(metas []*sstMeta) { + if len(metas) > 0 { + seqLock.Lock() + metaSeq := seq.Add(1) + seqLock.Unlock() + select { + case <-e.ctx.Done(): + case metaChan <- metaAndSeq{metas: metas, seq: metaSeq}: + } + } + } + + pendingMetas := make([]*sstMeta, 0, 16) + totalSize := int64(0) + metasTmp := make([]*sstMeta, 0) + addMetas := func() { + if len(metasTmp) == 0 { + return + } + metas := metasTmp + metasTmp = make([]*sstMeta, 0, len(metas)) + if !e.config.Compact { + compactAndIngestSSTs(metas) + return + } + for _, m := range metas { + if m.totalCount > 0 { + pendingMetas = append(pendingMetas, m) + totalSize += m.totalSize + if totalSize >= e.config.CompactThreshold { + compactMetas := pendingMetas + pendingMetas = make([]*sstMeta, 0, len(pendingMetas)) + totalSize = 0 + compactAndIngestSSTs(compactMetas) + } + } + } + } +readMetaLoop: + for { + closed := false + select { + case <-e.ctx.Done(): + close(metaChan) + return + case m, ok := <-e.sstMetasChan: + if !ok { + closed = true + break + } + if m.flushCh != nil { + // meet a flush event, we should trigger a ingest task if there are pending metas, + // and then waiting for all the running flush tasks to be done. + if len(metasTmp) > 0 { + addMetas() + } + if len(pendingMetas) > 0 { + seqLock.Lock() + metaSeq := seq.Add(1) + flushQueue = append(flushQueue, flushSeq{ch: m.flushCh, seq: metaSeq}) + seqLock.Unlock() + select { + case metaChan <- metaAndSeq{metas: pendingMetas, seq: metaSeq}: + case <-e.ctx.Done(): + close(metaChan) + return + } + + pendingMetas = make([]*sstMeta, 0, len(pendingMetas)) + totalSize = 0 + } else { + // none remaining metas needed to be ingested + seqLock.Lock() + curSeq := seq.Load() + finSeq := finishedSeq.Load() + // if all pending SST files are written, directly do a db.Flush + if curSeq == finSeq { + seqLock.Unlock() + m.flushCh <- struct{}{} + } else { + // waiting for pending compaction tasks + flushQueue = append(flushQueue, flushSeq{ch: m.flushCh, seq: curSeq}) + seqLock.Unlock() + } + } + continue readMetaLoop + } + metasTmp = append(metasTmp, m.meta) + // try to drain all the sst meta from the chan to make sure all the SSTs are processed before handle a flush msg. 
+ if len(e.sstMetasChan) > 0 { + continue readMetaLoop + } + + addMetas() + } + if closed { + compactAndIngestSSTs(pendingMetas) + close(metaChan) + return + } + } +} + +func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) { + // set pending size after SST file is generated + e.pendingFileSize.Add(m.fileSize) + // make sure sstMeta is sent into the chan in order + e.seqLock.Lock() + defer e.seqLock.Unlock() + e.nextSeq++ + seq := e.nextSeq + m.seq = seq + select { + case e.sstMetasChan <- metaOrFlush{meta: m}: + case <-ctx.Done(): + return 0, ctx.Err() + case <-e.ctx.Done(): + } + return seq, e.ingestErr.Get() +} + +func (e *File) batchIngestSSTs(metas []*sstMeta) error { + if len(metas) == 0 { + return nil + } + sort.Slice(metas, func(i, j int) bool { + return bytes.Compare(metas[i].minKey, metas[j].minKey) < 0 + }) + + metaLevels := make([][]*sstMeta, 0) + for _, meta := range metas { + inserted := false + for i, l := range metaLevels { + if bytes.Compare(l[len(l)-1].maxKey, meta.minKey) >= 0 { + continue + } + metaLevels[i] = append(l, meta) + inserted = true + break + } + if !inserted { + metaLevels = append(metaLevels, []*sstMeta{meta}) + } + } + + for _, l := range metaLevels { + if err := e.ingestSSTs(l); err != nil { + return err + } + } + return nil +} + +func (e *File) ingestSSTs(metas []*sstMeta) error { + // use raw RLock to avoid change the lock state during flushing. + e.mutex.RLock() + defer e.mutex.RUnlock() + if e.closed.Load() { + return errorEngineClosed + } + totalSize := int64(0) + totalCount := int64(0) + fileSize := int64(0) + for _, m := range metas { + totalSize += m.totalSize + totalCount += m.totalCount + fileSize += m.fileSize + } + log.L().Info("write data to local DB", + zap.Int64("size", totalSize), + zap.Int64("kvs", totalCount), + zap.Int("files", len(metas)), + zap.Int64("sstFileSize", fileSize), + zap.String("file", metas[0].path), + logutil.Key("firstKey", metas[0].minKey), + logutil.Key("lastKey", metas[len(metas)-1].maxKey)) + if err := e.sstIngester.ingest(metas); err != nil { + return errors.Trace(err) + } + count := int64(0) + size := int64(0) + for _, m := range metas { + count += m.totalCount + size += m.totalSize + } + e.Length.Add(count) + e.TotalSize.Add(size) + return nil +} + +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) func (e *File) flushLocalWriters(parentCtx context.Context) error { eg, ctx := errgroup.WithContext(parentCtx) e.localWriters.Range(func(k, v interface{}) bool { @@ -1862,6 +2232,82 @@ func (s *sizeProperties) iter(f func(p *rangeProperty) bool) { }) } +<<<<<<< HEAD +======= +type sstMeta struct { + path string + minKey []byte + maxKey []byte + totalSize int64 + totalCount int64 + // used for calculate disk-quota + fileSize int64 + seq int32 +} + +type Writer struct { + sync.Mutex + local *File + sstDir string + memtableSizeLimit int64 + writeBatch []common.KvPair + isWriteBatchSorted bool + + batchCount int + batchSize int64 + totalSize int64 + totalCount int64 + + kvBuffer *bytesBuffer + writer *sstWriter + + lastMetaSeq int32 +} + +func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { + if w.writer == nil { + writer, err := w.createSSTWriter() + if err != nil { + return errors.Trace(err) + } + w.writer = writer + w.writer.minKey = append([]byte{}, kvs[0].Key...) 
+ } + for _, pair := range kvs { + w.batchSize += int64(len(pair.Key) + len(pair.Val)) + } + w.batchCount += len(kvs) + w.totalCount += int64(len(kvs)) + return w.writer.writeKVs(kvs) +} + +func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error { + l := len(w.writeBatch) + cnt := w.batchCount + for _, pair := range kvs { + w.batchSize += int64(len(pair.Key) + len(pair.Val)) + key := w.kvBuffer.addBytes(pair.Key) + val := w.kvBuffer.addBytes(pair.Val) + if cnt < l { + w.writeBatch[cnt].Key = key + w.writeBatch[cnt].Val = val + } else { + w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val}) + } + cnt++ + } + w.batchCount = cnt + + if w.batchSize > w.memtableSizeLimit { + if err := w.flushKVs(ctx); err != nil { + return err + } + } + w.totalCount += int64(len(kvs)) + return nil +} + +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) func (local *local) EngineFileSizes() (res []backend.EngineFileSize) { local.engines.Range(func(k, v interface{}) bool { engine := v.(*File) @@ -1921,6 +2367,7 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [ return errors.Trace(err) } +<<<<<<< HEAD func (w *Writer) Close() error { w.local.localWriters.Delete(w) close(w.kvsChan) @@ -1939,12 +2386,48 @@ func (w *Writer) Close() error { return w.writeErr.Get() case replyErrCh := <-flushCh: replyErrCh <- nil +======= + if w.writer != nil { + meta, err := w.writer.close() + if err != nil { + return errors.Trace(err) + } + w.writer = nil + w.batchCount = 0 + if meta != nil && meta.totalSize > 0 { + return w.addSST(ctx, meta) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } } } +<<<<<<< HEAD func (w *Writer) genSSTPath() string { return filepath.Join(w.sstDir, uuid.New().String()+".sst") +======= +type flushStatus struct { + local *File + seq int32 +} + +func (f flushStatus) Flushed() bool { + return f.seq <= f.local.finishedMetaSeq.Load() +} + +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + defer w.kvBuffer.destroy() + defer w.local.localWriters.Delete(w) + err := w.flush(ctx) + // FIXME: in theory this line is useless, but In our benchmark with go1.15 + // this can resolve the memory consistently increasing issue. + // maybe this is a bug related to go GC mechanism. + w.writeBatch = nil + return flushStatus{local: w.local, seq: w.lastMetaSeq}, err +} + +func (w *Writer) IsSynced() bool { + return w.batchCount == 0 && w.lastMetaSeq <= w.local.finishedMetaSeq.Load() +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } func (w *Writer) writeRowsLoop() { @@ -2005,13 +2488,20 @@ outside: w.writeErr.Set(err) return } +<<<<<<< HEAD if w.writer != nil { if err := w.writer.ingestInto(w.local, 0); err != nil { w.writeErr.Set(err) } +======= + err = w.addSST(ctx, meta) + if err != nil { + return errors.Trace(err) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } } +<<<<<<< HEAD func (w *Writer) writeKVsOrIngest(desc localIngestDescription) error { if w.writer != nil { if err := w.writer.writeKVs(&w.writeBatch); err != errorUnorderedSSTInsertion { @@ -2021,6 +2511,20 @@ func (w *Writer) writeKVsOrIngest(desc localIngestDescription) error { // if write failed only because of unorderedness, we immediately ingest the memcache. 
immWriter, err := newSSTWriter(w.genSSTPath()) +======= +func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error { + seq, err := w.local.addSST(ctx, meta) + if err != nil { + return err + } + w.lastMetaSeq = seq + return nil +} + +func (w *Writer) createSSTWriter() (*sstWriter, error) { + path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst") + writer, err := newSSTWriter(path) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) if err != nil { return err } @@ -2154,12 +2658,42 @@ type kvMemCache struct { notSorted bool // record "not sorted" instead of "sorted" so that the zero value is correct. } +<<<<<<< HEAD // append more KV pairs to the kvMemCache. func (m *kvMemCache) append(kvs []common.KvPair) { if !m.notSorted { var lastKey []byte if len(m.kvs) > 0 { lastKey = m.kvs[len(m.kvs)-1].Key +======= +func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) { + if len(metas) == 0 { + return nil, errors.New("sst metas is empty") + } else if len(metas) == 1 { + return metas[0], nil + } + + start := time.Now() + newMeta := &sstMeta{ + seq: metas[len(metas)-1].seq, + } + mergeIter := &sstIterHeap{ + iters: make([]*sstIter, 0, len(metas)), + } + + for _, p := range metas { + f, err := os.Open(p.path) + if err != nil { + return nil, errors.Trace(err) + } + reader, err := sstable.NewReader(f, sstable.ReaderOptions{}) + if err != nil { + return nil, errors.Trace(err) + } + iter, err := reader.NewIter(nil, nil) + if err != nil { + return nil, errors.Trace(err) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } for _, kv := range kvs { if bytes.Compare(kv.Key, lastKey) <= 0 { diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index 58ec837b1..020cdb176 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -345,10 +345,17 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) { c.Assert(err, IsNil) err = w.AppendRows(ctx, "", []string{}, 1, kv.MakeRowsFromKvPairs(rows3)) c.Assert(err, IsNil) +<<<<<<< HEAD err = w.Close() c.Assert(err, IsNil) err = db.Flush() c.Assert(err, IsNil) +======= + flushStatus, err := w.Close(context.Background()) + c.Assert(err, IsNil) + c.Assert(f.flushEngineWithoutLock(ctx), IsNil) + c.Assert(flushStatus.Flushed(), IsTrue) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) o := &pebble.IterOptions{} it := db.NewIter(o) @@ -459,6 +466,120 @@ func (s *localSuite) TestIsIngestRetryable(c *C) { c.Assert(err, ErrorMatches, "non-retryable error: unknown error") } +<<<<<<< HEAD +======= +type testIngester struct{} + +func (i testIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) { + if len(metas) == 0 { + return nil, errors.New("sst metas is empty") + } else if len(metas) == 1 { + return metas[0], nil + } + if metas[len(metas)-1].seq-metas[0].seq != int32(len(metas)-1) { + panic("metas is not add in order") + } + + newMeta := &sstMeta{ + seq: metas[len(metas)-1].seq, + } + for _, m := range metas { + newMeta.totalSize += m.totalSize + newMeta.totalCount += m.totalCount + } + return newMeta, nil +} + +func (i testIngester) ingest([]*sstMeta) error { + return nil +} + +func (s *localSuite) TestLocalIngestLoop(c *C) { + dir := c.MkDir() + opt := &pebble.Options{ + MemTableSize: 1024 * 1024, + MaxConcurrentCompactions: 16, + L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction + L0StopWritesThreshold: math.MaxInt32, // set to max try to 
disable compaction + DisableWAL: true, + ReadOnly: false, + } + db, err := pebble.Open(filepath.Join(dir, "test"), opt) + c.Assert(err, IsNil) + defer db.Close() + tmpPath := filepath.Join(dir, "test.sst") + err = os.Mkdir(tmpPath, 0o755) + c.Assert(err, IsNil) + _, engineUUID := backend.MakeUUID("ww", 0) + engineCtx, cancel := context.WithCancel(context.Background()) + f := File{ + db: db, + UUID: engineUUID, + sstDir: "", + ctx: engineCtx, + cancel: cancel, + sstMetasChan: make(chan metaOrFlush, 64), + config: backend.LocalEngineConfig{ + Compact: true, + CompactThreshold: 100, + CompactConcurrency: 4, + }, + } + f.sstIngester = testIngester{} + f.wg.Add(1) + go f.ingestSSTLoop() + + // add some routines to add ssts + var wg sync.WaitGroup + wg.Add(4) + totalSize := int64(0) + concurrency := 4 + count := 500 + var metaSeqLock sync.Mutex + maxMetaSeq := int32(0) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + flushCnt := rand.Int31n(10) + 1 + seq := int32(0) + for i := 0; i < count; i++ { + size := int64(rand.Int31n(50) + 1) + m := &sstMeta{totalSize: size, totalCount: 1} + atomic.AddInt64(&totalSize, size) + metaSeq, err := f.addSST(engineCtx, m) + c.Assert(err, IsNil) + if int32(i) >= flushCnt { + f.mutex.RLock() + err = f.flushEngineWithoutLock(engineCtx) + c.Assert(err, IsNil) + f.mutex.RUnlock() + flushCnt += rand.Int31n(10) + 1 + } + seq = metaSeq + } + metaSeqLock.Lock() + if atomic.LoadInt32(&maxMetaSeq) < seq { + atomic.StoreInt32(&maxMetaSeq, seq) + } + metaSeqLock.Unlock() + }() + } + wg.Wait() + + f.mutex.RLock() + err = f.flushEngineWithoutLock(engineCtx) + c.Assert(err, IsNil) + f.mutex.RUnlock() + + close(f.sstMetasChan) + f.wg.Wait() + c.Assert(f.ingestErr.Get(), IsNil) + c.Assert(totalSize, Equals, f.TotalSize.Load()) + c.Assert(f.Length.Load(), Equals, int64(concurrency*count)) + c.Assert(f.finishedMetaSeq.Load(), Equals, atomic.LoadInt32(&maxMetaSeq)) +} + +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) func (s *localSuite) TestCheckRequirementsTiFlash(c *C) { controller := gomock.NewController(c) defer controller.Finish() diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go index 97940c93a..9debd579b 100644 --- a/pkg/lightning/backend/noop/noop.go +++ b/pkg/lightning/backend/noop/noop.go @@ -155,6 +155,15 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row return nil } +<<<<<<< HEAD func (w noopWriter) Close() error { return nil +======= +func (w noopWriter) IsSynced() bool { + return true +} + +func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) { + return nil, nil +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index 4d07d1a9d..b2917721d 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -594,10 +594,78 @@ type Writer struct { engineUUID uuid.UUID } +<<<<<<< HEAD func (w *Writer) Close() error { return nil +======= +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + return nil, nil +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error { return w.be.WriteRows(ctx, w.engineUUID, tableName, columnNames, arg1, rows) } +<<<<<<< HEAD +======= + +func (w *Writer) IsSynced() bool { + return true +} + +type TableAutoIDInfo struct { + 
Column string + NextID int64 + Type string +} + +func FetchTableAutoIDInfos(ctx context.Context, exec common.QueryExecutor, tableName string) ([]*TableAutoIDInfo, error) { + rows, e := exec.QueryContext(ctx, fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tableName)) + if e != nil { + return nil, errors.Trace(e) + } + var autoIDInfos []*TableAutoIDInfo + for rows.Next() { + var ( + dbName, tblName, columnName, idType string + nextID int64 + ) + columns, err := rows.Columns() + if err != nil { + return nil, errors.Trace(err) + } + + //+--------------+------------+-------------+--------------------+----------------+ + //| DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE | + //+--------------+------------+-------------+--------------------+----------------+ + //| testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT | + //+--------------+------------+-------------+--------------------+----------------+ + + // if columns length is 4, it doesn't contains the last column `ID_TYPE`, and it will always be 'AUTO_INCREMENT' + // for v4.0.0~v4.0.2 show table t next_row_id only returns 4 columns. + if len(columns) == 4 { + err = rows.Scan(&dbName, &tblName, &columnName, &nextID) + idType = "AUTO_INCREMENT" + } else { + err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType) + } + if err != nil { + return nil, errors.Trace(err) + } + autoIDInfos = append(autoIDInfos, &TableAutoIDInfo{ + Column: columnName, + NextID: nextID, + Type: idType, + }) + } + // Defer in for-loop would be costly, anyway, we don't need those rows after this turn of iteration. + //nolint:sqlclosecheck + if err := rows.Close(); err != nil { + return nil, errors.Trace(err) + } + if rows.Err() != nil { + return nil, errors.Trace(rows.Err()) + } + return autoIDInfos, nil +} +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) diff --git a/pkg/lightning/backend/tidb/tidb_test.go b/pkg/lightning/backend/tidb/tidb_test.go index d8f48303d..1c7bc538e 100644 --- a/pkg/lightning/backend/tidb/tidb_test.go +++ b/pkg/lightning/backend/tidb/tidb_test.go @@ -124,8 +124,13 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) c.Assert(err, IsNil) +<<<<<<< HEAD err = writer.Close() +======= + st, err := writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) + c.Assert(st, IsNil) } func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { @@ -157,7 +162,11 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"a"}, dataRows) c.Assert(err, IsNil) +<<<<<<< HEAD err = writer.Close() +======= + _, err = writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) // test encode rows with _tidb_rowid @@ -202,8 +211,13 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { c.Assert(err, IsNil) err = writer.WriteRows(ctx, []string{"a"}, dataRows) c.Assert(err, IsNil) +<<<<<<< HEAD err = writer.Close() +======= + st, err := writer.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) c.Assert(err, IsNil) + c.Assert(st, IsNil) } // TODO: temporarily disable this test before we fix strict mode diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 4cdb05a9b..954d007da 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -1531,6 +1531,23 
@@ func (tr *TableRestore) restoreEngine( return closedEngine, nil } +<<<<<<< HEAD +======= + // if the key are ordered, LocalWrite can optimize the writing. + // table has auto-incremented _tidb_rowid must satisfy following restrictions: + // - clustered index disable and primary key is not number + // - no auto random bits (auto random or shard row id) + // - no partition table + // - no explicit _tidb_rowid field (A this time we can't determine if the source files contain _tidb_rowid field, + // so we will do this check in LocalWriter when the first row is received.) + hasAutoIncrementAutoID := common.TableHasAutoRowID(tr.tableInfo.Core) && + tr.tableInfo.Core.AutoRandomBits == 0 && tr.tableInfo.Core.ShardRowIDBits == 0 && + tr.tableInfo.Core.Partition == nil + dataWriterCfg := &backend.LocalWriterConfig{ + IsKVSorted: hasAutoIncrementAutoID, + } + +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) logTask := tr.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write") dataEngine, err := rc.backend.OpenEngine(ctx, tr.tableName, engineID, rc.ts) @@ -1541,12 +1558,52 @@ func (tr *TableRestore) restoreEngine( var wg sync.WaitGroup var chunkErr common.OnceError + type chunkFlushStatus struct { + dataStatus backend.ChunkFlushStatus + indexStatus backend.ChunkFlushStatus + chunkCp *checkpoints.ChunkCheckpoint + } + + // chunks that are finished writing, but checkpoints are not finished due to flush not finished. + var checkFlushLock sync.Mutex + flushPendingChunks := make([]chunkFlushStatus, 0, 16) + + chunkCpChan := make(chan *checkpoints.ChunkCheckpoint, 16) + go func() { + for { + select { + case cp, ok := <-chunkCpChan: + if !ok { + return + } + saveCheckpoint(rc, tr, engineID, cp) + case <-ctx.Done(): + return + } + } + }() + // Restore table data for chunkIndex, chunk := range cp.Chunks { if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { continue } + checkFlushLock.Lock() + finished := 0 + for _, c := range flushPendingChunks { + if c.indexStatus.Flushed() && c.dataStatus.Flushed() { + chunkCpChan <- c.chunkCp + finished++ + } else { + break + } + } + if finished > 0 { + flushPendingChunks = flushPendingChunks[finished:] + } + checkFlushLock.Unlock() + select { case <-ctx.Done(): return nil, ctx.Err() @@ -1591,15 +1648,36 @@ func (tr *TableRestore) restoreEngine( }() metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt) err := cr.restore(ctx, tr, engineID, dataWriter, indexWriter, rc) + var dataFlushStatus, indexFlushStaus backend.ChunkFlushStatus if err == nil { +<<<<<<< HEAD err = dataWriter.Close() } if err == nil { err = indexWriter.Close() +======= + dataFlushStatus, err = dataWriter.Close(ctx) + } + if err == nil { + indexFlushStaus, err = indexWriter.Close(ctx) +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } if err == nil { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt) metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize())) + if dataFlushStatus != nil && indexFlushStaus != nil { + if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() { + saveCheckpoint(rc, tr, engineID, cr.chunk) + } else { + checkFlushLock.Lock() + flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{ + dataStatus: dataFlushStatus, + indexStatus: indexFlushStaus, + chunkCp: cr.chunk, + }) + checkFlushLock.Unlock() + } + } } else { 
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt) chunkErr.Set(err) @@ -1624,6 +1702,7 @@ func (tr *TableRestore) restoreEngine( zap.Uint64("written", totalKVSize), ) +<<<<<<< HEAD flushAndSaveAllChunks := func() error { if err = indexEngine.Flush(ctx); err != nil { return errors.Trace(err) @@ -1631,7 +1710,21 @@ func (tr *TableRestore) restoreEngine( // Currently we write all the checkpoints after data&index engine are flushed. for _, chunk := range cp.Chunks { saveCheckpoint(rc, tr, engineID, chunk) +======= + trySavePendingChunks := func(flushCtx context.Context) error { + checkFlushLock.Lock() + cnt := 0 + for _, chunk := range flushPendingChunks { + if chunk.dataStatus.Flushed() && chunk.indexStatus.Flushed() { + saveCheckpoint(rc, tr, engineID, chunk.chunkCp) + cnt++ + } else { + break + } +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } + flushPendingChunks = flushPendingChunks[cnt:] + checkFlushLock.Unlock() return nil } @@ -1649,7 +1742,11 @@ func (tr *TableRestore) restoreEngine( log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2)) return nil, errors.Trace(err) } +<<<<<<< HEAD if err2 := flushAndSaveAllChunks(); err2 != nil { +======= + if err2 := trySavePendingChunks(context.Background()); err2 != nil { +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2)) } } @@ -1660,13 +1757,15 @@ func (tr *TableRestore) restoreEngine( // For local backend, if checkpoint is enabled, we must flush index engine to avoid data loss. // this flush action impact up to 10% of the performance, so we only do it if necessary. if err == nil && rc.cfg.Checkpoint.Enable && rc.isLocalBackend() { +<<<<<<< HEAD if err = flushAndSaveAllChunks(); err != nil { +======= + if err = indexEngine.Flush(ctx); err != nil { +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) return nil, errors.Trace(err) } - - // Currently we write all the checkpoints after data&index engine are flushed. - for _, chunk := range cp.Chunks { - saveCheckpoint(rc, tr, engineID, chunk) + if err = trySavePendingChunks(ctx); err != nil { + return nil, errors.Trace(err) } } rc.saveStatusCheckpoint(tr.tableName, engineID, err, checkpoints.CheckpointStatusClosed) @@ -2372,6 +2471,7 @@ func (cr *chunkRestore) deliverLoop( zap.String("task", "deliver"), ) + dataSynced := true for !channelClosed { var dataChecksum, indexChecksum verify.KVChecksum var columns []string @@ -2404,16 +2504,15 @@ func (cr *chunkRestore) deliverLoop( } } - // we are allowed to save checkpoint when the disk quota state moved to "importing" - // since all engines are flushed. - if atomic.LoadInt32(&rc.diskQuotaState) == diskQuotaStateImporting { - saveCheckpoint(rc, t, engineID, cr.chunk) - } - err = func() error { rc.diskQuotaLock.RLock() defer rc.diskQuotaLock.RUnlock() + // try to update chunk checkpoint, this can help save checkpoint after importing when disk-quota is triggered + if !dataSynced { + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) + } + // Write KVs into the engine start := time.Now() @@ -2443,6 +2542,7 @@ func (cr *chunkRestore) deliverLoop( if err != nil { return } + dataSynced = false // Update the table, and save a checkpoint. 
// (the write to the importer is effective immediately, thus update these here) @@ -2452,9 +2552,10 @@ func (cr *chunkRestore) deliverLoop( cr.chunk.Checksum.Add(&indexChecksum) cr.chunk.Chunk.Offset = offset cr.chunk.Chunk.PrevRowIDMax = rowID - if !rc.isLocalBackend() && (dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0) { + + if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 { // No need to save checkpoint if nothing was delivered. - saveCheckpoint(rc, t, engineID, cr.chunk) + dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) } failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") @@ -2475,6 +2576,20 @@ func (cr *chunkRestore) deliverLoop( return } +func (cr *chunkRestore) maybeSaveCheckpoint( + rc *Controller, + t *TableRestore, + engineID int32, + chunk *checkpoints.ChunkCheckpoint, + data, index *backend.LocalEngineWriter, +) bool { + if data.IsSynced() && index.IsSynced() { + saveCheckpoint(rc, t, engineID, chunk) + return true + } + return false +} + func saveCheckpoint(rc *Controller, t *TableRestore, engineID int32, chunk *checkpoints.ChunkCheckpoint) { // We need to update the AllocBase every time we've finished a file. // The AllocBase is determined by the maximum of the "handle" (_tidb_rowid diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index c5584d204..f52ee3695 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -1006,7 +1006,12 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) { mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil).Times(2) mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() mockWriter := mock.NewMockEngineWriter(controller) +<<<<<<< HEAD mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes() +======= + mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes() + mockWriter.EXPECT().IsSynced().Return(true).AnyTimes() +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0) c.Assert(err, IsNil) diff --git a/pkg/mock/backend.go b/pkg/mock/backend.go index e36b1364b..44d140459 100644 --- a/pkg/mock/backend.go +++ b/pkg/mock/backend.go @@ -305,11 +305,20 @@ func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2, arg3, arg4 } // Close mocks base method. +<<<<<<< HEAD func (m *MockEngineWriter) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) return ret0 +======= +func (m *MockEngineWriter) Close(arg0 context.Context) (backend.ChunkFlushStatus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close", arg0) + ret0, _ := ret[0].(backend.ChunkFlushStatus) + ret1, _ := ret[1].(error) + return ret0, ret1 +>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080)) } // Close indicates an expected call of Close. @@ -317,3 +326,17 @@ func (mr *MockEngineWriterMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEngineWriter)(nil).Close)) } + +// IsSynced mocks base method. +func (m *MockEngineWriter) IsSynced() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsSynced") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsSynced indicates an expected call of IsSynced. 
+func (mr *MockEngineWriterMockRecorder) IsSynced() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSynced", reflect.TypeOf((*MockEngineWriter)(nil).IsSynced))
+}
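
For readers following the checkpoint bookkeeping introduced above: the sketch below illustrates, outside the patch itself, how the ChunkFlushStatus interface added to pkg/lightning/backend/backend.go is meant to be consumed by the flushPendingChunks / trySavePendingChunks logic in restoreEngine — a chunk checkpoint is persisted only once both its data writer and index writer report Flushed(). This is a minimal, self-contained Go example, not Lightning code: the status, chunkCheckpoint, and saveCheckpoint names are simplified stand-ins assumed here for illustration.

package main

import "fmt"

// ChunkFlushStatus mirrors the interface added in pkg/lightning/backend/backend.go.
type ChunkFlushStatus interface {
	Flushed() bool
}

// status is a trivial stand-in for the backend's flush-status implementation.
type status bool

func (s status) Flushed() bool { return bool(s) }

// chunkCheckpoint and saveCheckpoint are simplified stand-ins for
// checkpoints.ChunkCheckpoint and the restore-side saveCheckpoint helper.
type chunkCheckpoint struct{ name string }

func saveCheckpoint(cp *chunkCheckpoint) { fmt.Println("checkpoint saved:", cp.name) }

// pendingChunk records a chunk whose writers are closed but whose SSTs may not
// be ingested yet, analogous to the flushPendingChunks entries in restoreEngine.
type pendingChunk struct {
	dataStatus  ChunkFlushStatus
	indexStatus ChunkFlushStatus
	chunkCp     *chunkCheckpoint
}

// trySavePendingChunks saves checkpoints for the leading run of pending chunks
// whose data and index engines have both flushed, and keeps the rest queued.
func trySavePendingChunks(pending []pendingChunk) []pendingChunk {
	cnt := 0
	for _, c := range pending {
		if !c.dataStatus.Flushed() || !c.indexStatus.Flushed() {
			break
		}
		saveCheckpoint(c.chunkCp)
		cnt++
	}
	return pending[cnt:]
}

func main() {
	pending := []pendingChunk{
		{dataStatus: status(true), indexStatus: status(true), chunkCp: &chunkCheckpoint{name: "chunk-0"}},
		{dataStatus: status(true), indexStatus: status(false), chunkCp: &chunkCheckpoint{name: "chunk-1"}},
	}
	// chunk-0 is saved immediately; chunk-1 stays queued until a later flush.
	pending = trySavePendingChunks(pending)
	fmt.Println("still pending:", len(pending))
}

The ordering rule is the point: only a prefix of the pending list is checkpointed, so a chunk checkpoint is never persisted ahead of the ingestion of the SSTs it depends on.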