diff --git a/cdc/processor.go b/cdc/processor.go index 52b34603790..e246b3f5ca9 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -85,7 +85,7 @@ type oldProcessor struct { globalResolvedTs uint64 localResolvedTs uint64 checkpointTs uint64 - globalcheckpointTs uint64 + globalCheckpointTs uint64 appliedLocalCheckpointTs uint64 flushCheckpointInterval time.Duration @@ -238,7 +238,7 @@ func newProcessor( } if err == nil { - p.globalcheckpointTs = info.CheckpointTs + p.globalCheckpointTs = info.CheckpointTs } for tableID, replicaInfo := range p.status.Tables { @@ -676,7 +676,7 @@ func (p *oldProcessor) globalStatusWorker(ctx context.Context) error { ) updateStatus := func(changefeedStatus *model.ChangeFeedStatus) { - atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs) + atomic.StoreUint64(&p.globalCheckpointTs, changefeedStatus.CheckpointTs) if lastResolvedTs == changefeedStatus.ResolvedTs && lastCheckPointTs == changefeedStatus.CheckpointTs { return @@ -783,15 +783,15 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo return } - globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) + globalCheckpointTs := atomic.LoadUint64(&p.globalCheckpointTs) - if replicaInfo.StartTs < globalcheckpointTs { - // use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized. + if replicaInfo.StartTs < globalCheckpointTs { + // use Warn instead of Panic in case that p.globalCheckpointTs has not been initialized. // The cdc_state_checker will catch a real inconsistency in integration tests. log.Warn("addTable: startTs < checkpoint", util.ZapFieldChangefeed(ctx), zap.Int64("tableID", tableID), - zap.Uint64("checkpoint", globalcheckpointTs), + zap.Uint64("checkpoint", globalCheckpointTs), zap.Uint64("startTs", replicaInfo.StartTs)) } diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index 61997752119..ba12e90c7c1 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -193,6 +193,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) { zap.Int64("table-id", tableID), zap.String("table-name", tableName)) + if err := util.CheckDataDirSatisfied(); err != nil { + return nil, errors.Trace(err) + } + ret, err := newFileBackEnd(fname, &msgPackGenSerde{}) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go index 8f4cec331bf..d01bfd983cd 100644 --- a/cdc/puller/sorter/backend_pool_test.go +++ b/cdc/puller/sorter/backend_pool_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "os" + "path/filepath" "strconv" "time" @@ -34,10 +35,17 @@ var _ = check.SerialSuites(&backendPoolSuite{}) func (s *backendPoolSuite) TestBasicFunction(c *check.C) { defer testleak.AfterTest(c)() - err := os.MkdirAll("/tmp/sorter", 0o755) + dataDir := "/tmp/cdc_data" + err := os.MkdirAll(dataDir, 0o755) + c.Assert(err, check.IsNil) + + sortDir := filepath.Join(dataDir, config.DefaultSortDir) + err = os.MkdirAll(sortDir, 0o755) c.Assert(err, check.IsNil) conf := config.GetDefaultServerConfig() + conf.DataDir = dataDir + conf.Sorter.SortDir = sortDir conf.Sorter.MaxMemoryPressure = 90 // 90% conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G config.StoreGlobalServerConfig(conf) @@ -48,7 +56,7 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() - backEndPool, err := 
newBackEndPool("/tmp/sorter", "") + backEndPool, err := newBackEndPool(sortDir, "") c.Assert(err, check.IsNil) c.Assert(backEndPool, check.NotNil) defer backEndPool.terminate() @@ -103,14 +111,21 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) { func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { defer testleak.AfterTest(c)() - dir := c.MkDir() - err := os.Chmod(dir, 0o311) // no permission to `ls` + dataDir := "tmp/cdc_data" + sortDir := filepath.Join(dataDir, config.DefaultSortDir) + err := os.MkdirAll(sortDir, 0o311) c.Assert(err, check.IsNil) +<<<<<<< HEAD conf := config.GetDefaultServerConfig() +======= + conf := config.GetGlobalServerConfig() + conf.DataDir = dataDir + conf.Sorter.SortDir = sortDir +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) conf.Sorter.MaxMemoryPressure = 0 // force using files - backEndPool, err := newBackEndPool(dir, "") + backEndPool, err := newBackEndPool(sortDir, "") c.Assert(err, check.IsNil) c.Assert(backEndPool, check.NotNil) defer backEndPool.terminate() @@ -131,10 +146,17 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) { defer testleak.AfterTest(c)() - err := os.MkdirAll("/tmp/sorter", 0o755) + dataDir := c.MkDir() + err := os.Chmod(dataDir, 0o755) + c.Assert(err, check.IsNil) + + sorterDir := filepath.Join(dataDir, config.DefaultSortDir) + err = os.MkdirAll(sorterDir, 0o755) c.Assert(err, check.IsNil) conf := config.GetDefaultServerConfig() + conf.DataDir = dataDir + conf.Sorter.SortDir = sorterDir conf.Sorter.MaxMemoryPressure = 90 // 90% conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G config.StoreGlobalServerConfig(conf) @@ -142,7 +164,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) { err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)") c.Assert(err, check.IsNil) - backEndPool, err := newBackEndPool("/tmp/sorter", "") + backEndPool, err := newBackEndPool(sorterDir, "") c.Assert(err, check.IsNil) c.Assert(backEndPool, check.NotNil) diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 77d2c5bd9dc..5ed2498cae2 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -17,12 +17,15 @@ import ( "context" "math" "os" + "path/filepath" "sync/atomic" "testing" "time" "go.uber.org/zap/zapcore" + _ "net/http/pprof" + "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -32,7 +35,6 @@ import ( "github.com/pingcap/ticdc/pkg/util/testleak" "go.uber.org/zap" "golang.org/x/sync/errgroup" - _ "net/http/pprof" ) const ( @@ -62,18 +64,21 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { defer UnifiedSorterCleanUp() conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 
1*time.Minute) @@ -87,18 +92,21 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { defer UnifiedSorterCleanUp() conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -127,6 +135,11 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun log.Panic("Could not enable failpoint", zap.Error(err)) } + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil) + }() + ctx, cancel := context.WithCancel(ctx) errg, ctx := errgroup.WithContext(ctx) errg.Go(func() error { @@ -284,16 +297,19 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { defer UnifiedSorterCleanUp() conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 0, // disable memory sort MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) // enable the failpoint to simulate delays @@ -311,7 +327,7 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { }() for i := 0; i < 5; i++ { - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) err = testSorter(ctx, c, sorter, 100000000, true) @@ -328,18 +344,21 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { defer log.SetLevel(zapcore.InfoLevel) conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) @@ -374,6 +393,13 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite") }() +<<<<<<< HEAD 
+======= + // recreate the sorter + sorter, err = NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") + c.Assert(err, check.IsNil) + +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) finishedCh = make(chan struct{}) go func() { err := testSorter(ctx, c, sorter, 10000, true) @@ -397,18 +423,21 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { defer log.SetLevel(zapcore.InfoLevel) conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) diff --git a/cdc/server.go b/cdc/server.go index 70cc2c8e120..261e0b71215 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -17,6 +17,8 @@ import ( "context" "fmt" "net/http" + "os" + "path/filepath" "strings" "sync" "time" @@ -42,11 +44,17 @@ import ( const ( ownerRunInterval = time.Millisecond * 500 +<<<<<<< HEAD // DefaultCDCGCSafePointTTL is the default value of cdc gc safe-point ttl, specified in seconds. DefaultCDCGCSafePointTTL = 24 * 60 * 60 defaultCaptureSessionTTL = 10 +======= + defaultDataDir = "/tmp/cdc_data" + // dataDirThreshold is used to warn if the free space of the specified data-dir is lower than it, unit is GB + dataDirThreshold = 500 +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) ) // Server is the capture server @@ -100,6 +108,46 @@ func (s *Server) Run(ctx context.Context) error { return cerror.WrapError(cerror.ErrServerNewPDClient, err) } s.pdClient = pdClient +<<<<<<< HEAD +======= + if config.NewReplicaImpl { + tlsConfig, err := conf.Security.ToTLSConfig() + if err != nil { + return errors.Trace(err) + } + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: s.pdEndpoints, + TLS: tlsConfig, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpcTLSOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + }, + }) + if err != nil { + return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") + } + etcdClient := kv.NewCDCEtcdClient(ctx, etcdCli) + s.etcdClient = &etcdClient + } + + if err := s.initDataDir(ctx); err != nil { + return errors.Trace(err) + } +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) // To not block CDC server startup, we need to warn instead of error // when TiKV is incompatible. 
@@ -296,3 +344,90 @@ func (s *Server) Close() { s.statusServer = nil } } + +func (s *Server) initDataDir(ctx context.Context) error { + if err := s.setUpDataDir(ctx); err != nil { + return errors.Trace(err) + } + conf := config.GetGlobalServerConfig() + err := os.MkdirAll(conf.DataDir, 0o755) + if err != nil { + return errors.Trace(err) + } + diskInfo, err := util.GetDiskInfo(conf.DataDir) + if err != nil { + return errors.Trace(err) + } + + if diskInfo.Avail < dataDirThreshold { + log.Warn(fmt.Sprintf("%s is set as data-dir (%dGB available), ticdc recommends that the disk for data-dir "+ "have at least %dGB of available space", conf.DataDir, diskInfo.Avail, dataDirThreshold)) + } + + return nil +} + +func (s *Server) setUpDataDir(ctx context.Context) error { + conf := config.GetGlobalServerConfig() + if conf.DataDir != "" { + conf.Sorter.SortDir = filepath.Join(conf.DataDir, config.DefaultSortDir) + config.StoreGlobalServerConfig(conf) + + return nil + } + + // data-dir will be decided by existing changefeeds for backward compatibility + allStatus, err := s.etcdClient.GetAllChangeFeedStatus(ctx) + if err != nil { + return errors.Trace(err) + } + + candidates := make([]string, 0, len(allStatus)) + for id := range allStatus { + info, err := s.etcdClient.GetChangeFeedInfo(ctx, id) + if err != nil { + return errors.Trace(err) + } + if info.SortDir != "" { + candidates = append(candidates, info.SortDir) + } + } + + conf.DataDir = defaultDataDir + best, ok := findBestDataDir(candidates) + if ok { + conf.DataDir = best + } + + conf.Sorter.SortDir = filepath.Join(conf.DataDir, config.DefaultSortDir) + config.StoreGlobalServerConfig(conf) + return nil +} + +// try to find the best data dir by rules +// at the moment, only available disk space is considered +func findBestDataDir(candidates []string) (result string, ok bool) { + var low uint64 = 0 + for _, dir := range candidates { + if err := util.IsDirReadWritable(dir); err != nil { + log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err)) + continue + } + info, err := util.GetDiskInfo(dir) + if err != nil { + log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err)) + continue + } + if info.Avail > low { + result = dir + low = info.Avail + ok = true + } + } + + if !ok && len(candidates) != 0 { + log.Warn("try to find directory for data-dir failed, use `/tmp/cdc_data` as data-dir", zap.Strings("candidates", candidates)) + } + + return result, ok +} diff --git a/cdc/server_test.go b/cdc/server_test.go index f49fcc08475..4ff469abb46 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -16,9 +16,11 @@ package cdc import ( "context" "net/url" + "path/filepath" "time" "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/pkg/util/testleak" @@ -76,3 +78,27 @@ func (s *serverSuite) TestEtcdHealthChecker(c *check.C) { time.Sleep(time.Second * 4) cancel() } + +func (s *serverSuite) TestInitDataDir(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + + ctx, cancel := context.WithCancel(context.Background()) + pdEndpoints := []string{ + "http://" + s.clientURL.Host, + "http://invalid-pd-host:2379", + } + server, err := NewServer(pdEndpoints) + c.Assert(err, check.IsNil) + c.Assert(server, check.NotNil) + + conf := config.GetGlobalServerConfig() + conf.DataDir = c.MkDir() + + err = server.initDataDir(ctx) + c.Assert(err, check.IsNil) + c.Assert(conf.DataDir, check.Not(check.Equals), "") + 
c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, "/tmp/sorter")) + config.StoreGlobalServerConfig(conf) + cancel() +} diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 6c30830ab84..3c2459e11b4 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "github.com/fatih/color" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -40,12 +41,6 @@ import ( "go.uber.org/zap" ) -const ( - // Use the empty string as the default to let the server local setting override the changefeed setting. - // TODO remove this when we change the changefeed `sort-dir` to no-op, which it currently is NOT. - defaultSortDir = "" -) - var forceEnableOldValueProtocols = []string{ "canal", "maxwell", @@ -88,7 +83,7 @@ func newAdminChangefeedCommand() []*cobra.Command { cmds := []*cobra.Command{ { Use: "pause", - Short: "Pause a replicaiton task (changefeed)", + Short: "Pause a replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext job := model.AdminJob{ @@ -100,7 +95,7 @@ func newAdminChangefeedCommand() []*cobra.Command { }, { Use: "resume", - Short: "Resume a paused replicaiton task (changefeed)", + Short: "Resume a paused replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext job := model.AdminJob{ @@ -115,7 +110,7 @@ func newAdminChangefeedCommand() []*cobra.Command { }, { Use: "remove", - Short: "Remove a replicaiton task (changefeed)", + Short: "Remove a replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext job := model.AdminJob{ @@ -193,7 +188,7 @@ func newListChangefeedCommand() *cobra.Command { func newQueryChangefeedCommand() *cobra.Command { command := &cobra.Command{ Use: "query", - Short: "Query information and status of a replicaiton task (changefeed)", + Short: "Query information and status of a replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext @@ -247,10 +242,14 @@ func newQueryChangefeedCommand() *cobra.Command { return command } +<<<<<<< HEAD func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate bool, credential *security.Credential) (*model.ChangeFeedInfo, error) { +======= +func verifyChangefeedParameters(ctx context.Context, cmd *cobra.Command, isCreate bool, credential *security.Credential, captureInfos []*model.CaptureInfo) (*model.ChangeFeedInfo, error) { +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) if isCreate { if sinkURI == "" { - return nil, errors.New("Creating chengfeed without a sink-uri") + return nil, errors.New("Creating changefeed without a sink-uri") } if startTs == 0 { ts, logical, err := pdCli.GetTS(ctx) @@ -281,7 +280,7 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate } if cyclicReplicaID != 0 || len(cyclicFilterReplicaIDs) != 0 { if !(cyclicReplicaID != 0 && len(cyclicFilterReplicaIDs) != 0) { - return nil, errors.New("invaild cyclic config, please make sure using " + + return nil, errors.New("invalid cyclic config, please make sure using " + "nonzero replica ID and specify filter replica IDs") } filter := make([]uint64, 0, len(cyclicFilterReplicaIDs)) @@ -306,7 +305,7 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate protocol := sinkURIParsed.Query().Get("protocol") for _, fp := range forceEnableOldValueProtocols { if protocol == fp { 
- log.Warn("Attemping to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) cfg.EnableOldValue = true break } @@ -331,7 +330,12 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate switch sortEngine { case model.SortUnified, model.SortInMemory, model.SortInFile: default: +<<<<<<< HEAD return nil, errors.Errorf("Creating chengfeed without a invaild sort engine(%s), `%s`,`%s` and `%s` are optional.", sortEngine, model.SortUnified, model.SortInMemory, model.SortInFile) +======= + return nil, errors.Errorf("Creating changefeed with an invalid sort engine(%s), "+ + "`%s`,`%s` and `%s` are optional.", sortEngine, model.SortUnified, model.SortInMemory, model.SortInFile) +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) } info := &model.ChangeFeedInfo{ SinkURI: sinkURI, @@ -347,10 +351,12 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate CreatorVersion: version.ReleaseVersion, } + // user is not allowed to set sort-dir at changefeed level if sortDir != "" { - cmd.Printf("[WARN] --sort-dir is deprecated in changefeed settings. " + - "Please use `cdc server --sort-dir` if possible. " + - "The sort-dir here will be no-op\n") + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in changefeed settings. " + + "Please use `cdc server --data-dir` to start the cdc server if possible, sort-dir will be set automatically. " + + "The --sort-dir here will be no-op\n")) + return nil, errors.New("Creating changefeed with `--sort-dir`, it's invalid") } if info.Engine == model.SortInFile { @@ -426,14 +432,14 @@ func changefeedConfigVariables(command *cobra.Command) { command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file") command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format") command.PersistentFlags().StringVar(&sortEngine, "sort-engine", model.SortUnified, "sort engine used for data sort") - command.PersistentFlags().StringVar(&sortDir, "sort-dir", defaultSortDir, "directory used for data sort") + command.PersistentFlags().StringVar(&sortDir, "sort-dir", "", "directory used for data sort") command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)") - command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed") - command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Expremental) Cyclic replication filter replica ID of changefeed") - command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Expremental) Cyclic replication sync DDL of changefeed") - command.PersistentFlags().BoolVar(&syncPointEnabled, "sync-point", false, "(Expremental) Set and Record syncpoint in replication(default off)") - command.PersistentFlags().DurationVar(&syncPointInterval, "sync-interval", 10*time.Minute, "(Expremental) Set the interval for syncpoint in replication(default 10min)") - command.PersistentFlags().MarkHidden("sort-dir") //nolint:errcheck + command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Experimental) Cyclic replication replica ID of changefeed") + 
command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Experimental) Cyclic replication filter replica ID of changefeed") + command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Experimental) Cyclic replication sync DDL of changefeed") + command.PersistentFlags().BoolVar(&syncPointEnabled, "sync-point", false, "(Experimental) Set and Record syncpoint in replication(default off)") + command.PersistentFlags().DurationVar(&syncPointInterval, "sync-interval", 10*time.Minute, "(Experimental) Set the interval for syncpoint in replication(default 10min)") + _ = command.PersistentFlags().MarkHidden("sort-dir") //nolint:errcheck } func newCreateChangefeedCommand() *cobra.Command { @@ -448,7 +454,15 @@ func newCreateChangefeedCommand() *cobra.Command { id = uuid.New().String() } +<<<<<<< HEAD info, err := verifyChangefeedParamers(ctx, cmd, true /* isCreate */, getCredential()) +======= + _, captureInfos, err := cdcEtcdCli.GetCaptures(ctx) + if err != nil { + return err + } + info, err := verifyChangefeedParameters(ctx, cmd, true /* isCreate */, getCredential(), captureInfos) +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) if err != nil { return err } @@ -578,7 +592,7 @@ func newUpdateChangefeedCommand() *cobra.Command { return err } if strings.ToLower(strings.TrimSpace(yOrN)) != "y" { - cmd.Printf("No upadte to changefeed.\n") + cmd.Printf("No update to changefeed.\n") return nil } } @@ -646,7 +660,7 @@ func newStatisticsChangefeedCommand() *cobra.Command { ReplicationGap: fmt.Sprintf("%dms", replicationGap), Count: count, } - jsonPrint(cmd, &statistics) + _ = jsonPrint(cmd, &statistics) lastCount = count lastTime = now } @@ -662,7 +676,7 @@ func newStatisticsChangefeedCommand() *cobra.Command { func newCreateChangefeedCyclicCommand() *cobra.Command { command := &cobra.Command{ Use: "cyclic", - Short: "(Expremental) Utility about cyclic replication", + Short: "(Experimental) Utility about cyclic replication", } command.AddCommand( &cobra.Command{ diff --git a/cmd/client_changefeed_test.go b/cmd/client_changefeed_test.go index 3cd26d13e8b..d945a95469c 100644 --- a/cmd/client_changefeed_test.go +++ b/cmd/client_changefeed_test.go @@ -42,14 +42,41 @@ enable-old-value = false c.Assert(err, check.IsNil) sinkURI = "blackhole:///?protocol=maxwell" +<<<<<<< HEAD info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */, nil) +======= + info, err := verifyChangefeedParameters(ctx, cmd, false /* isCreate */, nil, nil) +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) c.Assert(err, check.IsNil) c.Assert(info.Config.EnableOldValue, check.IsTrue) - c.Assert(info.SortDir, check.Equals, defaultSortDir) + c.Assert(info.SortDir, check.Equals, "") sinkURI = "" +<<<<<<< HEAD _, err = verifyChangefeedParamers(ctx, cmd, true /* isCreate */, nil) c.Assert(err, check.NotNil) c.Assert(info.Config.EnableOldValue, check.IsTrue) +======= + _, err = verifyChangefeedParameters(ctx, cmd, true /* isCreate */, nil, nil) + c.Assert(err, check.NotNil) + + sinkURI = "blackhole:///" + info, err = verifyChangefeedParameters(ctx, cmd, false /* isCreate */, nil, []*model.CaptureInfo{{Version: "4.0.0"}}) + c.Assert(err, check.IsNil) + c.Assert(info.Config.EnableOldValue, check.IsFalse) + c.Assert(info.Engine, check.Equals, model.SortInMemory) + + sortDir = "/tidb/data" + pdCli = &mockPDClient{} + disableGCSafePointCheck = true + _, err = verifyChangefeedParameters(ctx, cmd, false, nil, nil) + c.Assert(err, check.ErrorMatches, "*Creating 
changefeed with `--sort-dir`, it's invalid*") + _, err = verifyChangefeedParameters(ctx, cmd, true, nil, nil) + c.Assert(err, check.NotNil) + + sortDir = "" + _, err = verifyChangefeedParameters(ctx, cmd, false, nil, nil) + c.Assert(err, check.IsNil) +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) } diff --git a/cmd/server.go b/cmd/server.go index 478c10ce06d..f82a05b5fb7 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -18,6 +18,7 @@ import ( "strings" "time" + "github.com/fatih/color" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc" @@ -68,6 +69,7 @@ func initServerCmd(cmd *cobra.Command) { cmd.Flags().Int64Var(&serverConfig.GcTTL, "gc-ttl", defaultServerConfig.GcTTL, "CDC GC safepoint TTL duration, specified in seconds") cmd.Flags().StringVar(&serverConfig.LogFile, "log-file", defaultServerConfig.LogFile, "log file path") cmd.Flags().StringVar(&serverConfig.LogLevel, "log-level", defaultServerConfig.LogLevel, "log level (etc: debug|info|warn|error)") + cmd.Flags().StringVar(&serverConfig.DataDir, "data-dir", defaultServerConfig.DataDir, "the path to the directory used to store TiCDC-generated data") cmd.Flags().DurationVar((*time.Duration)(&serverConfig.OwnerFlushInterval), "owner-flush-interval", time.Duration(defaultServerConfig.OwnerFlushInterval), "owner flushes changefeed status interval") cmd.Flags().DurationVar((*time.Duration)(&serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(defaultServerConfig.ProcessorFlushInterval), "processor flushes task status interval") @@ -83,6 +85,7 @@ func initServerCmd(cmd *cobra.Command) { addSecurityFlags(cmd.Flags(), true /* isServer */) cmd.Flags().StringVar(&serverConfigFilePath, "config", "", "Path of the configuration file") + _ = cmd.Flags().MarkHidden("sort-dir") //nolint:errcheck } func runEServer(cmd *cobra.Command, args []string) error { @@ -146,6 +149,8 @@ func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) conf.LogFile = serverConfig.LogFile case "log-level": conf.LogLevel = serverConfig.LogLevel + case "data-dir": + conf.DataDir = serverConfig.DataDir case "owner-flush-interval": conf.OwnerFlushInterval = serverConfig.OwnerFlushInterval case "processor-flush-interval": @@ -169,7 +174,12 @@ func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) case "cert-allowed-cn": conf.Security.CertAllowedCN = serverConfig.Security.CertAllowedCN case "sort-dir": - conf.Sorter.SortDir = serverConfig.Sorter.SortDir + // user specified sorter dir should not take effect + if serverConfig.Sorter.SortDir != config.DefaultSortDir { + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in server settings. " + + "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) + } + conf.Sorter.SortDir = config.DefaultSortDir case "pd", "config": // do nothing default: @@ -192,5 +202,10 @@ func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) } } + if conf.DataDir == "" { + cmd.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. 
" + + "Please use `cdc server --data-dir` to start the cdc server if possible.\n")) + } + return conf, nil } diff --git a/cmd/server_test.go b/cmd/server_test.go index ef0bc440cc9..2e591b0335b 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -14,6 +14,7 @@ package cmd import ( + "fmt" "io/ioutil" "path/filepath" "time" @@ -36,6 +37,30 @@ func (s *serverSuite) TestPatchTiDBConf(c *check.C) { c.Assert(cfg.TiKVClient.MaxBatchSize, check.Equals, uint(0)) } +func (s *serverSuite) TestDataDirServerConfig(c *check.C) { + defer testleak.AfterTest(c)() + cmd := new(cobra.Command) + initServerCmd(cmd) + c.Assert(cmd.ParseFlags([]string{}), check.IsNil) + cfg, err := loadAndVerifyServerConfig(cmd) + c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + // data dir default to "" + c.Assert(cfg.DataDir, check.Equals, "") + c.Assert(cfg.Sorter.SortDir, check.Equals, filepath.Join("", "/tmp/sorter")) + + dataDir := c.MkDir() + cmd = new(cobra.Command) + initServerCmd(cmd) + c.Assert(cmd.ParseFlags([]string{"--data-dir=" + dataDir}), check.IsNil) + cfg, err = loadAndVerifyServerConfig(cmd) + c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + c.Assert(cfg.DataDir, check.Equals, dataDir) + // sorter-dir is not set yet + c.Assert(cfg.Sorter.SortDir, check.Equals, "/tmp/sorter") +} + func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { defer testleak.AfterTest(c)() // test default flag values @@ -44,6 +69,8 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { c.Assert(cmd.ParseFlags([]string{}), check.IsNil) cfg, err := loadAndVerifyServerConfig(cmd) c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + defcfg := config.GetDefaultServerConfig() c.Assert(defcfg.ValidateAndAdjust(), check.IsNil) c.Assert(cfg, check.DeepEquals, defcfg) @@ -71,6 +98,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { c.Assert(err, check.IsNil) // test flags without config file + dataDir := c.MkDir() cmd = new(cobra.Command) initServerCmd(cmd) c.Assert(cmd.ParseFlags([]string{ @@ -78,6 +106,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { "--advertise-addr", "127.5.5.1:7777", "--log-file", "/root/cdc.log", "--log-level", "debug", + "--data-dir", dataDir, "--gc-ttl", "10", "--tz", "UTC", "--owner-flush-interval", "150ms", @@ -99,6 +128,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { AdvertiseAddr: "127.5.5.1:7777", LogFile: "/root/cdc.log", LogLevel: "debug", + DataDir: dataDir, GcTTL: 10, TZ: "UTC", OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond), @@ -109,7 +139,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { MaxMemoryPressure: 70, MaxMemoryConsumption: 60000, NumWorkerPoolGoroutine: 90, - SortDir: "/tmp/just_a_test", + SortDir: config.DefaultSortDir, }, Security: &config.SecurityConfig{ CertPath: "bb", @@ -120,15 +150,16 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { }) // test decode config file + dataDir = c.MkDir() tmpDir := c.MkDir() configPath := filepath.Join(tmpDir, "ticdc.toml") - configContent := ` + configContent := fmt.Sprintf(` addr = "128.0.0.1:1234" advertise-addr = "127.0.0.1:1111" log-file = "/root/cdc1.log" log-level = "warn" - +data-dir = "%+v" gc-ttl = 500 tz = "US" @@ -142,7 +173,7 @@ max-memory-percentage = 3 num-concurrent-worker = 4 num-workerpool-goroutine = 5 sort-dir = "/tmp/just_a_test" -` +`, dataDir) err = ioutil.WriteFile(configPath, []byte(configContent), 0o644) c.Assert(err, check.IsNil) cmd = new(cobra.Command) @@ 
-155,6 +186,7 @@ sort-dir = "/tmp/just_a_test" AdvertiseAddr: "127.0.0.1:1111", LogFile: "/root/cdc1.log", LogLevel: "warn", + DataDir: dataDir, GcTTL: 500, TZ: "US", OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond), @@ -165,7 +197,7 @@ sort-dir = "/tmp/just_a_test" MaxMemoryPressure: 3, MaxMemoryConsumption: 2000000, NumWorkerPoolGoroutine: 5, - SortDir: "/tmp/just_a_test", + SortDir: config.DefaultSortDir, }, Security: &config.SecurityConfig{}, PerTableMemoryQuota: 20 * 1024 * 1024, // 20M @@ -186,6 +218,7 @@ cert-allowed-cn = ["dd","ee"] "--addr", "127.5.5.1:8833", "--log-file", "/root/cdc.log", "--log-level", "debug", + "--data-dir", dataDir, "--gc-ttl", "10", "--tz", "UTC", "--owner-flush-interval", "150ms", @@ -204,6 +237,7 @@ cert-allowed-cn = ["dd","ee"] AdvertiseAddr: "127.0.0.1:1111", LogFile: "/root/cdc.log", LogLevel: "debug", + DataDir: dataDir, GcTTL: 10, TZ: "UTC", OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond), @@ -214,7 +248,7 @@ cert-allowed-cn = ["dd","ee"] MaxMemoryPressure: 70, MaxMemoryConsumption: 60000000, NumWorkerPoolGoroutine: 5, - SortDir: "/tmp/just_a_test", + SortDir: config.DefaultSortDir, }, Security: &config.SecurityConfig{ CertPath: "bb", diff --git a/errors.toml b/errors.toml index a980dc8e539..62f4b0e1544 100755 --- a/errors.toml +++ b/errors.toml @@ -121,6 +121,21 @@ error = ''' failed to request PD ''' +["CDC:ErrCheckDataDirSatisfied"] +error = ''' +check data dir satisfied failed +''' + +["CDC:ErrCheckDirReadable"] +error = ''' +check dir readable failed +''' + +["CDC:ErrCheckDirValid"] +error = ''' +check dir valid failed +''' + ["CDC:ErrCheckDirWritable"] error = ''' check dir writable failed @@ -266,6 +281,11 @@ error = ''' get stores from pd failed ''' +["CDC:ErrGetDiskInfo"] +error = ''' +get dir disk info failed +''' + ["CDC:ErrGetRegionFailed"] error = ''' get region failed diff --git a/go.mod b/go.mod index bd137120e1a..1398bcd56f7 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,12 @@ require ( github.com/coreos/go-semver v0.3.0 github.com/davecgh/go-spew v1.1.1 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 +<<<<<<< HEAD github.com/fatih/color v1.10.0 // indirect +======= + github.com/fatih/color v1.10.0 + github.com/frankban/quicktest v1.11.1 // indirect +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) github.com/go-sql-driver/mysql v1.5.0 github.com/golang/protobuf v1.3.4 github.com/golang/snappy v0.0.2 // indirect diff --git a/pkg/config/config.go b/pkg/config/config.go index ba7c7ad8f9a..947f0310339 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,9 +29,23 @@ import ( "go.uber.org/zap" ) +<<<<<<< HEAD // NewReplicaImpl is true if we using new processor // new owner should be also switched on after it implemented const NewReplicaImpl = false +======= +const ( + // NewReplicaImpl is true if we using new processor + // new owner should be also switched on after it implemented + NewReplicaImpl = true + // DefaultSortDir is the default value of sort-dir, it will be s sub directory of data-dir. 
+ DefaultSortDir = "/tmp/sorter" +) + +func init() { + StoreGlobalServerConfig(GetDefaultServerConfig()) +} +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) var defaultReplicaConfig = &ReplicaConfig{ CaseSensitive: true, @@ -139,12 +153,27 @@ func GetDefaultReplicaConfig() *ReplicaConfig { type SecurityConfig = security.Credential var defaultServerConfig = &ServerConfig{ +<<<<<<< HEAD Addr: "127.0.0.1:8300", AdvertiseAddr: "", LogFile: "", LogLevel: "info", GcTTL: 24 * 60 * 60, // 24H TZ: "System", +======= + Addr: "127.0.0.1:8300", + AdvertiseAddr: "", + LogFile: "", + LogLevel: "info", + DataDir: "", + GcTTL: 24 * 60 * 60, // 24H + TZ: "System", + // The default election-timeout in PD is 3s and minimum session TTL is 5s, + // which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose + // default capture session ttl to 10s to increase robust to PD jitter, + // however it will decrease RTO when single TiCDC node error happens. + CaptureSessionTTL: 10, +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) OwnerFlushInterval: TomlDuration(200 * time.Millisecond), ProcessorFlushInterval: TomlDuration(100 * time.Millisecond), Sorter: &SorterConfig{ @@ -153,7 +182,7 @@ var defaultServerConfig = &ServerConfig{ MaxMemoryPressure: 80, MaxMemoryConsumption: 8 * 1024 * 1024 * 1024, // 8GB NumWorkerPoolGoroutine: 16, - SortDir: "/tmp/cdc_sort", + SortDir: DefaultSortDir, }, Security: &SecurityConfig{}, PerTableMemoryQuota: 20 * 1024 * 1024, // 20MB @@ -166,6 +195,7 @@ type ServerConfig struct { LogFile string `toml:"log-file" json:"log-file"` LogLevel string `toml:"log-level" json:"log-level"` + DataDir string `toml:"data-dir" json:"data-dir"` GcTTL int64 `toml:"gc-ttl" json:"gc-ttl"` TZ string `toml:"tz" json:"tz"` @@ -256,6 +286,7 @@ func (c *ServerConfig) ValidateAndAdjust() error { if c.Sorter == nil { c.Sorter = defaultServerConfig.Sorter } + c.Sorter.SortDir = DefaultSortDir if c.Sorter.ChunkSizeLimit < 1*1024*1024 { return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("chunk-size-limit should be at least 1MB") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 0c5869ea8ca..f7f04d04a84 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -81,14 +81,23 @@ var _ = check.Suite(&serverConfigSuite{}) func (s *serverConfigSuite) TestMarshal(c *check.C) { defer testleak.AfterTest(c)() + rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` + conf := GetDefaultServerConfig() conf.Addr = "192.155.22.33:8887" conf.Sorter.ChunkSizeLimit = 999 b, err := conf.Marshal() c.Assert(err, check.IsNil) +<<<<<<< HEAD c.Assert(b, check.Equals, 
`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`) conf2 := new(ServerConfig) err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)) +======= + + c.Assert(b, check.Equals, rawConfig) + conf2 := new(ServerConfig) + err = conf2.Unmarshal([]byte(rawConfig)) +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 8efc5f26a93..8be144234b9 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -111,6 +111,10 @@ var ( ErrCheckClusterVersionFromPD = errors.Normalize("failed to request PD", errors.RFCCodeText("CDC:ErrCheckClusterVersionFromPD")) ErrNewSemVersion = errors.Normalize("create sem version", errors.RFCCodeText("CDC:ErrNewSemVersion")) ErrCheckDirWritable = errors.Normalize("check dir writable failed", errors.RFCCodeText("CDC:ErrCheckDirWritable")) + ErrCheckDirReadable = errors.Normalize("check dir readable failed", errors.RFCCodeText("CDC:ErrCheckDirReadable")) + ErrCheckDirValid = errors.Normalize("check dir valid failed", errors.RFCCodeText("CDC:ErrCheckDirValid")) + ErrGetDiskInfo = errors.Normalize("get dir disk info failed", errors.RFCCodeText("CDC:ErrGetDiskInfo")) + ErrCheckDataDirSatisfied = errors.Normalize("check data dir satisfied failed", errors.RFCCodeText("CDC:ErrCheckDataDirSatisfied")) ErrLoadTimezone = errors.Normalize("load timezone", errors.RFCCodeText("CDC:ErrLoadTimezone")) ErrURLFormatInvalid = errors.Normalize("url format is invalid", errors.RFCCodeText("CDC:ErrURLFormatInvalid")) ErrIntersectNoOverlap = errors.Normalize("span doesn't overlap: %+v vs %+v", errors.RFCCodeText("CDC:ErrIntersectNoOverlap")) diff --git a/pkg/util/fileutil.go b/pkg/util/fileutil.go index 5e408d9258e..4b734e88de3 100644 --- a/pkg/util/fileutil.go +++ b/pkg/util/fileutil.go @@ -14,14 +14,24 @@ package util import ( + "fmt" "io/ioutil" "os" "path/filepath" + "syscall" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" ) +const ( + gb = 1024 * 1024 * 1024 + dataDirAvailLowThreshold = 10 // percentage +) + // IsDirAndWritable checks a given path is directory and writable func IsDirAndWritable(path string) error { st, err := os.Stat(path) @@ -42,3 +52,81 @@ func IsDirWritable(dir string) error { } return cerror.WrapError(cerror.ErrCheckDirWritable, os.Remove(f)) } + +// IsDirReadWritable check if the dir is writable and readable by cdc server +func IsDirReadWritable(dir string) error { + f := filepath.Join(dir, "file.test") + if err := ioutil.WriteFile(f, []byte(""), 
0o600); err != nil { + return cerror.WrapError(cerror.ErrCheckDirValid, err) + } + + if _, err := ioutil.ReadFile(f); err != nil { + return cerror.WrapError(cerror.ErrCheckDirValid, err) + } + + return cerror.WrapError(cerror.ErrCheckDirValid, os.Remove(f)) +} + +// DiskInfo presents the disk space information, in GB +type DiskInfo struct { + All uint64 + Used uint64 + Free uint64 + Avail uint64 + AvailPercentage float32 +} + +func (d *DiskInfo) String() string { + return fmt.Sprintf("{All: %+vGB; Used: %+vGB; Free: %+vGB; Available: %+vGB; Available Percentage: %+v%%}", + d.All, d.Used, d.Free, d.Avail, d.AvailPercentage) +} + +// GetDiskInfo returns the disk space information of the given directory +// the caller should guarantee that the dir exists +func GetDiskInfo(dir string) (*DiskInfo, error) { + f := filepath.Join(dir, "file.test") + if err := ioutil.WriteFile(f, []byte(""), 0o600); err != nil { + return nil, cerror.WrapError(cerror.ErrGetDiskInfo, err) + } + + fs := syscall.Statfs_t{} + if err := syscall.Statfs(dir, &fs); err != nil { + return nil, cerror.WrapError(cerror.ErrGetDiskInfo, err) + } + + info := &DiskInfo{ + All: fs.Blocks * uint64(fs.Bsize) / gb, + Avail: fs.Bavail * uint64(fs.Bsize) / gb, + Free: fs.Bfree * uint64(fs.Bsize) / gb, + } + info.Used = info.All - info.Free + info.AvailPercentage = float32(info.Avail) / float32(info.All) * 100 + + if err := os.Remove(f); err != nil { + if !os.IsNotExist(err) { + return info, cerror.WrapError(cerror.ErrGetDiskInfo, err) + } + } + + return info, nil +} + +// CheckDataDirSatisfied checks whether the data-dir meets the requirement while the server is running +// the caller should guarantee that the dir exists +func CheckDataDirSatisfied() error { + conf := config.GetGlobalServerConfig() + diskInfo, err := GetDiskInfo(conf.DataDir) + if err != nil { + return cerror.WrapError(cerror.ErrCheckDataDirSatisfied, err) + } + if diskInfo.AvailPercentage < dataDirAvailLowThreshold { + failpoint.Inject("InjectCheckDataDirSatisfied", func() { + log.Info("inject check data dir satisfied error") + failpoint.Return(nil) + }) + return cerror.WrapError(cerror.ErrCheckDataDirSatisfied, errors.Errorf("disk is almost full, TiCDC requires that the disk mounted for data-dir "+ "have at least 10%% available space, and a total capacity of at least 500GB is preferred. 
disk info: %+v", diskInfo)) + } + + return nil +} diff --git a/pkg/util/fileutil_test.go b/pkg/util/fileutil_test.go index 09736297bea..e498107d962 100644 --- a/pkg/util/fileutil_test.go +++ b/pkg/util/fileutil_test.go @@ -21,6 +21,8 @@ import ( "runtime" "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" ) @@ -62,3 +64,42 @@ func (s *fileUtilSuite) TestIsDirAndWritable(c *check.C) { err = IsDirAndWritable(dir) c.Assert(err, check.IsNil) } + +func (s *fileUtilSuite) TestIsDirReadWritable(c *check.C) { + defer testleak.AfterTest(c)() + + dir := c.MkDir() + err := IsDirReadWritable(dir) + c.Assert(err, check.IsNil) + + path := filepath.Join(dir, "/foo") + err = IsDirReadWritable(path) + c.Assert(err, check.ErrorMatches, ".*no such file or directory") +} + +func (s *fileUtilSuite) TestGetDiskInfo(c *check.C) { + defer testleak.AfterTest(c)() + + dir := c.MkDir() + info, err := GetDiskInfo(dir) + c.Assert(err, check.IsNil) + c.Assert(info, check.NotNil) + + dir = filepath.Join(dir, "/tmp/sorter") + info, err = GetDiskInfo(dir) + c.Assert(info, check.IsNil) + c.Assert(err, check.ErrorMatches, ".*no such file or directory") +} + +func (s *fileUtilSuite) TestCheckDataDirSatisfied(c *check.C) { + defer testleak.AfterTest(c)() + dir := c.MkDir() + conf := config.GetGlobalServerConfig() + conf.DataDir = dir + config.StoreGlobalServerConfig(conf) + + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil) + err := CheckDataDirSatisfied() + c.Assert(err, check.IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil) +} diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server index 78ea4ec988e..2be79cc7a76 100755 --- a/tests/_utils/run_cdc_server +++ b/tests/_utils/run_cdc_server @@ -23,6 +23,7 @@ restart= failpoint=$GO_FAILPOINTS config_path= sort_dir= +data_dir= while [[ ${1} ]]; do case "${1}" in @@ -89,6 +90,10 @@ if [ -z "$sort_dir" ]; then sort_dir=${workdir}/cdc-sort${logsuffix} fi +if [ -z "$data_dir" ]; then + data_dir=${workdir}/cdc_data${logsuffix} +fi + echo "[$(date)] <<<<<< START cdc server in $TEST_NAME case >>>>>>" cd $workdir pid=$(ps -C run_cdc_server -o pid=|tr -d '[:space:]') @@ -101,7 +106,8 @@ if [[ "$restart" == "true" ]]; then --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ --sorter-num-workerpool-goroutine 4 \ - --sort-dir $sort_dir \ + --data-dir "$data_dir" \ + --sort-dir "$sort_dir" \ $config_path \ $tls \ $certcn \ @@ -117,7 +123,8 @@ else --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ --sorter-num-workerpool-goroutine 4 \ - --sort-dir $sort_dir \ + --data-dir "$data_dir" \ + --sort-dir "$sort_dir" \ $config_path \ $tls \ $certcn \ diff --git a/tests/unified_sorter_sort_dir_conflict/run.sh b/tests/unified_sorter_sort_dir_conflict/run.sh index baea77efede..857cc820989 100644 --- a/tests/unified_sorter_sort_dir_conflict/run.sh +++ b/tests/unified_sorter_sort_dir_conflict/run.sh @@ -60,7 +60,7 @@ function prepare() { sleep 20 # starts the first second server instance. 
It should fail, and bring down the changefeed - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 2 --sort-dir /tmp/cdc_sort_1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 1 --sort-dir /tmp/cdc_sort_1 ensure $MAX_RETRIES check_changefeed_mark_stopped_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*ErrConflictingFileLocks.*" kill $capture_pid