
lightning: save chunk checkpoint timely #1080

Merged
merged 12 commits on Jun 7, 2021
13 changes: 11 additions & 2 deletions pkg/lightning/backend/backend.go
@@ -375,10 +375,14 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
}

func (w *LocalEngineWriter) Close(ctx context.Context) error {
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

func (w *LocalEngineWriter) IsSynced() bool {
return w.writer.IsSynced()
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
@@ -442,6 +446,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

type ChunkFlushStatus interface {
Flushed() bool
}

type EngineWriter interface {
AppendRows(
ctx context.Context,
@@ -450,5 +458,6 @@ type EngineWriter interface {
commitTS uint64,
rows kv.Rows,
) error
Close(ctx context.Context) error
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
}
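
Note on usage: with this change, callers of LocalEngineWriter.Close get back a ChunkFlushStatus in addition to the error, and IsSynced lets them probe the same state without closing. A minimal sketch of how a caller could gate a chunk checkpoint on that status, assuming pkg/lightning/backend is imported as backend; chunkCheckpoint and saveChunkCheckpoint are hypothetical stand-ins for Lightning's checkpoint plumbing and are not part of this PR:

// Sketch only: chunkCheckpoint and saveChunkCheckpoint are hypothetical.
func closeAndMaybeCheckpoint(ctx context.Context, w *backend.LocalEngineWriter, cp *chunkCheckpoint) error {
	status, err := w.Close(ctx)
	if err != nil {
		return err
	}
	// Only record the chunk as durable once its SSTs are known to be
	// ingested; some backends (importer, noop) return a nil status.
	if status != nil && status.Flushed() {
		return saveChunkCheckpoint(ctx, cp)
	}
	return nil
}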
16 changes: 8 additions & 8 deletions pkg/lightning/backend/backend_test.go
@@ -140,7 +140,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil).AnyTimes()
mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)
@@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
c.Assert(err, IsNil)
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -167,7 +167,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {

s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil)
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil, nil)
s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
Expand All @@ -176,7 +176,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -207,15 +207,15 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
mockWriter.EXPECT().Close(ctx).Return(nil)
mockWriter.EXPECT().Close(ctx).Return(nil, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -233,15 +233,15 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.New("fake recoverable write batch error")).
MinTimes(1)
mockWriter.EXPECT().Close(ctx).Return(nil).MinTimes(1)
mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
err = writer.Close(ctx)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

8 changes: 6 additions & 2 deletions pkg/lightning/backend/importer/importer.go
@@ -335,10 +335,14 @@ type Writer struct {
engineUUID uuid.UUID
}

func (w *Writer) Close(ctx context.Context) error {
return nil
func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
}

func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows)
}

func (w *Writer) IsSynced() bool {
return true
}
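
The importer backend writes rows straight through in AppendRows, so there is never anything pending at Close time: it returns a nil status and IsSynced is always true. A hedged sketch of how a caller might normalize that, assuming the backend package is in scope; this helper is not part of the PR:

// Sketch only: treat a missing status as "nothing pending", since this
// backend has no asynchronous flush to wait for.
func chunkFlushed(st backend.ChunkFlushStatus) bool {
	return st == nil || st.Flushed()
}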
3 changes: 2 additions & 1 deletion pkg/lightning/backend/importer/importer_test.go
@@ -115,8 +115,9 @@ func (s *importerSuite) TestWriteRows(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, IsNil)
err = writer.Close(s.ctx)
st, err := writer.Close(s.ctx)

Member: Could you check the st?

Collaborator (author): checked in L120, though it should be nil for importer backend

c.Assert(err, IsNil)
c.Assert(st, IsNil)
}

func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
90 changes: 70 additions & 20 deletions pkg/lightning/backend/local/local.go
@@ -165,6 +165,13 @@ type File struct {
sstIngester sstIngester
finishedRanges syncedRanges

// sst seq lock
seqLock sync.Mutex
// seq number for incoming sst meta
nextSeq int32
// max seq of sst metas ingested into pebble
finishedMetaSeq atomic.Int32

config backend.LocalEngineConfig

// total size of SST files waiting to be ingested
@@ -326,27 +333,32 @@ func (e *File) unlock() {
e.mutex.Unlock()
}

type intHeap struct {
arr []int32
type metaSeq struct {
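// flushSeq is the seq of the ingest batch (flush) this entry belongs to; the heap orders entries by it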
flushSeq int32
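// metaSeq is the max seq of the sst metas in that batch, published to finishedMetaSeq once all earlier batches are done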
metaSeq int32
}

Member: Could you add some comments about seqs?

type metaSeqHeap struct {
arr []metaSeq
}

func (h *intHeap) Len() int {
func (h *metaSeqHeap) Len() int {
return len(h.arr)
}

func (h *intHeap) Less(i, j int) bool {
return h.arr[i] < h.arr[j]
func (h *metaSeqHeap) Less(i, j int) bool {
return h.arr[i].flushSeq < h.arr[j].flushSeq
}

func (h *intHeap) Swap(i, j int) {
func (h *metaSeqHeap) Swap(i, j int) {
h.arr[i], h.arr[j] = h.arr[j], h.arr[i]
}

func (h *intHeap) Push(x interface{}) {
h.arr = append(h.arr, x.(int32))
func (h *metaSeqHeap) Push(x interface{}) {
h.arr = append(h.arr, x.(metaSeq))
}

func (h *intHeap) Pop() interface{} {
func (h *metaSeqHeap) Pop() interface{} {
item := h.arr[len(h.arr)-1]
h.arr = h.arr[:len(h.arr)-1]
return item
@@ -367,7 +379,7 @@ func (e *File) ingestSSTLoop() {
flushQueue := make([]flushSeq, 0)
// inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1`
// this means there is still at least one compaction task with a lower seq unfinished.
inSyncSeqs := &intHeap{arr: make([]int32, 0)}
inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)}

type metaAndSeq struct {
metas []*sstMeta
@@ -420,9 +432,11 @@
finSeq := finishedSeq.Load()
if metas.seq == finSeq+1 {
finSeq = metas.seq
finMetaSeq := ingestMetas[len(ingestMetas)-1].seq
for len(inSyncSeqs.arr) > 0 {
if inSyncSeqs.arr[0] == finSeq+1 {
if inSyncSeqs.arr[0].flushSeq == finSeq+1 {
finSeq++
finMetaSeq = inSyncSeqs.arr[0].metaSeq
heap.Remove(inSyncSeqs, 0)
} else {
break
@@ -439,12 +453,13 @@
}
flushQueue = flushQueue[len(flushChans):]
finishedSeq.Store(finSeq)
e.finishedMetaSeq.Store(finMetaSeq)
seqLock.Unlock()
for _, c := range flushChans {
c <- struct{}{}
}
} else {
heap.Push(inSyncSeqs, metas.seq)
heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: ingestMetas[len(ingestMetas)-1].seq})
seqLock.Unlock()
}
}
@@ -555,16 +570,22 @@ readMetaLoop:
}
}

func (e *File) addSST(ctx context.Context, m *sstMeta) error {
func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) {
// set pending size after SST file is generated
e.pendingFileSize.Add(m.fileSize)
// make sure sstMeta is sent into the chan in order
e.seqLock.Lock()
defer e.seqLock.Unlock()
e.nextSeq++
seq := e.nextSeq
m.seq = seq
select {
case e.sstMetasChan <- metaOrFlush{meta: m}:
case <-ctx.Done():
return ctx.Err()
return 0, ctx.Err()
case <-e.ctx.Done():
}
return e.ingestErr.Get()
return seq, e.ingestErr.Get()
}

func (e *File) batchIngestSSTs(metas []*sstMeta) error {
@@ -2403,6 +2424,7 @@ type sstMeta struct {
totalCount int64
// used for calculate disk-quota
fileSize int64
seq int32
}

type Writer struct {
@@ -2420,6 +2442,8 @@ type Writer struct {

kvBuffer *bytesBuffer
writer *sstWriter

lastMetaSeq int32
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -2519,23 +2543,37 @@ func (w *Writer) flush(ctx context.Context) error {
return errors.Trace(err)
}
w.writer = nil
w.batchCount = 0
if meta != nil && meta.totalSize > 0 {
return w.local.addSST(ctx, meta)
return w.addSST(ctx, meta)
}
}

return nil
}

func (w *Writer) Close(ctx context.Context) error {
type flushStatus struct {
local *File
seq int32
}

func (f flushStatus) Flushed() bool {
return f.seq <= f.local.finishedMetaSeq.Load()
}

func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
defer w.kvBuffer.destroy()
defer w.local.localWriters.Delete(w)
err := w.flush(ctx)
// FIXME: in theory this line is useless, but in our benchmark with go1.15
// it resolves an issue where memory usage keeps increasing;
// this may be a bug in the Go GC.
w.writeBatch = nil
return err
return flushStatus{local: w.local, seq: w.lastMetaSeq}, err
}

func (w *Writer) IsSynced() bool {
return w.batchCount == 0 && w.lastMetaSeq <= w.local.finishedMetaSeq.Load()

Collaborator: Does it need to be called in a mutex?

}
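
Flushed compares the writer's lastMetaSeq against the engine's finishedMetaSeq, and IsSynced additionally requires the in-memory write batch to be empty (batchCount == 0). A hedged sketch of how a periodic checkpoint pass might use this to avoid forcing an engine flush; pendingChunks, chunk.writer and saveChunkCheckpoint are hypothetical names, not taken from this diff:

// Sketch only: skip chunks whose writers still have data in flight and
// retry them on the next checkpoint tick.
for _, chunk := range pendingChunks {
	if !chunk.writer.IsSynced() {
		continue
	}
	if err := saveChunkCheckpoint(ctx, chunk); err != nil {
		return err
	}
}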

func (w *Writer) flushKVs(ctx context.Context) error {
@@ -2555,7 +2593,7 @@ func (w *Writer) flushKVs(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
err = w.local.addSST(ctx, meta)
err = w.addSST(ctx, meta)
if err != nil {
return errors.Trace(err)
}
@@ -2567,6 +2605,15 @@
return nil
}

func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {
seq, err := w.local.addSST(ctx, meta)
if err != nil {
return err
}
w.lastMetaSeq = seq
return nil
}

func (w *Writer) createSSTWriter() (*sstWriter, error) {
path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst")
writer, err := newSSTWriter(path)
@@ -2731,10 +2778,13 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
}

start := time.Now()
newMeta := &sstMeta{}
newMeta := &sstMeta{
seq: metas[len(metas)-1].seq,
}
mergeIter := &sstIterHeap{
iters: make([]*sstIter, 0, len(metas)),
}

for _, p := range metas {
f, err := os.Open(p.path)
if err != nil {
5 changes: 3 additions & 2 deletions pkg/lightning/backend/local/local_test.go
@@ -389,9 +389,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
c.Assert(err, IsNil)
err = w.AppendRows(ctx, "", []string{}, 1, kv.MakeRowsFromKvPairs(rows3))
c.Assert(err, IsNil)
err = w.Close(context.Background())
flushStatus, err := w.Close(context.Background())
c.Assert(err, IsNil)
c.Assert(f.flushEngineWithoutLock(ctx), IsNil)
c.Assert(flushStatus.Flushed(), IsTrue)
o := &pebble.IterOptions{}
it := db.NewIter(o)

@@ -574,7 +575,7 @@ func (s *localSuite) TestLocalIngestLoop(c *C) {
size := int64(rand.Int31n(50) + 1)
m := &sstMeta{totalSize: size, totalCount: 1}
atomic.AddInt64(&totalSize, size)
err := f.addSST(engineCtx, m)
_, err := f.addSST(engineCtx, m)
c.Assert(err, IsNil)
if int32(i) >= flushCnt {
f.mutex.RLock()
8 changes: 6 additions & 2 deletions pkg/lightning/backend/noop/noop.go
@@ -163,6 +163,10 @@ func (w noopWriter) AppendRows(context.Context, string, []string, uint64, kv.Row
return nil
}

func (w noopWriter) Close(context.Context) error {
return nil
func (w noopWriter) IsSynced() bool {
return true
}

func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
}