Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redolog: add a precleanup process when s3 enable (#3525) #3878

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ func (w *Writer) shouldRemoved(checkPointTs uint64, f os.FileInfo) (bool, error)
func (w *Writer) getShouldRemovedFiles(checkPointTs uint64) ([]os.FileInfo, error) {
files, err := ioutil.ReadDir(w.cfg.Dir)
if err != nil {
if os.IsNotExist(err) {
log.Warn("check removed log dir fail", zap.Error(err))
return []os.FileInfo{}, nil
}
return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't read log file directory: %s", w.cfg.Dir))
}

Expand Down
11 changes: 10 additions & 1 deletion cdc/redo/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,17 @@ func TestWriterGC(t *testing.T) {
require.Nil(t, err, files[0].Name())
require.EqualValues(t, 3, ts)
require.Equal(t, common.DefaultRowLogFileType, fileType)

time.Sleep(time.Duration(100) * time.Millisecond)

w1 := &Writer{
cfg: cfg,
uint64buf: make([]byte, 8),
storage: mockStorage,
}
w1.cfg.Dir += "not-exist"
w1.running.Store(true)
err = w1.GC(111)
require.Nil(t, err)
}

func TestAdvanceTs(t *testing.T) {
Expand Down
53 changes: 52 additions & 1 deletion cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,52 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error)
if err != nil {
return nil, err
}
} else {
if cfg.S3Storage {
// since other process get the remove changefeed job async, may still write some logs after owner delete the log
err = logWriter.preCleanUpS3(ctx)
if err != nil {
return nil, err
}
}
}
logWriters[cfg.ChangeFeedID] = logWriter
go logWriter.runGC(ctx)
return logWriter, nil
}

func (l *LogWriter) preCleanUpS3(ctx context.Context) error {
ret, err := l.storage.FileExists(ctx, l.getDeletedChangefeedMarker())
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}
if !ret {
return nil
}

files, err := getAllFilesInS3(ctx, l)
if err != nil {
return err
}

ff := []string{}
for _, file := range files {
if file != l.getDeletedChangefeedMarker() {
ff = append(ff, file)
}
}
err = l.deleteFilesInS3(ctx, ff)
if err != nil {
return err
}
err = l.storage.DeleteFile(ctx, l.getDeletedChangefeedMarker())
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}

return nil
}

func (l *LogWriter) initMeta(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -433,7 +473,18 @@ func (l *LogWriter) DeleteAllLogs(ctx context.Context) error {
}
// after delete logs, rm the LogWriter since it is already closed
l.cleanUpLogWriter()
return nil

// write a marker to s3, since other process get the remove changefeed job async,
// may still write some logs after owner delete the log
return l.writeDeletedMarkerToS3(ctx)
}

func (l *LogWriter) getDeletedChangefeedMarker() string {
return fmt.Sprintf("delete_%s", l.cfg.ChangeFeedID)
}

func (l *LogWriter) writeDeletedMarkerToS3(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3StorageAPI, l.storage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D")))
}

func (l *LogWriter) cleanUpLogWriter() {
Expand Down
109 changes: 108 additions & 1 deletion cdc/redo/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -675,6 +677,31 @@ func TestNewLogWriter(t *testing.T) {
require.Equal(t, meta.ResolvedTs, l.meta.ResolvedTs)
require.Equal(t, map[int64]uint64{}, l.meta.ResolvedTsList)
time.Sleep(time.Millisecond * time.Duration(math.Max(float64(defaultFlushIntervalInMs), float64(defaultGCIntervalInMs))+1))

origin := common.InitS3storage
defer func() {
common.InitS3storage = origin
}()
controller := gomock.NewController(t)
mockStorage := mockstorage.NewMockExternalStorage(controller)
// skip pre cleanup
mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(false, nil)
common.InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
return mockStorage, nil
}
cfg3 := &LogWriterConfig{
Dir: dir,
ChangeFeedID: "test-cf112232",
CaptureID: "cp",
MaxLogSize: 10,
CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}),
FlushIntervalInMs: 5,
S3Storage: true,
}
l3, err := NewLogWriter(ctx, cfg3)
require.Nil(t, err)
err = l3.Close()
require.Nil(t, err)
}

