Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC Server support data-dir (#1879) #2069

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c47682e
This is an automated cherry-pick of #1879
jinlingchristopher Jun 16, 2021
5632ce4
fix conflicts .
3AceShowHand Jul 6, 2021
10cda22
fix conflicts.
3AceShowHand Jul 6, 2021
f0277e3
fix conflicts.
3AceShowHand Jul 6, 2021
d4de797
Merge branch 'release-4.0' into cherry-pick-1879-to-release-4.0
3AceShowHand Jul 6, 2021
d8d93c9
fix conflicts in go.mod.
3AceShowHand Jul 6, 2021
1d784f7
fix conflicts.
3AceShowHand Jul 6, 2021
b32f790
Merge branch 'release-4.0' into cherry-pick-1879-to-release-4.0
3AceShowHand Jul 7, 2021
182e87e
Merge branch 'release-4.0' into cherry-pick-1879-to-release-4.0
3AceShowHand Jul 7, 2021
7efbce4
cherry pick, and fix conflicts.
jinlingchristopher Jun 20, 2021
9a05adf
cherry pick, and fix conflicts.
jinlingchristopher Jun 21, 2021
d22f31a
cherry pick, and fix conflicts.
jinlingchristopher Jun 25, 2021
2e6fd64
cherry pick, and fix conflicts.
3AceShowHand Jul 7, 2021
cf384be
make check to fix fmt error.
3AceShowHand Jul 8, 2021
e533936
Merge branch 'release-4.0' into cherry-pick-1879-to-release-4.0
3AceShowHand Jul 8, 2021
581e575
fix make check error.
3AceShowHand Jul 8, 2021
27d7b6d
remove wrong cp .
3AceShowHand Jul 8, 2021
d8466bb
add sinkURI to make client changefeed test pass.
3AceShowHand Jul 8, 2021
2185992
add sinkURI to make client changefeed test pass.
3AceShowHand Jul 8, 2021
79211d7
Merge branch 'release-4.0' into cherry-pick-1879-to-release-4.0
3AceShowHand Jul 8, 2021
d127957
set NewReplicaImpl to false.
3AceShowHand Jul 8, 2021
1826ccd
Merge branch 'cherry-pick-1879-to-release-4.0' of https://github.com/…
3AceShowHand Jul 8, 2021
da7fc0a
fix check_changefeed_mark_stopped_regex
3AceShowHand Jul 9, 2021
ec0c9de
Merge branch 'release-4.0' into cherry-pick-1879-to-release-4.0
3AceShowHand Jul 9, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cdc/kv/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) {
ctx := context.Background()
detail := &model.ChangeFeedInfo{
SinkURI: "root@tcp(127.0.0.1:3306)/mysql",
SortDir: "/old-version/sorter",
}
cfID := "test-op-cf"

Expand All @@ -236,6 +237,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) {
d, err := s.client.GetChangeFeedInfo(ctx, cfID)
c.Assert(err, check.IsNil)
c.Assert(d.SinkURI, check.Equals, detail.SinkURI)
c.Assert(d.SortDir, check.Equals, detail.SortDir)

err = s.client.LeaseGuardDeleteChangeFeedInfo(ctx, cfID, sess.Lease())
c.Assert(err, check.IsNil)
Expand Down
4 changes: 3 additions & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ type ChangeFeedInfo struct {
AdminJobType AdminJobType `json:"admin-job-type"`
Engine SortEngine `json:"sort-engine"`
// SortDir is deprecated
SortDir string `json:"-"`
// it cannot be set by user in changefeed level, any assignment to it should be ignored.
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Expand Down
1 change: 1 addition & 0 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
},
StartTs: 417136892416622595,
Engine: "memory",
SortDir: ".",
Config: &config.ReplicaConfig{
CaseSensitive: true,
Filter: &config.FilterConfig{
Expand Down
14 changes: 7 additions & 7 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type oldProcessor struct {
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
globalcheckpointTs uint64
globalCheckpointTs uint64
appliedLocalCheckpointTs uint64
flushCheckpointInterval time.Duration

Expand Down Expand Up @@ -238,7 +238,7 @@ func newProcessor(
}

if err == nil {
p.globalcheckpointTs = info.CheckpointTs
p.globalCheckpointTs = info.CheckpointTs
}

for tableID, replicaInfo := range p.status.Tables {
Expand Down Expand Up @@ -676,7 +676,7 @@ func (p *oldProcessor) globalStatusWorker(ctx context.Context) error {
)

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
atomic.StoreUint64(&p.globalCheckpointTs, changefeedStatus.CheckpointTs)
if lastResolvedTs == changefeedStatus.ResolvedTs &&
lastCheckPointTs == changefeedStatus.CheckpointTs {
return
Expand Down Expand Up @@ -783,15 +783,15 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
return
}

globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)
globalCheckpointTs := atomic.LoadUint64(&p.globalCheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
// use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized.
if replicaInfo.StartTs < globalCheckpointTs {
// use Warn instead of Panic in case that p.globalCheckpointTs has not been initialized.
// The cdc_state_checker will catch a real inconsistency in integration tests.
log.Warn("addTable: startTs < checkpoint",
util.ZapFieldChangefeed(ctx),
zap.Int64("tableID", tableID),
zap.Uint64("checkpoint", globalcheckpointTs),
zap.Uint64("checkpoint", globalCheckpointTs),
zap.Uint64("startTs", replicaInfo.StartTs))
}

Expand Down
4 changes: 4 additions & 0 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
zap.Int64("table-id", tableID),
zap.String("table-name", tableName))

if err := util.CheckDataDirSatisfied(); err != nil {
return nil, errors.Trace(err)
}

ret, err := newFileBackEnd(fname, &msgPackGenSerde{})
if err != nil {
return nil, errors.Trace(err)
Expand Down
35 changes: 28 additions & 7 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"time"

Expand All @@ -34,10 +35,17 @@ var _ = check.SerialSuites(&backendPoolSuite{})
func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
dataDir := "/tmp/cdc_data"
err := os.MkdirAll(dataDir, 0o755)
c.Assert(err, check.IsNil)

sortDir := filepath.Join(dataDir, config.DefaultSortDir)
err = os.MkdirAll(sortDir, 0o755)
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sortDir
conf.Sorter.MaxMemoryPressure = 90 // 90%
conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G
config.StoreGlobalServerConfig(conf)
Expand All @@ -48,7 +56,7 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

backEndPool, err := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool(sortDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()
Expand Down Expand Up @@ -103,14 +111,20 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
defer testleak.AfterTest(c)()

dir := c.MkDir()
err := os.Chmod(dir, 0o311) // no permission to `ls`
dataDir := c.MkDir()
sortDir := filepath.Join(dataDir, config.DefaultSortDir)
err := os.MkdirAll(sortDir, 0o755)
c.Assert(err, check.IsNil)

err = os.Chmod(sortDir, 0o311) // no permission to `ls`
c.Assert(err, check.IsNil)

conf := config.GetGlobalServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sortDir
conf.Sorter.MaxMemoryPressure = 0 // force using files

backEndPool, err := newBackEndPool(dir, "")
backEndPool, err := newBackEndPool(sortDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()
Expand All @@ -131,10 +145,17 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
dataDir := c.MkDir()
err := os.Chmod(dataDir, 0o755)
c.Assert(err, check.IsNil)

sorterDir := filepath.Join(dataDir, config.DefaultSortDir)
err = os.MkdirAll(sorterDir, 0o755)
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sorterDir
conf.Sorter.MaxMemoryPressure = 90 // 90%
conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G
config.StoreGlobalServerConfig(conf)
Expand All @@ -143,7 +164,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
c.Assert(err, check.IsNil)
defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint") //nolint:errcheck

backEndPool, err := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool(sorterDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)

Expand Down
46 changes: 34 additions & 12 deletions cdc/puller/sorter/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"context"
"math"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

"go.uber.org/zap/zapcore"

_ "net/http/pprof"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand All @@ -32,7 +35,6 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
_ "net/http/pprof"
)

const (
Expand Down Expand Up @@ -62,18 +64,21 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
Expand All @@ -87,18 +92,21 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -127,6 +135,11 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun
log.Panic("Could not enable failpoint", zap.Error(err))
}

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil)
}()

ctx, cancel := context.WithCancel(ctx)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
Expand Down Expand Up @@ -284,16 +297,19 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 0, // disable memory sort
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)

// enable the failpoint to simulate delays
Expand All @@ -311,7 +327,7 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
}()

for i := 0; i < 5; i++ {
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = testSorter(ctx, c, sorter, 100000000, true)
Expand All @@ -328,18 +344,21 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) {
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand Down Expand Up @@ -376,7 +395,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) {
}()

// recreate the sorter
sorter, err = NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err = NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

finishedCh = make(chan struct{})
Expand All @@ -402,18 +421,21 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) {
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand Down
Loading