Skip to content

Commit

Permalink
br/lightning: migrate chekpoints tests to testify (pingcap#31920)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Jan 25, 2022
1 parent 3cb5b7d commit f868024
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 197 deletions.
209 changes: 109 additions & 100 deletions br/pkg/lightning/checkpoints/checkpoints_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,19 @@ package checkpoints_test

import (
"context"
"os"
"path/filepath"
"sort"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/stretchr/testify/require"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&cpFileSuite{})

type cpFileSuite struct {
cpdb *checkpoints.FileCheckpointsDB
}

func newTestConfig() *config.Config {
cfg := config.NewConfig()
cfg.Mydumper.SourceDir = "/data"
Expand All @@ -36,16 +27,15 @@ func newTestConfig() *config.Config {
return cfg
}

func (s *cpFileSuite) SetUpTest(c *C) {
dir := c.MkDir()
s.cpdb = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))

func newFileCheckpointsDB(t *testing.T) (*checkpoints.FileCheckpointsDB, func()) {
dir, err := os.MkdirTemp("", "checkpointDB***")
require.NoError(t, err)
cpdb := checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
ctx := context.Background()
cpdb := s.cpdb

// 2. initialize with checkpoint data.
cfg := newTestConfig()
err := cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{
err = cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{
"db1": {
Name: "db1",
Tables: map[string]*checkpoints.TidbTableInfo{
Expand All @@ -60,7 +50,7 @@ func (s *cpFileSuite) SetUpTest(c *C) {
},
},
})
c.Assert(err, IsNil)
require.NoError(t, err)

// 3. set some checkpoints

Expand Down Expand Up @@ -90,15 +80,15 @@ func (s *cpFileSuite) SetUpTest(c *C) {
Chunks: nil,
},
})
c.Assert(err, IsNil)
require.NoError(t, err)

err = cpdb.InsertEngineCheckpoints(ctx, "`db2`.`t3`", map[int32]*checkpoints.EngineCheckpoint{
-1: {
Status: checkpoints.CheckpointStatusLoaded,
Chunks: nil,
},
})
c.Assert(err, IsNil)
require.NoError(t, err)

// 4. update some checkpoints

Expand Down Expand Up @@ -131,13 +121,14 @@ func (s *cpFileSuite) SetUpTest(c *C) {
ccm.MergeInto(cpd)

cpdb.Update(map[string]*checkpoints.TableCheckpointDiff{"`db1`.`t2`": cpd})
return cpdb, func() {
err := cpdb.Close()
require.NoError(t, err)
os.RemoveAll(dir)
}
}

func (s *cpFileSuite) TearDownTest(c *C) {
c.Assert(s.cpdb.Close(), IsNil)
}

func (s *cpFileSuite) setInvalidStatus() {
func setInvalidStatus(cpdb *checkpoints.FileCheckpointsDB) {
cpd := checkpoints.NewTableCheckpointDiff()
scm := checkpoints.StatusCheckpointMerger{
EngineID: -1,
Expand All @@ -146,20 +137,22 @@ func (s *cpFileSuite) setInvalidStatus() {
scm.SetInvalid()
scm.MergeInto(cpd)

s.cpdb.Update(map[string]*checkpoints.TableCheckpointDiff{
cpdb.Update(map[string]*checkpoints.TableCheckpointDiff{
"`db1`.`t2`": cpd,
"`db2`.`t3`": cpd,
})
}

func (s *cpFileSuite) TestGet(c *C) {
func TestGet(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

// 5. get back the checkpoints

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(err, IsNil)
c.Assert(cp, DeepEquals, &checkpoints.TableCheckpoint{
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.NoError(t, err)
expect := &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusAllWritten,
AllocBase: 132861,
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
Expand Down Expand Up @@ -191,98 +184,110 @@ func (s *cpFileSuite) TestGet(c *C) {
}},
},
},
})
}
require.Equal(t, expect, cp)

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(err, IsNil)
c.Assert(cp, DeepEquals, &checkpoints.TableCheckpoint{
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
expect = &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusLoaded,
Engines: map[int32]*checkpoints.EngineCheckpoint{
-1: {
Status: checkpoints.CheckpointStatusLoaded,
Chunks: []*checkpoints.ChunkCheckpoint{},
},
},
})
}
require.Equal(t, expect, cp)

cp, err = s.cpdb.Get(ctx, "`db3`.`not-exists`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err = cpdb.Get(ctx, "`db3`.`not-exists`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))
}

func (s *cpFileSuite) TestRemoveAllCheckpoints(c *C) {
func TestRemoveAllCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

err := s.cpdb.RemoveCheckpoint(ctx, "all")
c.Assert(err, IsNil)
err := cpdb.RemoveCheckpoint(ctx, "all")
require.NoError(t, err)

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))
}

