Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#5627 from planetscale/mysql-replication-b…
Browse files Browse the repository at this point in the history
…ugfix

Fix MySQL replication error
  • Loading branch information
deepthi authored Jan 3, 2020
2 parents 5cb5ca6 + cafd109 commit 020bce0
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 9 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/google/btree v1.0.0 // indirect
github.com/golangci/gocyclo v0.0.0-20180528144436-0a533e8fa43d // indirect
github.com/golangci/golangci-lint v1.21.0 // indirect
github.com/golangci/revgrep v0.0.0-20180812185044-276a5c0a1039 // indirect
Expand All @@ -50,6 +51,7 @@ require (
github.com/mattn/go-runewidth v0.0.1 // indirect
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/olekukonko/tablewriter v0.0.0-20160115111002-cca8bbc07984
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type flavor interface {
// startSlave returns the command to start the slave.
startSlaveCommand() string

// restartSlave returns the commands to stop, reset and start the slave.
restartSlaveCommands() []string

// startSlaveUntilAfter will restart replication, but only allow it
// to run until `pos` is reached. After reaching pos, replication will be stopped again
startSlaveUntilAfter(pos Position) string
Expand Down Expand Up @@ -165,6 +168,11 @@ func (c *Conn) StartSlaveCommand() string {
return c.flavor.startSlaveCommand()
}

// RestartSlaveCommands returns the commands to stop, reset and start the slave.
func (c *Conn) RestartSlaveCommands() []string {
return c.flavor.restartSlaveCommands()
}

// StartSlaveUntilAfterCommand returns the command to start the slave.
func (c *Conn) StartSlaveUntilAfterCommand(pos Position) string {
return c.flavor.startSlaveUntilAfter(pos)
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (flv *filePosFlavor) startSlaveCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) restartSlaveCommands() []string {
return []string{"unsupported"}
}

func (flv *filePosFlavor) stopSlaveCommand() string {
return "unsupported"
}
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func (mariadbFlavor) startSlaveCommand() string {
return "START SLAVE"
}

func (mariadbFlavor) restartSlaveCommands() []string {
return []string{
"STOP SLAVE",
"RESET SLAVE",
"START SLAVE",
}
}

func (mariadbFlavor) stopSlaveCommand() string {
return "STOP SLAVE"
}
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ func (mysqlFlavor) startSlaveCommand() string {
return "START SLAVE"
}

func (mysqlFlavor) restartSlaveCommands() []string {
return []string{
"STOP SLAVE",
"RESET SLAVE",
"START SLAVE",
}
}

func (mysqlFlavor) startSlaveUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos)
}
Expand Down
40 changes: 34 additions & 6 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ type FakeMysqlDaemon struct {
// test owner responsibility to have these two match)
Replicating bool

// SlaveIORunning is always true except in one testcase
// where we want to test error handling during SetMaster
SlaveIORunning bool

// CurrentMasterPosition is returned by MasterPosition
// and SlaveStatus
CurrentMasterPosition mysql.Position

// SlaveStatusError is used by SlaveStatus
SlaveStatusError error

// StartSlaveError is used by StartSlave
StartSlaveError error

// CurrentMasterHost is returned by SlaveStatus
CurrentMasterHost string

Expand All @@ -91,6 +98,9 @@ type FakeMysqlDaemon struct {
// (as "%v:%v"). If it doesn't match, SetMaster will return an error.
SetMasterInput string

// SetMasterError is used by SetMaster
SetMasterError error

// WaitMasterPosition is checked by WaitMasterPos, if the
// same it returns nil, if different it returns an error
WaitMasterPosition mysql.Position
Expand Down Expand Up @@ -147,8 +157,9 @@ type FakeMysqlDaemon struct {
// 'db' can be nil if the test doesn't use a database at all.
func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
result := &FakeMysqlDaemon{
db: db,
Running: true,
db: db,
Running: true,
SlaveIORunning: true,
}
if db != nil {
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", 5, time.Minute, 0)
Expand Down Expand Up @@ -211,10 +222,12 @@ func (fmd *FakeMysqlDaemon) SlaveStatus() (mysql.SlaveStatus, error) {
return mysql.SlaveStatus{
Position: fmd.CurrentMasterPosition,
SecondsBehindMaster: fmd.SecondsBehindMaster,
SlaveIORunning: fmd.Replicating,
SlaveSQLRunning: fmd.Replicating,
MasterHost: fmd.CurrentMasterHost,
MasterPort: fmd.CurrentMasterPort,
// implemented as AND to avoid changing all tests that were
// previously using Replicating = false
SlaveIORunning: fmd.Replicating && fmd.SlaveIORunning,
SlaveSQLRunning: fmd.Replicating,
MasterHost: fmd.CurrentMasterHost,
MasterPort: fmd.CurrentMasterPort,
}, nil
}

Expand Down Expand Up @@ -250,11 +263,23 @@ func (fmd *FakeMysqlDaemon) SetSuperReadOnly(on bool) error {

// StartSlave is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StartSlave(hookExtraEnv map[string]string) error {
if fmd.StartSlaveError != nil {
return fmd.StartSlaveError
}
return fmd.ExecuteSuperQueryList(context.Background(), []string{
"START SLAVE",
})
}

// RestartSlave is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) RestartSlave(hookExtraEnv map[string]string) error {
return fmd.ExecuteSuperQueryList(context.Background(), []string{
"STOP SLAVE",
"RESET SLAVE",
"START SLAVE",
})
}

// StartSlaveUntilAfter is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StartSlaveUntilAfter(ctx context.Context, pos mysql.Position) error {
if !reflect.DeepEqual(fmd.StartSlaveUntilAfterPos, pos) {
Expand Down Expand Up @@ -289,6 +314,9 @@ func (fmd *FakeMysqlDaemon) SetMaster(ctx context.Context, masterHost string, ma
if fmd.SetMasterInput != input {
return fmt.Errorf("wrong input for SetMasterCommands: expected %v got %v", fmd.SetMasterInput, input)
}
if fmd.SetMasterError != nil {
return fmd.SetMasterError
}
cmds := []string{}
if slaveStopBefore {
cmds = append(cmds, "STOP SLAVE")
Expand Down
1 change: 1 addition & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type MysqlDaemon interface {

// replication related methods
StartSlave(hookExtraEnv map[string]string) error
RestartSlave(hookExtraEnv map[string]string) error
StartSlaveUntilAfter(ctx context.Context, pos mysql.Position) error
StopSlave(hookExtraEnv map[string]string) error
SlaveStatus() (mysql.SlaveStatus, error)
Expand Down
23 changes: 23 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,29 @@ func (mysqld *Mysqld) StopSlave(hookExtraEnv map[string]string) error {
return mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopSlaveCommand()})
}

// RestartSlave stops, resets and starts a slave.
func (mysqld *Mysqld) RestartSlave(hookExtraEnv map[string]string) error {
h := hook.NewSimpleHook("preflight_stop_slave")
h.ExtraEnv = hookExtraEnv
if err := h.ExecuteOptional(); err != nil {
return err
}
ctx := context.TODO()
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return err
}
defer conn.Recycle()

if err := mysqld.executeSuperQueryListConn(ctx, conn, conn.RestartSlaveCommands()); err != nil {
return err
}

h = hook.NewSimpleHook("postflight_start_slave")
h.ExtraEnv = hookExtraEnv
return h.ExecuteOptional()
}

// GetMysqlPort returns mysql port
func (mysqld *Mysqld) GetMysqlPort() (int32, error) {
qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW VARIABLES LIKE 'port'")
Expand Down
24 changes: 21 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletmanager
import (
"flag"
"fmt"
"strings"
"time"

"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -634,7 +635,6 @@ func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topo
if err := agent.fixSemiSync(tabletType); err != nil {
return err
}

// Update the master address only if needed.
// We don't want to interrupt replication for no reason.
parent, err := agent.TopoServer.GetTablet(ctx, parentAlias)
Expand All @@ -646,13 +646,17 @@ func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topo
if status.MasterHost != masterHost || status.MasterPort != masterPort {
// This handles both changing the address and starting replication.
if err := agent.MysqlDaemon.SetMaster(ctx, masterHost, masterPort, wasReplicating, shouldbeReplicating); err != nil {
return err
if err := agent.handleRelayLogError(err); err != nil {
return err
}
}
} else if shouldbeReplicating {
// The address is correct. Just start replication if needed.
if !status.SlaveRunning() {
if err := agent.MysqlDaemon.StartSlave(agent.hookExtraEnv()); err != nil {
return err
if err := agent.handleRelayLogError(err); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -853,3 +857,17 @@ func (agent *ActionAgent) fixSemiSyncAndReplication(tabletType topodatapb.Tablet
}
return nil
}

func (agent *ActionAgent) handleRelayLogError(err error) error {
// attempt to fix this error:
// Slave failed to initialize relay log info structure from the repository (errno 1872) (sqlstate HY000) during query: START SLAVE
// see https://bugs.mysql.com/bug.php?id=83713 or https://github.com/vitessio/vitess/issues/5067
if strings.Contains(err.Error(), "Slave failed to initialize relay log info structure from the repository") {
// Stop, reset and start slave again to resolve this error
if err := agent.MysqlDaemon.RestartSlave(agent.hookExtraEnv()); err != nil {
return err
}
return nil
}
return err
}
Loading

0 comments on commit 020bce0

Please sign in to comment.