This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit 37433a1: lightning: save chunk checkpoint timely (#1080)

glorv authored Jun 7, 2021
1 parent 05beea0 commit 37433a1
Showing 12 changed files with 257 additions and 71 deletions.
13 changes: 11 additions & 2 deletions pkg/lightning/backend/backend.go
@@ -375,10 +375,14 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
}

-func (w *LocalEngineWriter) Close(ctx context.Context) error {
+func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

+func (w *LocalEngineWriter) IsSynced() bool {
+return w.writer.IsSynced()
+}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
@@ -442,6 +446,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

+type ChunkFlushStatus interface {
+Flushed() bool
+}

type EngineWriter interface {
AppendRows(
ctx context.Context,
@@ -450,5 +458,6 @@ type EngineWriter interface {
commitTS uint64,
rows kv.Rows,
) error
-Close(ctx context.Context) error
+IsSynced() bool
+Close(ctx context.Context) (ChunkFlushStatus, error)
}
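
To see how the reshaped interface is meant to be consumed, here is a minimal caller sketch. It is not part of the commit: closeAndMaybeCheckpoint and saveChunkCheckpoint are hypothetical names, and the module import path is assumed; only LocalEngineWriter, ChunkFlushStatus, Close, and IsSynced come from the diff above.

package lightningexample // illustrative, non-authoritative sketch

import (
	"context"

	"github.com/pingcap/br/pkg/lightning/backend" // assumed import path
)

// closeAndMaybeCheckpoint closes a writer and persists the chunk checkpoint
// only if the returned status says the chunk's data is already flushed.
func closeAndMaybeCheckpoint(ctx context.Context, w *backend.LocalEngineWriter, saveChunkCheckpoint func() error) error {
	status, err := w.Close(ctx)
	if err != nil {
		return err
	}
	// Backends with nothing to flush may return a nil status (the importer
	// backend below returns nil, nil), so treat nil as already flushed.
	if status == nil || status.Flushed() {
		return saveChunkCheckpoint()
	}
	return nil // not flushed yet; save this chunk's checkpoint in a later round
}
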
16 changes: 8 additions & 8 deletions pkg/lightning/backend/backend_test.go
@@ -140,7 +140,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
Return(nil)
-mockWriter.EXPECT().Close(ctx).Return(nil).AnyTimes()
+mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)
@@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
c.Assert(err, IsNil)
-err = writer.Close(ctx)
+_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -167,7 +167,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {

s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil)
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
-mockWriter.EXPECT().Close(ctx).Return(nil)
+mockWriter.EXPECT().Close(ctx).Return(nil, nil)
s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
@@ -176,7 +176,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
-err = writer.Close(ctx)
+_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -207,15 +207,15 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
-mockWriter.EXPECT().Close(ctx).Return(nil)
+mockWriter.EXPECT().Close(ctx).Return(nil, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
-err = writer.Close(ctx)
+_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

@@ -233,15 +233,15 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.New("fake recoverable write batch error")).
MinTimes(1)
-mockWriter.EXPECT().Close(ctx).Return(nil).MinTimes(1)
+mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
-err = writer.Close(ctx)
+_, err = writer.Close(ctx)
c.Assert(err, IsNil)
}

8 changes: 6 additions & 2 deletions pkg/lightning/backend/importer/importer.go
@@ -335,10 +335,14 @@ type Writer struct {
engineUUID uuid.UUID
}

-func (w *Writer) Close(ctx context.Context) error {
-return nil
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
+return nil, nil
}

func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows)
}

+func (w *Writer) IsSynced() bool {
+return true
+}
3 changes: 2 additions & 1 deletion pkg/lightning/backend/importer/importer_test.go
@@ -115,8 +115,9 @@ func (s *importerSuite) TestWriteRows(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, IsNil)
-err = writer.Close(s.ctx)
+st, err := writer.Close(s.ctx)
c.Assert(err, IsNil)
+c.Assert(st, IsNil)
}

func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
97 changes: 77 additions & 20 deletions pkg/lightning/backend/local/local.go
@@ -171,6 +171,13 @@ type File struct {
sstIngester sstIngester
finishedRanges syncedRanges

+// sst seq lock
+seqLock sync.Mutex
+// seq number for incoming sst meta
+nextSeq int32
+// max seq of sst metas ingested into pebble
+finishedMetaSeq atomic.Int32
+
config backend.LocalEngineConfig

// total size of SST files waiting to be ingested
@@ -332,27 +339,36 @@ func (e *File) unlock() {
e.mutex.Unlock()
}

-type intHeap struct {
-arr []int32
+type metaSeq struct {
+// the sequence for this flush message; a flush call can return only when
+// all the other flushes with a lower `flushSeq` are done
+flushSeq int32
+// the max sstMeta sequence number in this flush; after the flush is done (all SSTs are ingested),
+// we can safely save chunks with a lower meta sequence number.
+metaSeq int32
}

+type metaSeqHeap struct {
+arr []metaSeq
+}

-func (h *intHeap) Len() int {
+func (h *metaSeqHeap) Len() int {
return len(h.arr)
}

-func (h *intHeap) Less(i, j int) bool {
-return h.arr[i] < h.arr[j]
+func (h *metaSeqHeap) Less(i, j int) bool {
+return h.arr[i].flushSeq < h.arr[j].flushSeq
}

-func (h *intHeap) Swap(i, j int) {
+func (h *metaSeqHeap) Swap(i, j int) {
h.arr[i], h.arr[j] = h.arr[j], h.arr[i]
}

-func (h *intHeap) Push(x interface{}) {
-h.arr = append(h.arr, x.(int32))
+func (h *metaSeqHeap) Push(x interface{}) {
+h.arr = append(h.arr, x.(metaSeq))
}

-func (h *intHeap) Pop() interface{} {
+func (h *metaSeqHeap) Pop() interface{} {
item := h.arr[len(h.arr)-1]
h.arr = h.arr[:len(h.arr)-1]
return item
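
Before the ingest loop below relies on it, the heap's contract is worth seeing in isolation. A minimal, self-contained sketch (not part of the commit; the types are inlined copies of the definitions above so it runs standalone): flushes pushed out of order pop back in ascending flushSeq order, which is exactly the property ingestSSTLoop needs.

package main

import (
	"container/heap"
	"fmt"
)

// Inlined copies of the commit's types so this sketch compiles on its own.
type metaSeq struct {
	flushSeq int32
	metaSeq  int32
}

type metaSeqHeap struct{ arr []metaSeq }

func (h *metaSeqHeap) Len() int           { return len(h.arr) }
func (h *metaSeqHeap) Less(i, j int) bool { return h.arr[i].flushSeq < h.arr[j].flushSeq }
func (h *metaSeqHeap) Swap(i, j int)      { h.arr[i], h.arr[j] = h.arr[j], h.arr[i] }
func (h *metaSeqHeap) Push(x interface{}) { h.arr = append(h.arr, x.(metaSeq)) }
func (h *metaSeqHeap) Pop() interface{} {
	item := h.arr[len(h.arr)-1]
	h.arr = h.arr[:len(h.arr)-1]
	return item
}

func main() {
	h := &metaSeqHeap{}
	// Push flush sequences out of order, as concurrent ingests would.
	heap.Push(h, metaSeq{flushSeq: 3, metaSeq: 9})
	heap.Push(h, metaSeq{flushSeq: 1, metaSeq: 4})
	heap.Push(h, metaSeq{flushSeq: 2, metaSeq: 7})
	for h.Len() > 0 {
		m := heap.Pop(h).(metaSeq)
		fmt.Println(m.flushSeq, m.metaSeq) // 1 4, then 2 7, then 3 9
	}
}
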
@@ -373,7 +389,7 @@ func (e *File) ingestSSTLoop() {
flushQueue := make([]flushSeq, 0)
// inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1`
// this means there is still at least one compaction task with a lower seq unfinished.
-inSyncSeqs := &intHeap{arr: make([]int32, 0)}
+inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)}

type metaAndSeq struct {
metas []*sstMeta
@@ -417,6 +433,8 @@ }
}
ingestMetas = []*sstMeta{newMeta}
}
+// batchIngestSSTs will change ingestMetas' order, so we record the max seq here
+metasMaxSeq := ingestMetas[len(ingestMetas)-1].seq

if err := e.batchIngestSSTs(ingestMetas); err != nil {
e.setError(err)
@@ -426,9 +444,11 @@
finSeq := finishedSeq.Load()
if metas.seq == finSeq+1 {
finSeq = metas.seq
+finMetaSeq := metasMaxSeq
for len(inSyncSeqs.arr) > 0 {
-if inSyncSeqs.arr[0] == finSeq+1 {
+if inSyncSeqs.arr[0].flushSeq == finSeq+1 {
finSeq++
+finMetaSeq = inSyncSeqs.arr[0].metaSeq
heap.Remove(inSyncSeqs, 0)
} else {
break
@@ -445,12 +465,13 @@
}
flushQueue = flushQueue[len(flushChans):]
finishedSeq.Store(finSeq)
+e.finishedMetaSeq.Store(finMetaSeq)
seqLock.Unlock()
for _, c := range flushChans {
c <- struct{}{}
}
} else {
-heap.Push(inSyncSeqs, metas.seq)
+heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: metasMaxSeq})
seqLock.Unlock()
}
}
@@ -561,16 +582,22 @@ readMetaLoop:
}
}

-func (e *File) addSST(ctx context.Context, m *sstMeta) error {
+func (e *File) addSST(ctx context.Context, m *sstMeta) (int32, error) {
// set pending size after SST file is generated
e.pendingFileSize.Add(m.fileSize)
+// make sure sstMeta is sent into the chan in order
+e.seqLock.Lock()
+defer e.seqLock.Unlock()
+e.nextSeq++
+seq := e.nextSeq
+m.seq = seq
select {
case e.sstMetasChan <- metaOrFlush{meta: m}:
case <-ctx.Done():
-return ctx.Err()
+return 0, ctx.Err()
case <-e.ctx.Done():
}
-return e.ingestErr.Get()
+return seq, e.ingestErr.Get()
}
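
The locking above is easy to get wrong, so here is a distilled sketch of why the mutex must be held across the channel send, not just the counter increment. This is simplified from addSST (same package, error handling and ctx cancellation omitted); sendOrdered is a hypothetical name: if the lock were released between assigning the sequence and sending, a concurrent flusher could deliver a higher seq first and break the ordered-ingest assumption.

// Simplified from addSST: the mutex covers both the seq assignment and the
// channel send, so metas arrive on sstMetasChan in strictly increasing seq order.
func (e *File) sendOrdered(m *sstMeta) {
	e.seqLock.Lock()
	defer e.seqLock.Unlock() // unlocking before the send would allow reordering
	e.nextSeq++
	m.seq = e.nextSeq
	e.sstMetasChan <- metaOrFlush{meta: m}
}
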

func (e *File) batchIngestSSTs(metas []*sstMeta) error {
@@ -623,6 +650,7 @@ func (e *File) ingestSSTs(metas []*sstMeta) error {
log.L().Info("write data to local DB",
zap.Int64("size", totalSize),
zap.Int64("kvs", totalCount),
zap.Int("files", len(metas)),
zap.Int64("sstFileSize", fileSize),
zap.String("file", metas[0].path),
logutil.Key("firstKey", metas[0].minKey),
@@ -2490,6 +2518,7 @@ type sstMeta struct {
totalCount int64
// used for calculating disk-quota
fileSize int64
+seq int32
}

type Writer struct {
@@ -2507,6 +2536,8 @@ type Writer struct {

kvBuffer *bytesBuffer
writer *sstWriter

+lastMetaSeq int32
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -2610,23 +2641,37 @@ func (w *Writer) flush(ctx context.Context) error {
return errors.Trace(err)
}
w.writer = nil
+w.batchCount = 0
if meta != nil && meta.totalSize > 0 {
-return w.local.addSST(ctx, meta)
+return w.addSST(ctx, meta)
}
}

return nil
}

-func (w *Writer) Close(ctx context.Context) error {
+type flushStatus struct {
+local *File
+seq int32
+}
+
+func (f flushStatus) Flushed() bool {
+return f.seq <= f.local.finishedMetaSeq.Load()
+}
+
+func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
defer w.kvBuffer.destroy()
defer w.local.localWriters.Delete(w)
err := w.flush(ctx)
// FIXME: in theory this line is useless, but in our benchmark with go1.15
// it resolved an issue where memory usage kept increasing.
// Maybe this is a bug related to the Go GC mechanism.
w.writeBatch = nil
-return err
+return flushStatus{local: w.local, seq: w.lastMetaSeq}, err
}

+func (w *Writer) IsSynced() bool {
+return w.batchCount == 0 && w.lastMetaSeq <= w.local.finishedMetaSeq.Load()
+}
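
Taken together, flushStatus.Flushed and Writer.IsSynced let checkpoint code tell when a chunk's data is durably ingested: both report true once the writer's lastMetaSeq falls at or below finishedMetaSeq. A hedged sketch of one possible consumer follows (waitUntilFlushed, saveChunkCheckpoint, the polling interval, and the import path are all assumptions; the rest of this commit, in files not shown here, presumably wires these statuses into Lightning's checkpoint saving differently).

package lightningexample // illustrative, non-authoritative sketch

import (
	"context"
	"time"

	"github.com/pingcap/br/pkg/lightning/backend" // assumed import path
)

// waitUntilFlushed polls a ChunkFlushStatus and persists the chunk checkpoint
// once the writer's SSTs are fully ingested. Polling is only for illustration;
// real code could instead check Flushed() right before persisting a checkpoint.
func waitUntilFlushed(ctx context.Context, st backend.ChunkFlushStatus, saveChunkCheckpoint func() error) error {
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()
	for !st.Flushed() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
		}
	}
	return saveChunkCheckpoint()
}
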

func (w *Writer) flushKVs(ctx context.Context) error {
@@ -2646,7 +2691,7 @@
if err != nil {
return errors.Trace(err)
}
-err = w.local.addSST(ctx, meta)
+err = w.addSST(ctx, meta)
if err != nil {
return errors.Trace(err)
}
@@ -2658,6 +2703,15 @@
return nil
}

+func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {
+seq, err := w.local.addSST(ctx, meta)
+if err != nil {
+return err
+}
+w.lastMetaSeq = seq
+return nil
+}

func (w *Writer) createSSTWriter() (*sstWriter, error) {
path := filepath.Join(w.local.sstDir, uuid.New().String()+".sst")
writer, err := newSSTWriter(path)
@@ -2822,10 +2876,13 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
}

start := time.Now()
-newMeta := &sstMeta{}
+newMeta := &sstMeta{
+seq: metas[len(metas)-1].seq,
+}
mergeIter := &sstIterHeap{
iters: make([]*sstIter, 0, len(metas)),
}

for _, p := range metas {
f, err := os.Open(p.path)
if err != nil {