Skip to content

Commit

Permalink
Merge branch 'release-6.1' into cherry-pick-7739-to-release-6.1
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jan 18, 2023
2 parents 59f54a0 + 34aaa8b commit 85bd2e5
Show file tree
Hide file tree
Showing 194 changed files with 5,018 additions and 1,902 deletions.
13 changes: 13 additions & 0 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,19 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) {
return
}

o, err := h.capture.GetOwner()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}
err = o.ValidateChangefeed(&changefeedConfig)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}

err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info,
model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,32 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}
}

var sinkConfigUpdated, sinkURIUpdated bool
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}

if changefeedConfig.MounterWorkerNum != 0 {
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}

if changefeedConfig.SinkConfig != nil {
sinkConfigUpdated = true
newInfo.Config.Sink = changefeedConfig.SinkConfig
}

// verify sink_uri
if changefeedConfig.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = changefeedConfig.SinkURI
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config, newInfo.Opts); err != nil {
}

if sinkConfigUpdated || sinkURIUpdated {
// check sink config is compatible with sinkURI
newCfg := newInfo.Config.Sink
oldCfg := oldInfo.Config.Sink
err := newCfg.CheckCompatibilityWithSinkURI(oldCfg, newInfo.SinkURI)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config, newInfo.Opts); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func decodeRowV1(b []byte, tableInfo *model.TableInfo, tz *time.Location) (map[i

// decodeRowV2 decodes value data using new encoding format.
// Ref: https://github.com/pingcap/tidb/pull/12634
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
//
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
func decodeRowV2(data []byte, columns []rowcodec.ColInfo, tz *time.Location) (map[int64]types.Datum, error) {
decoder := rowcodec.NewDatumMapDecoder(columns, tz)
datums, err := decoder.DecodeToDatumMap(data, nil)
Expand Down
181 changes: 181 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@
package entry

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/pkg/sqlmodel"

"github.com/pingcap/log"
ticonfig "github.com/pingcap/tidb/config"
tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -961,3 +970,175 @@ func TestGetDefaultZeroValue(t *testing.T) {
require.Equal(t, tc.Default, val, tc.Name)
}
}

func TestBuildTableInfo(t *testing.T) {
cases := []struct {
origin string
recovered string
recoveredWithNilCol string
}{
{
"CREATE TABLE t1 (c INT PRIMARY KEY)",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" c2 VARCHAR(10) NOT NULL," +
" c3 BIT(10) NOT NULL," +
" UNIQUE KEY (c2, c3)" +
")",
// CDC discards field length.
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned DEFAULT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `c3` bit(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `c3` bit(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" gen INT AS (c+1) VIRTUAL," +
" c2 VARCHAR(10) NOT NULL," +
" gen2 INT AS (c+2) STORED," +
" c3 BIT(10) NOT NULL," +
" PRIMARY KEY (c, c2)" +
")",
// CDC discards virtual generated column, and generating expression of stored generated column.
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned NOT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" +
" `c3` bit(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned NOT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE `t1` (" +
" `a` int(11) NOT NULL," +
" `b` int(11) DEFAULT NULL," +
" `c` int(11) DEFAULT NULL," +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */," +
" UNIQUE KEY `b` (`b`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `a` int(0) NOT NULL,\n" +
" `b` int(0) DEFAULT NULL,\n" +
" `c` int(0) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`b`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `a` int(0) NOT NULL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))

// mimic the columns are set to nil when old value feature is disabled
for i := range cols {
if !cols[i].Flag.IsHandleKey() {
cols[i] = nil
}
}
recoveredTI = model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle = sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recoveredWithNilCol, showCreateTable(t, recoveredTI))
}
}

var tiCtx = mock.NewContext()

func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
result := bytes.NewBuffer(make([]byte, 0, 512))
err := executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, result)
require.NoError(t, err)
return result.String()
}

func TestNewDMRowChange(t *testing.T) {
cases := []struct {
origin string
recovered string
}{
{
"CREATE TABLE t1 (id INT," +
" a1 INT NOT NULL," +
" a3 INT NOT NULL," +
" UNIQUE KEY dex1(a1, a3));",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) DEFAULT NULL,\n" +
" `a1` int(0) NOT NULL,\n" +
" `a3` int(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`a1`(0),`a3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
{
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
},
{
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
},
{
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
},
}
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))
tableName := &model.TableName{Schema: "db", Table: "t1"}
rowChange := sqlmodel.NewRowChange(tableName, nil, []interface{}{1, 1, 2}, nil, recoveredTI, nil, nil)
sqlGot, argsGot := rowChange.GenSQL(sqlmodel.DMLDelete)
require.Equal(t, "DELETE FROM `db`.`t1` WHERE `a1` = ? AND `a3` = ? LIMIT 1", sqlGot)
require.Equal(t, []interface{}{1, 2}, argsGot)

sqlGot, argsGot = sqlmodel.GenDeleteSQL(rowChange, rowChange)
require.Equal(t, "DELETE FROM `db`.`t1` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", sqlGot)
require.Equal(t, []interface{}{1, 2, 1, 2}, argsGot)
}
}
4 changes: 2 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ func (s *snapshot) doDropTable(tbInfo *model.TableInfo, currentTs uint64) {

// truncateTable truncate the table with the given ID, and replace it with a new `tbInfo`.
// NOTE: after a table is truncated:
// * physicalTableByID(id) will return nil;
// * IsTruncateTableID(id) should return true.
// - physicalTableByID(id) will return nil;
// - IsTruncateTableID(id) should return true.
func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs uint64) (err error) {
old, ok := s.physicalTableByID(id)
if !ok {
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,10 +1150,10 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R
// receiveFromStream receives gRPC messages from a stream continuously and sends
// messages to region worker, if `stream.Recv` meets error, this routine will exit
// silently. As for regions managed by this routine, there are two situations:
// 1. established regions: a `nil` event will be sent to region worker, and region
// worker call `s.onRegionFail` to re-establish these regions.
// 2. pending regions: call `s.onRegionFail` for each pending region before this
// routine exits to establish these regions.
// 1. established regions: a `nil` event will be sent to region worker, and region
// worker call `s.onRegionFail` to re-establish these regions.
// 2. pending regions: call `s.onRegionFail` for each pending region before this
// routine exits to establish these regions.
func (s *eventFeedSession) receiveFromStream(
ctx context.Context,
g *errgroup.Group,
Expand Down
30 changes: 15 additions & 15 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2766,10 +2766,10 @@ func TestResolveLockNoCandidate(t *testing.T) {
// TestFailRegionReentrant tests one region could be failover multiple times,
// kv client must avoid duplicated `onRegionFail` call for the same region.
// In this test
// 1. An `unknownErr` is sent to kv client first to trigger `handleSingleRegionError` in region worker.
// 2. We delay the kv client to re-create a new region request by 500ms via failpoint.
// 3. Before new region request is fired, simulate kv client `stream.Recv` returns an error, the stream
// handler will signal region worker to exit, which will evict all active region states then.
// 1. An `unknownErr` is sent to kv client first to trigger `handleSingleRegionError` in region worker.
// 2. We delay the kv client to re-create a new region request by 500ms via failpoint.
// 3. Before new region request is fired, simulate kv client `stream.Recv` returns an error, the stream
// handler will signal region worker to exit, which will evict all active region states then.
func TestFailRegionReentrant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -2849,17 +2849,17 @@ func TestFailRegionReentrant(t *testing.T) {

// TestClientV1UnlockRangeReentrant tests clientV1 can handle region reconnection
// with unstable TiKV store correctly. The test workflow is as follows:
// 1. kv client establishes two regions request, naming region-1, region-2, they
// belong to the same TiKV store.
// 2. The region-1 is firstly established, yet region-2 has some delay after its
// region state is inserted into `pendingRegions`
// 3. At this time the TiKV store crashes and `stream.Recv` returns error. In the
// defer function of `receiveFromStream`, all pending regions will be cleaned
// up, which means the region lock will be unlocked once for these regions.
// 4. In step-2, the region-2 continues to run, it can't get store stream which
// has been deleted in step-3, so it will create new stream but fails because
// of unstable TiKV store, at this point, the kv client should handle with the
// pending region correctly.
// 1. kv client establishes two regions request, naming region-1, region-2, they
// belong to the same TiKV store.
// 2. The region-1 is firstly established, yet region-2 has some delay after its
// region state is inserted into `pendingRegions`
// 3. At this time the TiKV store crashes and `stream.Recv` returns error. In the
// defer function of `receiveFromStream`, all pending regions will be cleaned
// up, which means the region lock will be unlocked once for these regions.
// 4. In step-2, the region-2 continues to run, it can't get store stream which
// has been deleted in step-3, so it will create new stream but fails because
// of unstable TiKV store, at this point, the kv client should handle with the
// pending region correctly.
func TestClientV1UnlockRangeReentrant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
Expand Down
5 changes: 3 additions & 2 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -341,7 +342,7 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
return
}

if config.IsMqScheme(uri.Scheme) {
if sink.IsMQScheme(uri.Scheme) {
return
}

Expand All @@ -366,7 +367,7 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
return
}

if !config.IsMqScheme(uri.Scheme) {
if !sink.IsMQScheme(uri.Scheme) {
return
}

Expand Down
6 changes: 3 additions & 3 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,21 +780,21 @@ func TestChangefeedInfoStringer(t *testing.T) {
SinkURI: "mysql://root:[email protected]:3306/",
StartTs: 418881574869139457,
},
`.*mysql://root:xxxx@127.0.0.1:3306.*`,
`.*mysql://root:xxxxx@127.0.0.1:3306.*`,
},
{
&ChangeFeedInfo{
SinkURI: "mysql://[email protected]:3306/",
StartTs: 418881574869139457,
},
`.*mysql://root:xxxx@127.0.0.1:3306.*`,
`.*mysql://[email protected]:3306.*`,
},
{
&ChangeFeedInfo{
SinkURI: "mysql://root:test%21%23%24%25%5E%26%[email protected]:3306/",
StartTs: 418881574869139457,
},
`.*mysql://root:xxxx@127.0.0.1:3306/.*`,
`.*mysql://root:xxxxx@127.0.0.1:3306/.*`,
},
}

Expand Down
Loading

0 comments on commit 85bd2e5

Please sign in to comment.