save chunk checkpoint timely
glorv committed Apr 30, 2021
1 parent b1ed365 commit a8646f6
Showing 9 changed files with 131 additions and 45 deletions.
8 changes: 6 additions & 2 deletions pkg/lightning/backend/backend.go
@@ -375,7 +375,7 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
}

func (w *LocalEngineWriter) Close(ctx context.Context) error {
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

@@ -442,6 +442,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

type ChunkFlushStatus interface {
Flushed() bool
}

type EngineWriter interface {
AppendRows(
ctx context.Context,
@@ -450,5 +454,5 @@ type EngineWriter interface {
commitTS uint64,
rows kv.Rows,
) error
Close(ctx context.Context) error
Close(ctx context.Context) (ChunkFlushStatus, error)
}
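
A minimal caller-side sketch of how the new return value can be used (hypothetical code, not part of this commit; the helper name, the saveCheckpoint callback, and the br import path are illustrative assumptions): Close now reports whether the chunk's data has already been ingested, so a chunk checkpoint can be recorded right away instead of waiting for the next full engine flush.

// Hypothetical helper showing the intended use of the new Close signature.
package example

import (
	"context"

	"github.com/pingcap/br/pkg/lightning/backend" // assumed import path
)

// closeAndMaybeCheckpoint closes the chunk writer and records the chunk
// checkpoint immediately if its data has already been flushed into the engine.
func closeAndMaybeCheckpoint(ctx context.Context, w *backend.LocalEngineWriter, saveCheckpoint func()) error {
	status, err := w.Close(ctx)
	if err != nil {
		return err
	}
	// Backends with nothing to flush (importer, tidb, noop) return a nil status.
	if status != nil && status.Flushed() {
		saveCheckpoint() // the chunk's rows are already ingested; safe to checkpoint now
	}
	return nil
}
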
16 changes: 8 additions & 8 deletions pkg/lightning/backend/backend_test.go
@@ -140,7 +140,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil).AnyTimes()
mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)
@@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
c.Assert(err, IsNil)
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -167,7 +167,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {

s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil)
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil, nil)
s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
@@ -176,7 +176,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -207,15 +207,15 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
mockWriter.EXPECT().Close(ctx).Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -233,15 +233,15 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.New("fake recoverable write batch error")).
MinTimes(1)
mockWriter.EXPECT().Close(ctx).Return(nil).MinTimes(1)
mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

4 changes: 2 additions & 2 deletions pkg/lightning/backend/importer/importer.go
@@ -335,8 +335,8 @@ type Writer struct {
engineUUID uuid.UUID
}

func (w *Writer) Close(ctx context.Context) error {
return nil
func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
}

func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
84 changes: 64 additions & 20 deletions pkg/lightning/backend/local/local.go
@@ -165,6 +165,13 @@ type File struct {
sstIngester sstIngester
finishedRanges syncedRanges

// sst seq lock
seqLock sync.Mutex
// seq number for incoming sst meta
nextSeq atomic.Int32
// max seq of sst metas ingested into pebble
finishedMetaSeq atomic.Int32

config backend.LocalEngineConfig

// total size of SST files waiting to be ingested
@@ -326,27 +333,32 @@ func (e *File) unlock() {
e.mutex.Unlock()
}

type intHeap struct {
arr []int32
type metaSeq struct {
flushSeq int32
metaSeq int32
}

type metaSeqHeap struct {
arr []metaSeq
}

func (h *intHeap) Len() int {
func (h *metaSeqHeap) Len() int {
return len(h.arr)
}

func (h *intHeap) Less(i, j int) bool {
return h.arr[i] < h.arr[j]
func (h *metaSeqHeap) Less(i, j int) bool {
return h.arr[i].flushSeq < h.arr[j].flushSeq
}

func (h *intHeap) Swap(i, j int) {
func (h *metaSeqHeap) Swap(i, j int) {
h.arr[i], h.arr[j] = h.arr[j], h.arr[i]
}

func (h *intHeap) Push(x interface{}) {
h.arr = append(h.arr, x.(int32))
func (h *metaSeqHeap) Push(x interface{}) {
h.arr = append(h.arr, x.(metaSeq))
}

func (h *intHeap) Pop() interface{} {
func (h *metaSeqHeap) Pop() interface{} {
item := h.arr[len(h.arr)-1]
h.arr = h.arr[:len(h.arr)-1]
return item
@@ -367,7 +379,7 @@ func (e *File) ingestSSTLoop() {
flushQueue := make([]flushSeq, 0)
// inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1`
// this means there is still at least one compaction task with a lower seq unfinished.
inSyncSeqs := &intHeap{arr: make([]int32, 0)}
inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)}

type metaAndSeq struct {
metas []*sstMeta
@@ -420,9 +432,11 @@ func (e *File) ingestSSTLoop() {
finSeq := finishedSeq.Load()
if metas.seq == finSeq+1 {
finSeq = metas.seq
finMetaSeq := ingestMetas[len(ingestMetas)-1].seq
for len(inSyncSeqs.arr) > 0 {
if inSyncSeqs.arr[0] == finSeq+1 {
if inSyncSeqs.arr[0].flushSeq == finSeq+1 {
finSeq++
finMetaSeq = inSyncSeqs.arr[0].metaSeq
heap.Remove(inSyncSeqs, 0)
} else {
break
@@ -439,12 +453,13 @@
}
flushQueue = flushQueue[len(flushChans):]
finishedSeq.Store(finSeq)
e.finishedMetaSeq.Store(finMetaSeq)
seqLock.Unlock()
for _, c := range flushChans {
c <- struct{}{}
}
} else {
heap.Push(inSyncSeqs, metas.seq)
heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: ingestMetas[len(ingestMetas)-1].seq})
seqLock.Unlock()
}
}
@@ -555,16 +570,21 @@ readMetaLoop:
}
}

func (e *File) addSST(ctx context.Context, m *sstMeta) error {
func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) {
// set pending size after SST file is generated
e.pendingFileSize.Add(m.fileSize)
// make sure sstMeta is sent into the chan in order
e.seqLock.Lock()
defer e.seqLock.Unlock()
seq := e.nextSeq.Add(1)
m.seq = seq
select {
case e.sstMetasChan <- metaOrFlush{meta: m}:
case <-ctx.Done():
return ctx.Err()
return 0, ctx.Err()
case <-e.ctx.Done():
}
return e.ingestErr.Get()
return seq, e.ingestErr.Get()
}

func (e *File) batchIngestSSTs(metas []*sstMeta) error {
@@ -2403,6 +2423,7 @@ type sstMeta struct {
totalCount int64
// used for calculating disk-quota
fileSize int64
seq int32
}

type Writer struct {
Expand All @@ -2420,6 +2441,8 @@ type Writer struct {

kvBuffer *bytesBuffer
writer *sstWriter

lastMetaSeq int32
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -2520,22 +2543,31 @@ func (w *Writer) flush(ctx context.Context) error {
}
w.writer = nil
if meta != nil && meta.totalSize > 0 {
return w.local.addSST(ctx, meta)
return w.addSST(ctx, meta)
}
}

return nil
}

func (w *Writer) Close(ctx context.Context) error {
type flushStatus struct {
local *File
seq int32
}

func (f flushStatus) Flushed() bool {
return f.seq <= f.local.finishedMetaSeq.Load()
}

func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
defer w.kvBuffer.destroy()
defer w.local.localWriters.Delete(w)
err := w.flush(ctx)
// FIXME: in theory this line is useless, but in our benchmarks with go1.15
// it resolves an issue where memory usage keeps increasing.
// This may be a bug related to the Go GC mechanism.
w.writeBatch = nil
return err
return flushStatus{local: w.local, seq: w.lastMetaSeq}, err
}

func (w *Writer) flushKVs(ctx context.Context) error {
@@ -2555,7 +2587,7 @@ func (w *Writer) flushKVs(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
err = w.local.addSST(ctx, meta)
err = w.addSST(ctx, meta)
if err != nil {
return errors.Trace(err)
}
@@ -2567,6 +2599,15 @@
return nil
}

func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {
seq, err := w.local.addSST(ctx, meta)
if err != nil {
return err
}
w.lastMetaSeq = seq
return nil
}

func (w *Writer) createSSTWriter() (*sstWriter, error) {
path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst")
writer, err := newSSTWriter(path)
@@ -2731,10 +2772,13 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
}

start := time.Now()
newMeta := &sstMeta{}
newMeta := &sstMeta{
seq: metas[len(metas)-1].seq,
}
mergeIter := &sstIterHeap{
iters: make([]*sstIter, 0, len(metas)),
}

for _, p := range metas {
f, err := os.Open(p.path)
if err != nil {
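
The sequencing above can be condensed into a small, self-contained sketch (simplified and illustrative, not the actual Lightning code, though the type names mirror the diff): each flush gets an increasing flushSeq, completions may arrive out of order, and finishedMetaSeq only advances once every lower flushSeq has completed, which is exactly what metaSeqHeap tracks.

// Simplified illustration of the out-of-order completion tracking used by
// ingestSSTLoop: a min-heap keyed on flushSeq holds completions that arrived
// early, and the finished sequence only advances contiguously.
package main

import (
	"container/heap"
	"fmt"
)

type metaSeq struct {
	flushSeq int32 // order in which the flush was submitted
	metaSeq  int32 // seq of the last sstMeta covered by that flush
}

type metaSeqHeap struct{ arr []metaSeq }

func (h *metaSeqHeap) Len() int           { return len(h.arr) }
func (h *metaSeqHeap) Less(i, j int) bool { return h.arr[i].flushSeq < h.arr[j].flushSeq }
func (h *metaSeqHeap) Swap(i, j int)      { h.arr[i], h.arr[j] = h.arr[j], h.arr[i] }
func (h *metaSeqHeap) Push(x interface{}) { h.arr = append(h.arr, x.(metaSeq)) }
func (h *metaSeqHeap) Pop() interface{} {
	item := h.arr[len(h.arr)-1]
	h.arr = h.arr[:len(h.arr)-1]
	return item
}

func main() {
	pending := &metaSeqHeap{}
	finishedFlushSeq, finishedMetaSeq := int32(0), int32(0)

	// Flushes 1..4 were submitted in order but complete as 2, 4, 1, 3.
	for _, c := range []metaSeq{{2, 20}, {4, 40}, {1, 10}, {3, 30}} {
		if c.flushSeq == finishedFlushSeq+1 {
			finishedFlushSeq, finishedMetaSeq = c.flushSeq, c.metaSeq
			// Drain queued completions that are now contiguous.
			for pending.Len() > 0 && pending.arr[0].flushSeq == finishedFlushSeq+1 {
				next := heap.Pop(pending).(metaSeq)
				finishedFlushSeq, finishedMetaSeq = next.flushSeq, next.metaSeq
			}
		} else {
			// Arrived early: park it until the gap is filled.
			heap.Push(pending, c)
		}
		fmt.Printf("flush %d done -> finishedMetaSeq=%d\n", c.flushSeq, finishedMetaSeq)
	}
	// A writer whose lastMetaSeq <= finishedMetaSeq would report Flushed() == true.
}
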
5 changes: 3 additions & 2 deletions pkg/lightning/backend/local/local_test.go
@@ -389,9 +389,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
c.Assert(err, IsNil)
err = w.AppendRows(ctx, "", []string{}, 1, kv.MakeRowsFromKvPairs(rows3))
c.Assert(err, IsNil)
err = w.Close(context.Background())
flushStatus, err := w.Close(context.Background())
c.Assert(err, IsNil)
c.Assert(f.flushEngineWithoutLock(ctx), IsNil)
c.Assert(flushStatus.Flushed(), IsTrue)
o := &pebble.IterOptions{}
it := db.NewIter(o)

@@ -574,7 +575,7 @@ func (s *localSuite) TestLocalIngestLoop(c *C) {
size := int64(rand.Int31n(50) + 1)
m := &sstMeta{totalSize: size, totalCount: 1}
atomic.AddInt64(&totalSize, size)
err := f.addSST(engineCtx, m)
_, err := f.addSST(engineCtx, m)
c.Assert(err, IsNil)
if int32(i) >= flushCnt {
f.mutex.RLock()
4 changes: 2 additions & 2 deletions pkg/lightning/backend/noop/noop.go
@@ -163,6 +163,6 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row
return nil
}

func (w noopWriter) Close(context.Context) error {
return nil
func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
}
4 changes: 2 additions & 2 deletions pkg/lightning/backend/tidb/tidb.go
@@ -596,8 +596,8 @@ type Writer struct {
engineUUID uuid.UUID
}

func (w *Writer) Close(ctx context.Context) error {
return nil
func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
}

func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error {