Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: save chunk checkpoint timely (#1080) #1181

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,17 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
}

<<<<<<< HEAD
func (w *LocalEngineWriter) Close() error {
return w.writer.Close()
=======
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
}

func (w *LocalEngineWriter) IsSynced() bool {
return w.writer.IsSynced()
}

// UnsafeCloseEngine closes the engine without first opening it.
Expand Down Expand Up @@ -420,6 +429,10 @@ func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

type ChunkFlushStatus interface {
Flushed() bool
}

type EngineWriter interface {
AppendRows(
ctx context.Context,
Expand All @@ -428,5 +441,10 @@ type EngineWriter interface {
commitTS uint64,
rows kv.Rows,
) error
<<<<<<< HEAD
Close() error
=======
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
}
33 changes: 33 additions & 0 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (s *backendSuite) TestWriteEngine(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
Return(nil)
<<<<<<< HEAD
=======
mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes()
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)
Expand All @@ -151,7 +155,11 @@ func (s *backendSuite) TestWriteEngine(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
c.Assert(err, IsNil)
<<<<<<< HEAD
err = writer.Close()
=======
_, err = writer.Close(ctx)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
c.Assert(err, IsNil)
}

Expand All @@ -165,16 +173,25 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {

s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil)
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
<<<<<<< HEAD
mockWriter.EXPECT().Close().Return(nil)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil)
=======
mockWriter.EXPECT().Close(ctx).Return(nil, nil)
s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx)
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
<<<<<<< HEAD
err = writer.Close()
=======
_, err = writer.Close(ctx)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -204,15 +221,23 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
mockWriter.EXPECT().
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
<<<<<<< HEAD
mockWriter.EXPECT().Close().Return(nil)
=======
mockWriter.EXPECT().Close(ctx).Return(nil, nil)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx)
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
<<<<<<< HEAD
err = writer.Close()
=======
_, err = writer.Close(ctx)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
c.Assert(err, IsNil)
}

Expand All @@ -229,15 +254,23 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.New("fake recoverable write batch error")).
MinTimes(1)
<<<<<<< HEAD
mockWriter.EXPECT().Close().Return(nil).MinTimes(1)
=======
mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
writer, err := engine.LocalWriter(ctx)
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
<<<<<<< HEAD
err = writer.Close()
=======
_, err = writer.Close(ctx)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
c.Assert(err, IsNil)
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/lightning/backend/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,19 @@ type Writer struct {
engineUUID uuid.UUID
}

<<<<<<< HEAD
func (w *Writer) Close() error {
return nil
=======
func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
}

func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows kv.Rows) error {
return w.importer.WriteRows(ctx, w.engineUUID, tableName, columnNames, ts, rows)
}

func (w *Writer) IsSynced() bool {
return true
}
5 changes: 5 additions & 0 deletions pkg/lightning/backend/importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,13 @@ func (s *importerSuite) TestWriteRows(c *C) {
c.Assert(err, IsNil)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, IsNil)
<<<<<<< HEAD
err = writer.Close()
=======
st, err := writer.Close(s.ctx)
>>>>>>> 37433a1b (lightning: save chunk checkpoint timely (#1080))
c.Assert(err, IsNil)
c.Assert(st, IsNil)
}

func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
Expand Down
Loading