Skip to content

Commit

Permalink
rebuild mysql conn when retry failed chunks and support `transactiona…
Browse files Browse the repository at this point in the history
…l-consistency` parameter (pingcap#199)

* support rebuild mysql connection to retry failed chunks

* refine consistency variables

* add --transactional-consistency and support rebuilding mysql conn to retry
  • Loading branch information
lichunzhu authored Nov 17, 2020
1 parent 552470f commit fe21f01
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 128 deletions.
109 changes: 58 additions & 51 deletions v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,47 @@ import (
)

const (
flagDatabase = "database"
flagTablesList = "tables-list"
flagHost = "host"
flagUser = "user"
flagPort = "port"
flagPassword = "password"
flagAllowCleartextPasswords = "allow-cleartext-passwords"
flagThreads = "threads"
flagFilesize = "filesize"
flagStatementSize = "statement-size"
flagOutput = "output"
flagLoglevel = "loglevel"
flagLogfile = "logfile"
flagLogfmt = "logfmt"
flagConsistency = "consistency"
flagSnapshot = "snapshot"
flagNoViews = "no-views"
flagStatusAddr = "status-addr"
flagRows = "rows"
flagWhere = "where"
flagEscapeBackslash = "escape-backslash"
flagFiletype = "filetype"
flagNoHeader = "no-header"
flagNoSchemas = "no-schemas"
flagNoData = "no-data"
flagCsvNullValue = "csv-null-value"
flagSql = "sql"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagDumpEmptyDatabase = "dump-empty-database"
flagTidbMemQuotaQuery = "tidb-mem-quota-query"
flagCA = "ca"
flagCert = "cert"
flagKey = "key"
flagCsvSeparator = "csv-separator"
flagCsvDelimiter = "csv-delimiter"
flagOutputFilenameTemplate = "output-filename-template"
flagCompleteInsert = "complete-insert"
flagParams = "params"
flagReadTimeout = "read-timeout"
flagDatabase = "database"
flagTablesList = "tables-list"
flagHost = "host"
flagUser = "user"
flagPort = "port"
flagPassword = "password"
flagAllowCleartextPasswords = "allow-cleartext-passwords"
flagThreads = "threads"
flagFilesize = "filesize"
flagStatementSize = "statement-size"
flagOutput = "output"
flagLoglevel = "loglevel"
flagLogfile = "logfile"
flagLogfmt = "logfmt"
flagConsistency = "consistency"
flagSnapshot = "snapshot"
flagNoViews = "no-views"
flagStatusAddr = "status-addr"
flagRows = "rows"
flagWhere = "where"
flagEscapeBackslash = "escape-backslash"
flagFiletype = "filetype"
flagNoHeader = "no-header"
flagNoSchemas = "no-schemas"
flagNoData = "no-data"
flagCsvNullValue = "csv-null-value"
flagSql = "sql"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagDumpEmptyDatabase = "dump-empty-database"
flagTidbMemQuotaQuery = "tidb-mem-quota-query"
flagCA = "ca"
flagCert = "cert"
flagKey = "key"
flagCsvSeparator = "csv-separator"
flagCsvDelimiter = "csv-delimiter"
flagOutputFilenameTemplate = "output-filename-template"
flagCompleteInsert = "complete-insert"
flagParams = "params"
flagReadTimeout = "read-timeout"
flagTransactionalConsistency = "transactional-consistency"

FlagHelp = "help"
)
Expand Down Expand Up @@ -108,15 +109,16 @@ type Config struct {
CsvDelimiter string
ReadTimeout time.Duration

TableFilter filter.Filter `json:"-"`
Rows uint64
Where string
FileType string
CompleteInsert bool
EscapeBackslash bool
DumpEmptyDatabase bool
OutputFileTemplate *template.Template `json:"-"`
SessionParams map[string]interface{}
TableFilter filter.Filter `json:"-"`
Rows uint64
Where string
FileType string
CompleteInsert bool
TransactionalConsistency bool
EscapeBackslash bool
DumpEmptyDatabase bool
OutputFileTemplate *template.Template `json:"-"`
SessionParams map[string]interface{}

PosAfterConnect bool

Expand All @@ -141,7 +143,7 @@ func DefaultConfig() *Config {
SortByPk: true,
Tables: nil,
Snapshot: "",
Consistency: "auto",
Consistency: consistencyTypeAuto,
NoViews: true,
Rows: UnspecifiedSize,
Where: "",
Expand Down Expand Up @@ -202,7 +204,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.String(flagLoglevel, "info", "Log level: {debug|info|warn|error|dpanic|panic|fatal}")
flags.StringP(flagLogfile, "L", "", "Log file `path`, leave empty to write to console")
flags.String(flagLogfmt, "text", "Log `format`: {text|json}")
flags.String(flagConsistency, "auto", "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagConsistency, consistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagSnapshot, "", "Snapshot position (uint64 from pd timestamp for TiDB). Valid only when consistency=snapshot")
flags.BoolP(flagNoViews, "W", true, "Do not dump views")
flags.String(flagStatusAddr, ":8281", "dumpling API server and pprof addr")
Expand Down Expand Up @@ -230,6 +232,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.Bool(FlagHelp, false, "Print help message and quit")
flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
flags.MarkHidden(flagReadTimeout)
flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency")
}

// GetDSN generates DSN from Config
Expand Down Expand Up @@ -367,6 +370,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
conf.TransactionalConsistency, err = flags.GetBool(flagTransactionalConsistency)
if err != nil {
return errors.Trace(err)
}

if conf.Threads <= 0 {
return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
Expand Down
43 changes: 35 additions & 8 deletions v4/export/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,37 @@ import (
"github.com/pingcap/errors"
)

const (
consistencyTypeAuto = "auto"
consistencyTypeFlush = "flush"
consistencyTypeLock = "lock"
consistencyTypeSnapshot = "snapshot"
consistencyTypeNone = "none"
)

func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) {
resolveAutoConsistency(conf)
conn, err := session.Conn(ctx)
if err != nil {
return nil, err
}
switch conf.Consistency {
case "flush":
case consistencyTypeFlush:
return &ConsistencyFlushTableWithReadLock{
serverType: conf.ServerInfo.ServerType,
conn: conn,
}, nil
case "lock":
case consistencyTypeLock:
return &ConsistencyLockDumpingTables{
conn: conn,
allTables: conf.Tables,
}, nil
case "snapshot":
case consistencyTypeSnapshot:
if conf.ServerInfo.ServerType != ServerTypeTiDB {
return nil, errors.New("snapshot consistency is not supported for this server")
}
return &ConsistencyNone{}, nil
case "none":
case consistencyTypeNone:
return &ConsistencyNone{}, nil
default:
return nil, errors.Errorf("invalid consistency option %s", conf.Consistency)
Expand All @@ -39,6 +47,7 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB
type ConsistencyController interface {
Setup(context.Context) error
TearDown(context.Context) error
PingContext(context.Context) error
}

type ConsistencyNone struct{}
Expand All @@ -51,6 +60,10 @@ func (c *ConsistencyNone) TearDown(_ context.Context) error {
return nil
}

func (c *ConsistencyNone) PingContext(_ context.Context) error {
return nil
}

type ConsistencyFlushTableWithReadLock struct {
serverType ServerType
conn *sql.Conn
Expand All @@ -74,6 +87,13 @@ func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error
return UnlockTables(ctx, c.conn)
}

func (c *ConsistencyFlushTableWithReadLock) PingContext(ctx context.Context) error {
if c.conn == nil {
return errors.New("consistency connection has already been closed!")
}
return c.conn.PingContext(ctx)
}

type ConsistencyLockDumpingTables struct {
conn *sql.Conn
allTables DatabaseTables
Expand Down Expand Up @@ -102,18 +122,25 @@ func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error {
return UnlockTables(ctx, c.conn)
}

func (c *ConsistencyLockDumpingTables) PingContext(ctx context.Context) error {
if c.conn == nil {
return errors.New("consistency connection has already been closed!")
}
return c.conn.PingContext(ctx)
}

const snapshotFieldIndex = 1

func resolveAutoConsistency(conf *Config) {
if conf.Consistency != "auto" {
if conf.Consistency != consistencyTypeAuto {
return
}
switch conf.ServerInfo.ServerType {
case ServerTypeTiDB:
conf.Consistency = "snapshot"
conf.Consistency = consistencyTypeSnapshot
case ServerTypeMySQL, ServerTypeMariaDB:
conf.Consistency = "flush"
conf.Consistency = consistencyTypeFlush
default:
conf.Consistency = "none"
conf.Consistency = consistencyTypeNone
}
}
24 changes: 12 additions & 12 deletions v4/export/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
conf := DefaultConfig()
resultOk := sqlmock.NewResult(0, 1)

conf.Consistency = "none"
conf.Consistency = consistencyTypeNone
ctrl, _ := NewConsistencyController(ctx, conf, db)
_, ok := ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "flush"
conf.Consistency = consistencyTypeFlush
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(ctx, conf, db)
Expand All @@ -50,14 +50,14 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
c.Fatalf(err.Error())
}

conf.Consistency = "snapshot"
conf.Consistency = consistencyTypeSnapshot
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "lock"
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", "t1", "t2", "t3").
AppendViews("db2", "t4")
Expand All @@ -80,14 +80,14 @@ func (s *testConsistencySuite) TestResolveAutoConsistency(c *C) {
serverTp ServerType
resolvedConsistency string
}{
{ServerTypeTiDB, "snapshot"},
{ServerTypeMySQL, "flush"},
{ServerTypeMariaDB, "flush"},
{ServerTypeUnknown, "none"},
{ServerTypeTiDB, consistencyTypeSnapshot},
{ServerTypeMySQL, consistencyTypeFlush},
{ServerTypeMariaDB, consistencyTypeFlush},
{ServerTypeUnknown, consistencyTypeNone},
}

for _, x := range cases {
conf.Consistency = "auto"
conf.Consistency = consistencyTypeAuto
conf.ServerInfo.ServerType = x.serverTp
resolveAutoConsistency(conf)
cmt := Commentf("server type %s", x.serverTp.String())
Expand All @@ -109,20 +109,20 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) {
c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue)

// snapshot consistency is only available in TiDB
conf.Consistency = "snapshot"
conf.Consistency = consistencyTypeSnapshot
conf.ServerInfo.ServerType = ServerTypeUnknown
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)

// flush consistency is unavailable in TiDB
conf.Consistency = "flush"
conf.Consistency = consistencyTypeFlush
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ := NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(ctx)
c.Assert(err, NotNil)

// lock table fail
conf.Consistency = "lock"
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().AppendTables("db", "t")
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New(""))
ctrl, _ = NewConsistencyController(ctx, conf, db)
Expand Down
Loading

0 comments on commit fe21f01

Please sign in to comment.