Skip to content

Commit

Permalink
Merge branch 'release-5.3' into rustin-patch-owner-nil
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 14, 2022
2 parents f8aafb1 + 1bbceb1 commit 3c0641c
Show file tree
Hide file tree
Showing 25 changed files with 413 additions and 44 deletions.
2 changes: 1 addition & 1 deletion dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return c.AdjustCaseSensitive(ctx2, db)
return nil
}

// AdjustCaseSensitive adjust CaseSensitive from DB.
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfg1.From.User = user
cfg1.From.Password = password
cfg1.RelayDir = "relay-dir"
c.Assert(checkAndAdjustSourceConfigFunc(ctx, cfg1), IsNil) // adjust source config.
c.Assert(checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg1), IsNil) // adjust source config.
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"

Expand Down
36 changes: 31 additions & 5 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package master

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/http/httputil"

"github.com/pingcap/failpoint"

"github.com/deepmap/oapi-codegen/pkg/middleware"
"github.com/labstack/echo/v4"
Expand All @@ -45,7 +48,7 @@ const (

// redirectRequestToLeaderMW a middleware auto redirect request to leader.
// because the leader has some data in memory, only the leader can process the request.
func (s *Server) redirectRequestToLeaderMW() echo.MiddlewareFunc {
func (s *Server) reverseRequestToLeaderMW(tlsCfg *tls.Config) echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(ctx echo.Context) error {
ctx2 := ctx.Request().Context()
Expand All @@ -58,13 +61,36 @@ func (s *Server) redirectRequestToLeaderMW() echo.MiddlewareFunc {
if err != nil {
return err
}
return ctx.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("http://%s%s", leaderOpenAPIAddr, ctx.Request().RequestURI))

failpoint.Inject("MockNotSetTls", func() {
tlsCfg = nil
})
// simpleProxy just reverses to leader host
simpleProxy := httputil.ReverseProxy{
Director: func(req *http.Request) {
if tlsCfg != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
req.URL.Host = leaderOpenAPIAddr
req.Host = leaderOpenAPIAddr
},
}
if tlsCfg != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsCfg
simpleProxy.Transport = transport
}
log.L().Info("reverse request to leader", zap.String("Request URL", ctx.Request().URL.String()), zap.String("leader", leaderOpenAPIAddr), zap.Bool("hasTLS", tlsCfg != nil))
simpleProxy.ServeHTTP(ctx.Response(), ctx.Request())
return nil
}
}
}

// InitOpenAPIHandles init openapi handlers.
func (s *Server) InitOpenAPIHandles() error {
func (s *Server) InitOpenAPIHandles(tlsCfg *tls.Config) error {
swagger, err := openapi.GetSwagger()
if err != nil {
return err
Expand All @@ -77,7 +103,7 @@ func (s *Server) InitOpenAPIHandles() error {
// set logger
e.Use(openapi.ZapLogger(logger))
e.Use(echomiddleware.Recover())
e.Use(s.redirectRequestToLeaderMW())
e.Use(s.reverseRequestToLeaderMW(tlsCfg))
// disables swagger server name validation. it seems to work poorly
swagger.Servers = nil
// use our validation middleware to check all requests against the OpenAPI schema.
Expand Down
103 changes: 100 additions & 3 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"context"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/DATA-DOG/go-sqlmock"
"github.com/deepmap/oapi-codegen/pkg/testutil"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -75,7 +78,7 @@ func (t *openAPISuite) SetUpTest(c *check.C) {
c.Assert(ha.ClearTestInfoOperation(t.etcdTestCli), check.IsNil)
}

func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
func (t *openAPISuite) TestReverseRequestToLeader(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -134,9 +137,103 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
c.Assert(resultListSource.Data, check.HasLen, 0)
c.Assert(resultListSource.Total, check.Equals, 0)

// list source not from leader will get a redirect
// list source from non-leader will get result too
result2 := testutil.NewRequest().Get(baseURL).Go(t.testT, s2.echo)
c.Assert(result2.Code(), check.Equals, http.StatusTemporaryRedirect)
c.Assert(result2.Code(), check.Equals, http.StatusOK)
var resultListSource2 openapi.GetSourceListResponse
err = result2.UnmarshalBodyToObject(&resultListSource2)
c.Assert(err, check.IsNil)
c.Assert(resultListSource2.Data, check.HasLen, 0)
c.Assert(resultListSource2.Total, check.Equals, 0)
}

func (t *openAPISuite) TestReverseRequestToHttpsLeader(c *check.C) {
pwd, err := os.Getwd()
require.NoError(t.testT, err)
caPath := pwd + "/tls_for_test/ca.pem"
certPath := pwd + "/tls_for_test/dm.pem"
keyPath := pwd + "/tls_for_test/dm.key"

// master1
masterAddr1 := tempurl.Alloc()[len("http://"):]
peerAddr1 := tempurl.Alloc()[len("http://"):]
cfg1 := NewConfig()
require.NoError(t.testT, cfg1.Parse([]string{
"--name=dm-master-tls-1",
fmt.Sprintf("--data-dir=%s", t.testT.TempDir()),
fmt.Sprintf("--master-addr=https://%s", masterAddr1),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr1),
fmt.Sprintf("--peer-urls=https://%s", peerAddr1),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr1),
fmt.Sprintf("--initial-cluster=dm-master-tls-1=https://%s", peerAddr1),
"--ssl-ca=" + caPath,
"--ssl-cert=" + certPath,
"--ssl-key=" + keyPath,
}))
cfg1.ExperimentalFeatures.OpenAPI = true
s1 := NewServer(cfg1)
ctx1, cancel1 := context.WithCancel(context.Background())
require.NoError(t.testT, s1.Start(ctx1))
defer func() {
cancel1()
s1.Close()
}()
// wait the first one become the leader
require.True(t.testT, utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s1.election.IsLeader() && s1.scheduler.Started()
}))

// master2
masterAddr2 := tempurl.Alloc()[len("http://"):]
peerAddr2 := tempurl.Alloc()[len("http://"):]
cfg2 := NewConfig()
require.NoError(t.testT, cfg2.Parse([]string{
"--name=dm-master-tls-2",
fmt.Sprintf("--data-dir=%s", t.testT.TempDir()),
fmt.Sprintf("--master-addr=https://%s", masterAddr2),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr2),
fmt.Sprintf("--peer-urls=https://%s", peerAddr2),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr2),
"--ssl-ca=" + caPath,
"--ssl-cert=" + certPath,
"--ssl-key=" + keyPath,
}))
cfg2.ExperimentalFeatures.OpenAPI = true
cfg2.Join = s1.cfg.MasterAddr // join to an existing cluster
s2 := NewServer(cfg2)
ctx2, cancel2 := context.WithCancel(context.Background())
require.NoError(t.testT, s2.Start(ctx2))
defer func() {
cancel2()
s2.Close()
}()
// wait the second master ready
require.False(t.testT, utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s2.election.IsLeader()
}))