func TestWriterRedoGC(t *testing.T) {
Expand Down Expand Up @@ -750,6 +777,7 @@ func TestDeleteAllLogs(t *testing.T) {
closeErr error
getAllFilesInS3Err error
deleteFileErr error
writeFileErr error
wantErr string
}{
{
Expand Down Expand Up @@ -783,6 +811,12 @@ func TestDeleteAllLogs(t *testing.T) {
args: args{enableS3: true},
deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
},
{
name: "writerFile err",
args: args{enableS3: true},
writeFileErr: errors.New("xx"),
wantErr: ".*xx*.",
},
}

for _, tt := range tests {
Expand All @@ -803,6 +837,8 @@ func TestDeleteAllLogs(t *testing.T) {
mockStorage := mockstorage.NewMockExternalStorage(controller)

mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tt.deleteFileErr).MaxTimes(2)
mockStorage.EXPECT().WriteFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.writeFileErr).MaxTimes(1)

mockWriter := &mockFileWriter{}
mockWriter.On("Close").Return(tt.closeErr)
cfg := &LogWriterConfig{
Expand All @@ -829,7 +865,8 @@ func TestDeleteAllLogs(t *testing.T) {
require.Regexp(t, tt.wantErr, ret.Error(), tt.name)
} else {
require.Nil(t, ret, tt.name)
require.Equal(t, 0, len(logWriters), tt.name)
_, ok := logWriters[writer.cfg.ChangeFeedID]
require.False(t, ok, tt.name)
if !tt.args.enableS3 {
_, err := os.Stat(dir)
require.True(t, os.IsNotExist(err), tt.name)
Expand All @@ -839,3 +876,73 @@ func TestDeleteAllLogs(t *testing.T) {
getAllFilesInS3 = origin
}
}

func TestPreCleanUpS3(t *testing.T) {
testCases := []struct {
name string
fileExistsErr error
fileExists bool
getAllFilesInS3Err error
deleteFileErr error
wantErr string
}{
{
name: "happy no marker",
fileExists: false,
},
{
name: "fileExists err",
fileExistsErr: errors.New("xx"),
wantErr: ".*xx*.",
},
{
name: "getAllFilesInS3 err",
fileExists: true,
getAllFilesInS3Err: errors.New("xx"),
wantErr: ".*xx*.",
},
{
name: "deleteFile normal err",
fileExists: true,
deleteFileErr: errors.New("xx"),
wantErr: ".*ErrS3StorageAPI*.",
},
{
name: "deleteFile notExist err",
fileExists: true,
deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
},
}

for _, tc := range testCases {
origin := getAllFilesInS3
getAllFilesInS3 = func(ctx context.Context, l *LogWriter) ([]string, error) {
return []string{"1", "11", "delete_test-cf"}, tc.getAllFilesInS3Err
}
controller := gomock.NewController(t)
mockStorage := mockstorage.NewMockExternalStorage(controller)

mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(tc.fileExists, tc.fileExistsErr)
mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tc.deleteFileErr).MaxTimes(3)

cfg := &LogWriterConfig{
Dir: "dir",
ChangeFeedID: "test-cf",
CaptureID: "cp",
MaxLogSize: 10,
CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}),
FlushIntervalInMs: 5,
}
writer := LogWriter{
cfg: cfg,
storage: mockStorage,
}
ret := writer.preCleanUpS3(context.Background())
if tc.wantErr != "" {
require.Regexp(t, tc.wantErr, ret.Error(), tc.name)
} else {
require.Nil(t, ret, tc.name)
}
getAllFilesInS3 = origin
}
}