Skip to content

Commit

Permalink
redo(ticdc): support gcs scheme in redo log (#7993) (#8016)
Browse files Browse the repository at this point in the history
close #6335, close #7987
  • Loading branch information
ti-chi-bot authored Jan 5, 2023
1 parent 38f1d35 commit f69c799
Show file tree
Hide file tree
Showing 51 changed files with 1,411 additions and 1,036 deletions.
3 changes: 2 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -564,7 +565,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state))
return
}
if !redo.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
if !redoCfg.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
return
}
// when removing a paused changefeed, the redo manager is nil, create a new one
Expand Down
5 changes: 3 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,9 @@ func TestRemoveChangefeed(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: config.MinFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sorter"
"github.com/pingcap/tiflow/cdc/sorter/leveldb"
"github.com/pingcap/tiflow/cdc/sorter/memory"
Expand All @@ -34,6 +33,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pipeline"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sorter"
"github.com/pingcap/tiflow/cdc/sorter/memory"
"github.com/pingcap/tiflow/cdc/sorter/unified"
Expand All @@ -30,6 +29,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/pipeline"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down
34 changes: 0 additions & 34 deletions cdc/redo/applier.go

This file was deleted.

2 changes: 1 addition & 1 deletion cdc/redo/convert.go → cdc/redo/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package redo
package common

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package redo
package common

import (
"testing"
Expand Down
75 changes: 0 additions & 75 deletions cdc/redo/common/redo.go

This file was deleted.

40 changes: 40 additions & 0 deletions cdc/redo/common/redo_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate msgp

package common

import (
"github.com/pingcap/tiflow/cdc/model"
)

// LogMeta is used for store meta info.
type LogMeta struct {
CheckpointTs uint64 `msg:"checkpointTs"`
ResolvedTs uint64 `msg:"resolvedTs"`
}

// ParseMeta parses meta.
func ParseMeta(metas []*LogMeta, checkpointTs, resolvedTs *model.Ts) {
*checkpointTs = 0
*resolvedTs = 0
for _, meta := range metas {
if *checkpointTs < meta.CheckpointTs {
*checkpointTs = meta.CheckpointTs
}
if *resolvedTs < meta.ResolvedTs {
*resolvedTs = meta.ResolvedTs
}
}
}
File renamed without changes.
File renamed without changes.
106 changes: 0 additions & 106 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,118 +14,12 @@
package common

import (
"context"
"fmt"
"net/url"
"path/filepath"
"strings"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
if len(uri.Host) == 0 {
return nil, cerror.WrapChangefeedUnretryableErr(
cerror.WrapError(cerror.ErrS3StorageInitialize,
errors.Errorf("please specify the bucket for s3 in %v", uri)))
}

prefix := strings.Trim(uri.Path, "/")
s3 := &backuppb.S3{Bucket: uri.Host, Prefix: prefix}
options := &storage.BackendOptions{}
storage.ExtractQueryParameters(&uri, &options.S3)
if err := options.S3.Apply(s3); err != nil {
return nil, cerror.WrapChangefeedUnretryableErr(
cerror.WrapError(cerror.ErrS3StorageInitialize, err))
}

// we should set this to true, since br set it by default in parseBackend
s3.ForcePathStyle = true
backend := &backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_S3{S3: s3},
}
s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
SendCredentials: false,
HTTPClient: nil,
})
if err != nil {
return nil, cerror.WrapChangefeedUnretryableErr(
cerror.WrapError(cerror.ErrS3StorageInitialize, err))
}

return s3storage, nil
}

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
if ext == MetaEXT {
return 0, DefaultMetaFileType, nil
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
}
if ext != LogEXT && ext != TmpEXT {
return 0, "", nil
}

var commitTs uint64
var captureID, namespace, changefeedID, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
vars []any
formatStr string
)
if len(strings.Split(name, "_")) == 6 {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid}
} else {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, vars...)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
return commitTs, fileType, nil
}

// FilterChangefeedFiles return the files that match to the changefeed.
func FilterChangefeedFiles(files []string, changefeedID model.ChangeFeedID) []string {
var (
Expand Down
Loading

0 comments on commit f69c799

Please sign in to comment.