func (s *cpFileSuite) TestRemoveOneCheckpoint(c *C) {
func TestRemoveOneCheckpoint(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

err := s.cpdb.RemoveCheckpoint(ctx, "`db1`.`t2`")
c.Assert(err, IsNil)
err := cpdb.RemoveCheckpoint(ctx, "`db1`.`t2`")
require.NoError(t, err)

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(err, IsNil)
c.Assert(cp.Status, Equals, checkpoints.CheckpointStatusLoaded)
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)
}

func (s *cpFileSuite) TestIgnoreAllErrorCheckpoints(c *C) {
func TestIgnoreAllErrorCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

s.setInvalidStatus()
setInvalidStatus(cpdb)

err := s.cpdb.IgnoreErrorCheckpoint(ctx, "all")
c.Assert(err, IsNil)
err := cpdb.IgnoreErrorCheckpoint(ctx, "all")
require.NoError(t, err)

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(err, IsNil)
c.Assert(cp.Status, Equals, checkpoints.CheckpointStatusLoaded)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.NoError(t, err)
require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(err, IsNil)
c.Assert(cp.Status, Equals, checkpoints.CheckpointStatusLoaded)
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)
}

func (s *cpFileSuite) TestIgnoreOneErrorCheckpoints(c *C) {
func TestIgnoreOneErrorCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

s.setInvalidStatus()
setInvalidStatus(cpdb)

err := s.cpdb.IgnoreErrorCheckpoint(ctx, "`db1`.`t2`")
c.Assert(err, IsNil)
err := cpdb.IgnoreErrorCheckpoint(ctx, "`db1`.`t2`")
require.NoError(t, err)

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(err, IsNil)
c.Assert(cp.Status, Equals, checkpoints.CheckpointStatusLoaded)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.NoError(t, err)
require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(err, IsNil)
c.Assert(cp.Status, Equals, checkpoints.CheckpointStatusAllWritten/10)
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
require.Equal(t, checkpoints.CheckpointStatusAllWritten/10, cp.Status)
}

func (s *cpFileSuite) TestDestroyAllErrorCheckpoints(c *C) {
func TestDestroyAllErrorCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

s.setInvalidStatus()
setInvalidStatus(cpdb)

dtc, err := s.cpdb.DestroyErrorCheckpoint(ctx, "all")
c.Assert(err, IsNil)
dtc, err := cpdb.DestroyErrorCheckpoint(ctx, "all")
require.NoError(t, err)
sort.Slice(dtc, func(i, j int) bool { return dtc[i].TableName < dtc[j].TableName })
c.Assert(dtc, DeepEquals, []checkpoints.DestroyedTableCheckpoint{
expect := []checkpoints.DestroyedTableCheckpoint{
{
TableName: "`db1`.`t2`",
MinEngineID: -1,
Expand All @@ -293,37 +298,41 @@ func (s *cpFileSuite) TestDestroyAllErrorCheckpoints(c *C) {
MinEngineID: -1,
MaxEngineID: -1,
},
})
}
require.Equal(t, expect, dtc)

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))
}

func (s *cpFileSuite) TestDestroyOneErrorCheckpoint(c *C) {
func TestDestroyOneErrorCheckpoint(t *testing.T) {
ctx := context.Background()
cpdb, clean := newFileCheckpointsDB(t)
defer clean()

s.setInvalidStatus()
setInvalidStatus(cpdb)

dtc, err := s.cpdb.DestroyErrorCheckpoint(ctx, "`db1`.`t2`")
c.Assert(err, IsNil)
c.Assert(dtc, DeepEquals, []checkpoints.DestroyedTableCheckpoint{
dtc, err := cpdb.DestroyErrorCheckpoint(ctx, "`db1`.`t2`")
require.NoError(t, err)
expect := []checkpoints.DestroyedTableCheckpoint{
{
TableName: "`db1`.`t2`",
MinEngineID: -1,
MaxEngineID: 0,
},
})
}
require.Equal(t, expect, dtc)

cp, err := s.cpdb.Get(ctx, "`db1`.`t2`")
c.Assert(cp, IsNil)
c.Assert(errors.IsNotFound(err), IsTrue)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))

cp, err = s.cpdb.Get(ctx, "`db2`.`t3`")
c.Assert(err, IsNil)
c.Assert(cp.Status, Equals, checkpoints.CheckpointStatusAllWritten/10)
cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
require.Equal(t, checkpoints.CheckpointStatusAllWritten/10, cp.Status)
}
Loading

0 comments on commit f868024

Please sign in to comment.