baseURL := "/api/v1/sources"
// list source from leader
result := testutil.NewRequest().Get(baseURL).Go(t.testT, s1.echo)
require.Equal(t.testT, http.StatusOK, result.Code())
var resultListSource openapi.GetSourceListResponse
require.NoError(t.testT, result.UnmarshalBodyToObject(&resultListSource))
require.Len(t.testT, resultListSource.Data, 0)
require.Equal(t.testT, 0, resultListSource.Total)

// with tls, list source not from leader will get result too
result = testutil.NewRequest().Get(baseURL).Go(t.testT, s2.echo)
require.Equal(t.testT, http.StatusOK, result.Code())
var resultListSource2 openapi.GetSourceListResponse
require.NoError(t.testT, result.UnmarshalBodyToObject(&resultListSource2))
require.Len(t.testT, resultListSource2.Data, 0)
require.Equal(t.testT, 0, resultListSource2.Total)

// without tls, list source not from leader will be 502
require.NoError(t.testT, failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls", `return()`))
result = testutil.NewRequest().Get(baseURL).Go(t.testT, s2.echo)
require.Equal(t.testT, http.StatusBadGateway, result.Code())
require.NoError(t.testT, failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls"))
}

func (t *openAPISuite) TestOpenAPIWillNotStartInDefaultConfig(c *check.C) {
Expand Down
36 changes: 32 additions & 4 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package master

import (
"context"
"database/sql"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -81,7 +82,11 @@ var (
registerOnce sync.Once
runBackgroundOnce sync.Once

checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
// the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a
// compatibility compromise.
// When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change.
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfigForDMCtl
)

// Server handles RPC requests for dm-master.
Expand Down Expand Up @@ -190,8 +195,14 @@ func (s *Server) Start(ctx context.Context) (err error) {
"/status": getStatusHandle(),
"/debug/": getDebugHandler(),
}

if s.cfg.ExperimentalFeatures.OpenAPI {
if initOpenAPIErr := s.InitOpenAPIHandles(); initOpenAPIErr != nil {
// tls3 is used to openapi reverse proxy
tls3, err1 := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err1 != nil {
return terror.ErrMasterTLSConfigNotValid.Delegate(err1)
}
if initOpenAPIErr := s.InitOpenAPIHandles(tls3.TLSConfig()); initOpenAPIErr != nil {
return terror.ErrOpenAPICommonError.Delegate(initOpenAPIErr)
}
userHandles["/api/v1/"] = s.echo
Expand Down Expand Up @@ -1121,15 +1132,19 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf
if err != nil {
return cfgs, err
}
if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil {
if err := checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg); err != nil {
return cfgs, err
}
cfgs[i] = cfg
}
return cfgs, nil
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
func innerCheckAndAdjustSourceConfig(
ctx context.Context,
cfg *config.SourceConfig,
hook func(sourceConfig *config.SourceConfig, ctx context.Context, db *sql.DB) error,
) error {
dbConfig := cfg.GenerateDBConfig()
fromDB, err := conn.DefaultDBProvider.Apply(dbConfig)
if err != nil {
Expand All @@ -1139,12 +1154,25 @@ func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) e
if err = cfg.Adjust(ctx, fromDB.DB); err != nil {
return err
}
if hook != nil {
if err = hook(cfg, ctx, fromDB.DB); err != nil {
return err
}
}
if _, err = cfg.Yaml(); err != nil {
return err
}
return cfg.Verify()
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, (*config.SourceConfig).AdjustCaseSensitive)
}

func checkAndAdjustSourceConfigForDMCtl(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, nil)
}

func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) {
cfgs := make([]*config.SourceConfig, len(contents))
for i, content := range contents {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ func (t *testMaster) SetUpSuite(c *check.C) {
t.workerClients = make(map[string]workerrpc.Client)
t.saveMaxRetryNum = maxRetryNum
maxRetryNum = 2
checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
checkAndAdjustSourceConfigForDMCtlFunc = checkAndNoAdjustSourceConfigMock
}

func (t *testMaster) TearDownSuite(c *check.C) {
maxRetryNum = t.saveMaxRetryNum
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfig
}

func (t *testMaster) SetUpTest(c *check.C) {
Expand Down
3 changes: 2 additions & 1 deletion dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,13 @@ func FetchLowerCaseTableNamesSetting(ctx context.Context, conn *sql.Conn) (Lower
return LowerCaseTableNamesFlavor(res), nil
}

// GetDBCaseSensitive returns the case sensitive setting of target db.
// GetDBCaseSensitive returns the case-sensitive setting of target db.
func GetDBCaseSensitive(ctx context.Context, db *sql.DB) (bool, error) {
conn, err := db.Conn(ctx)
if err != nil {
return true, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer conn.Close()
lcFlavor, err := FetchLowerCaseTableNamesSetting(ctx, conn)
if err != nil {
return true, err
Expand Down
2 changes: 2 additions & 0 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is
b.flushedLocation.ResetSuffix()
b.location = b.flushedLocation
if b.ti == nil {
// TODO: if we forget to save table info for table checkpoint, this is also nil!
// And table checkpoint rollback to flushed point may also be nil!
return // for global checkpoint, no need to rollback the schema.
}

Expand Down
4 changes: 3 additions & 1 deletion dm/syncer/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) {
time.Sleep(time.Duration(t) * time.Second)
})
// use background context to execute sqls as much as possible
ctx, cancel := w.tctx.WithTimeout(maxDMLExecutionDuration)
// set timeout to maxDMLConnectionDuration to make sure dmls can be replicated to downstream event if the latency is high
// if users need to quit this asap, we can support pause-task/stop-task --force in the future
ctx, cancel := w.tctx.WithTimeout(maxDMLConnectionDuration)
defer cancel()
affect, err = db.ExecuteSQL(ctx, queries, args...)
failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
Expand Down
1 change: 1 addition & 0 deletions dm/syncer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (s *Syncer) skipQueryEvent(qec *queryEventContext, ddlInfo *ddlInfo) (bool,
if err != nil {
s.tctx.L().Warn("track ddl failed", zap.Stringer("ddl info", ddlInfo))
}
s.saveTablePoint(table, *qec.lastLocation)
s.tctx.L().Warn("track skipped ddl and return empty string", zap.String("origin sql", qec.originSQL), zap.Stringer("ddl info", ddlInfo))
ddlInfo.originDDL = ""
return true, nil
Expand Down
12 changes: 9 additions & 3 deletions dm/syncer/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"database/sql"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
. "github.com/pingcap/check"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/parser"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/schema"
Expand Down Expand Up @@ -69,6 +70,7 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) {
syncer.ddlDBConn = &dbconn.DBConn{Cfg: syncer.cfg, BaseConn: s.baseConn}
syncer.schemaTracker, err = schema.NewTracker(context.Background(), syncer.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn)
c.Assert(err, IsNil)
defer syncer.schemaTracker.Close()
syncer.exprFilterGroup = NewExprFilterGroup(utils.NewSessionCtx(nil), nil)

// test binlog filter
Expand Down Expand Up @@ -125,9 +127,13 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) {
},
}
p := parser.New()
loc := binlog.NewLocation(mysql.MySQLFlavor)
qec := &queryEventContext{
eventContext: &eventContext{tctx: tcontext.Background()},
p: p,
eventContext: &eventContext{
tctx: tcontext.Background(),
lastLocation: &loc,
},
p: p,
}
for _, ca := range cases {
ddlInfo, err := syncer.genDDLInfo(p, ca.schema, ca.sql)
Expand Down
Loading

0 comments on commit 3c0641c

Please sign in to comment.