Skip to content

Commit

Permalink
CDC Server support data-dir (#1879)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ling Jin authored Jun 16, 2021
1 parent 67c8cd3 commit 9135351
Show file tree
Hide file tree
Showing 19 changed files with 471 additions and 75 deletions.
14 changes: 7 additions & 7 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type oldProcessor struct {
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
globalcheckpointTs uint64
globalCheckpointTs uint64
appliedLocalCheckpointTs uint64
flushCheckpointInterval time.Duration

Expand Down Expand Up @@ -234,7 +234,7 @@ func newProcessor(
}

if err == nil {
p.globalcheckpointTs = info.CheckpointTs
p.globalCheckpointTs = info.CheckpointTs
}

for tableID, replicaInfo := range p.status.Tables {
Expand Down Expand Up @@ -672,7 +672,7 @@ func (p *oldProcessor) globalStatusWorker(ctx context.Context) error {
)

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
atomic.StoreUint64(&p.globalCheckpointTs, changefeedStatus.CheckpointTs)
if lastResolvedTs == changefeedStatus.ResolvedTs &&
lastCheckPointTs == changefeedStatus.CheckpointTs {
return
Expand Down Expand Up @@ -779,15 +779,15 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
return
}

globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)
globalCheckpointTs := atomic.LoadUint64(&p.globalCheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
// use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized.
if replicaInfo.StartTs < globalCheckpointTs {
// use Warn instead of Panic in case that p.globalCheckpointTs has not been initialized.
// The cdc_state_checker will catch a real inconsistency in integration tests.
log.Warn("addTable: startTs < checkpoint",
util.ZapFieldChangefeed(ctx),
zap.Int64("tableID", tableID),
zap.Uint64("checkpoint", globalcheckpointTs),
zap.Uint64("checkpoint", globalCheckpointTs),
zap.Uint64("startTs", replicaInfo.StartTs))
}

Expand Down
4 changes: 4 additions & 0 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
zap.Int64("table-id", tableID),
zap.String("table-name", tableName))

if err := util.CheckDataDirSatisfied(); err != nil {
return nil, errors.Trace(err)
}

ret, err := newFileBackEnd(fname, &msgPackGenSerde{})
if err != nil {
return nil, errors.Trace(err)
Expand Down
32 changes: 25 additions & 7 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"time"

Expand All @@ -34,10 +35,17 @@ var _ = check.SerialSuites(&backendPoolSuite{})
func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
dataDir := "/tmp/cdc_data"
err := os.MkdirAll(dataDir, 0o755)
c.Assert(err, check.IsNil)

sortDir := filepath.Join(dataDir, config.DefaultSortDir)
err = os.MkdirAll(sortDir, 0o755)
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sortDir
conf.Sorter.MaxMemoryPressure = 90 // 90%
conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G
config.StoreGlobalServerConfig(conf)
Expand All @@ -48,7 +56,7 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

backEndPool, err := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool(sortDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()
Expand Down Expand Up @@ -103,14 +111,17 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
defer testleak.AfterTest(c)()

dir := c.MkDir()
err := os.Chmod(dir, 0o311) // no permission to `ls`
dataDir := "tmp/cdc_data"
sortDir := filepath.Join(dataDir, config.DefaultSortDir)
err := os.MkdirAll(sortDir, 0o311)
c.Assert(err, check.IsNil)

conf := config.GetGlobalServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sortDir
conf.Sorter.MaxMemoryPressure = 0 // force using files

backEndPool, err := newBackEndPool(dir, "")
backEndPool, err := newBackEndPool(sortDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()
Expand All @@ -131,10 +142,17 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
dataDir := c.MkDir()
err := os.Chmod(dataDir, 0o755)
c.Assert(err, check.IsNil)

sorterDir := filepath.Join(dataDir, config.DefaultSortDir)
err = os.MkdirAll(sorterDir, 0o755)
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sorterDir
conf.Sorter.MaxMemoryPressure = 90 // 90%
conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G
config.StoreGlobalServerConfig(conf)
Expand All @@ -143,7 +161,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
c.Assert(err, check.IsNil)
defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint") //nolint:errcheck

backEndPool, err := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool(sorterDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)

Expand Down
46 changes: 34 additions & 12 deletions cdc/puller/sorter/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"context"
"math"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

"go.uber.org/zap/zapcore"

_ "net/http/pprof"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand All @@ -32,7 +35,6 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
_ "net/http/pprof"
)

const (
Expand Down Expand Up @@ -62,18 +64,21 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
Expand All @@ -87,18 +92,21 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -127,6 +135,11 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun
log.Panic("Could not enable failpoint", zap.Error(err))
}

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil)
}()

ctx, cancel := context.WithCancel(ctx)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
Expand Down Expand Up @@ -284,16 +297,19 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 0, // disable memory sort
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)

// enable the failpoint to simulate delays
Expand All @@ -311,7 +327,7 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
}()

for i := 0; i < 5; i++ {
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = testSorter(ctx, c, sorter, 100000000, true)
Expand All @@ -325,18 +341,21 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand Down Expand Up @@ -373,7 +392,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) {
}()

// recreate the sorter
sorter, err = NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err = NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

finishedCh = make(chan struct{})
Expand All @@ -399,18 +418,21 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) {
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand Down
Loading

0 comments on commit 9135351

Please sign in to comment.