diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index 4702c3e3845..8f3811c58ec 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -120,6 +120,12 @@ Usage of vtbackup: --mycnf_slow_log_path string mysql slow query log path --mycnf_socket_file string mysql socket file --mycnf_tmp_dir string mysql tmp directory + --mysql-shell-backup-location string location where the backup will be stored + --mysql-shell-dump-flags string flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST (default "{\"threads\": 4}") + --mysql-shell-flags string execution flags to pass to mysqlsh binary to be used during dump/load (default "--defaults-file=/dev/null --js -h localhost") + --mysql-shell-load-flags string flags to pass to mysql shell load utility. This should be a JSON string (default "{\"threads\": 4, \"loadUsers\": true, \"updateGtidSet\": \"replace\", \"skipBinlog\": true, \"progressFile\": \"\"}") + --mysql-shell-should-drain decide if we should drain while taking a backup or continue to serving traffic + --mysql-shell-speedup-restore speed up restore by disabling redo logging and double write buffer during the restore process --mysql_port int mysql port (default 3306) --mysql_server_version string MySQL server version to advertise. --mysql_socket string path to the mysql socket diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 94f1170f7c3..c3b1fd8cb01 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -194,6 +194,12 @@ Usage of vttablet: --mycnf_slow_log_path string mysql slow query log path --mycnf_socket_file string mysql socket file --mycnf_tmp_dir string mysql tmp directory + --mysql-shell-backup-location string location where the backup will be stored + --mysql-shell-dump-flags string flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST (default "{\"threads\": 4}") + --mysql-shell-flags string execution flags to pass to mysqlsh binary to be used during dump/load (default "--defaults-file=/dev/null --js -h localhost") + --mysql-shell-load-flags string flags to pass to mysql shell load utility. This should be a JSON string (default "{\"threads\": 4, \"loadUsers\": true, \"updateGtidSet\": \"replace\", \"skipBinlog\": true, \"progressFile\": \"\"}") + --mysql-shell-should-drain decide if we should drain while taking a backup or continue to serving traffic + --mysql-shell-speedup-restore speed up restore by disabling redo logging and double write buffer during the restore process --mysql_server_version string MySQL server version to advertise. --mysqlctl_mycnf_template string template file to use for generating the my.cnf file during server init --mysqlctl_socket string socket file to use for remote mysqlctl actions (empty for local actions) diff --git a/go/flags/endtoend/vttestserver.txt b/go/flags/endtoend/vttestserver.txt index a5ebcb7cc82..071f9e69d98 100644 --- a/go/flags/endtoend/vttestserver.txt +++ b/go/flags/endtoend/vttestserver.txt @@ -72,6 +72,12 @@ Usage of vttestserver: --logtostderr log to standard error instead of files --max_table_shard_size int The maximum number of initial rows in a table shard. Ignored if--initialize_with_random_data is false. The actual number is chosen randomly (default 10000) --min_table_shard_size int The minimum number of initial rows in a table shard. Ignored if--initialize_with_random_data is false. The actual number is chosen randomly. (default 1000) + --mysql-shell-backup-location string location where the backup will be stored + --mysql-shell-dump-flags string flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST (default "{\"threads\": 4}") + --mysql-shell-flags string execution flags to pass to mysqlsh binary to be used during dump/load (default "--defaults-file=/dev/null --js -h localhost") + --mysql-shell-load-flags string flags to pass to mysql shell load utility. This should be a JSON string (default "{\"threads\": 4, \"loadUsers\": true, \"updateGtidSet\": \"replace\", \"skipBinlog\": true, \"progressFile\": \"\"}") + --mysql-shell-should-drain decide if we should drain while taking a backup or continue to serving traffic + --mysql-shell-speedup-restore speed up restore by disabling redo logging and double write buffer during the restore process --mysql_bind_host string which host to bind vtgate mysql listener to (default "localhost") --mysql_only If this flag is set only mysql is initialized. The rest of the vitess components are not started. Also, the output specifies the mysql unix socket instead of the vtgate port. --mysql_server_version string MySQL server version to advertise. diff --git a/go/ioutil/writer.go b/go/ioutil/writer.go new file mode 100644 index 00000000000..80ad87428bc --- /dev/null +++ b/go/ioutil/writer.go @@ -0,0 +1,39 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +MeteredWriteCloser and MeteredWriter are respectively, time-and-byte-tracking +wrappers around WriteCloser and Writer. +*/ + +package ioutil + +import ( + "bytes" +) + +// BytesBufferWriter implements io.WriteCloser using an in-memory buffer. +type BytesBufferWriter struct { + *bytes.Buffer +} + +func (m BytesBufferWriter) Close() error { + return nil +} + +func NewBytesBufferWriter() BytesBufferWriter { + return BytesBufferWriter{bytes.NewBuffer(nil)} +} diff --git a/go/test/fuzzing/tablet_manager_fuzzer.go b/go/test/fuzzing/tablet_manager_fuzzer.go index 0e6b6aaece7..316cf75fb82 100644 --- a/go/test/fuzzing/tablet_manager_fuzzer.go +++ b/go/test/fuzzing/tablet_manager_fuzzer.go @@ -22,7 +22,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -42,7 +42,7 @@ func FuzzTabletManagerExecuteFetchAsDba(data []byte) int { cp := mysql.ConnParams{} db := fakesqldb.New(t) db.AddQueryPattern(".*", &sqltypes.Result{}) - daemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + daemon := mysqlctl.NewFakeMysqlDaemon(db) dbName := "dbname" tm := &tabletmanager.TabletManager{ diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 7d62681c5dc..00d9ab89b8c 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -17,16 +17,18 @@ limitations under the License. package mysqlctl import ( + "bufio" "errors" "fmt" + "io" "os" "path/filepath" - "strconv" "strings" "time" "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" "context" @@ -338,31 +340,18 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) return nil, err } - // mysqld needs to be running in order for mysql_upgrade to work. - // If we've just restored from a backup from previous MySQL version then mysqld - // may fail to start due to a different structure of mysql.* tables. The flag - // --skip-grant-tables ensures that these tables are not read until mysql_upgrade - // is executed. And since with --skip-grant-tables anyone can connect to MySQL - // without password, we are passing --skip-networking to greatly reduce the set - // of those who can connect. - params.Logger.Infof("Restore: starting mysqld for mysql_upgrade") - // Note Start will use dba user for waiting, this is fine, it will be allowed. - err = params.Mysqld.Start(context.Background(), params.Cnf, "--skip-grant-tables", "--skip-networking") - if err != nil { - return nil, err - } - - // We disable super_read_only, in case it is in the default MySQL startup - // parameters and will be blocking the writes we need to do in - // PopulateMetadataTables(). We do it blindly, since - // this will fail on MariaDB, which doesn't have super_read_only - // This is safe, since we're restarting MySQL after the restore anyway - params.Logger.Infof("Restore: disabling super_read_only") - if err := params.Mysqld.SetSuperReadOnly(false); err != nil { - if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) { - params.Logger.Warningf("Restore: server does not know about super_read_only, continuing anyway...") - } else { - params.Logger.Errorf("Restore: unexpected error while trying to set super_read_only: %v", err) + if re.ShouldStartMySQLAfterRestore() { + // mysqld needs to be running in order for mysql_upgrade to work. + // If we've just restored from a backup from previous MySQL version then mysqld + // may fail to start due to a different structure of mysql.* tables. The flag + // --skip-grant-tables ensures that these tables are not read until mysql_upgrade + // is executed. And since with --skip-grant-tables anyone can connect to MySQL + // without password, we are passing --skip-networking to greatly reduce the set + // of those who can connect. + params.Logger.Infof("Restore: starting mysqld for mysql_upgrade") + // Note Start will use dba user for waiting, this is fine, it will be allowed. + err = params.Mysqld.Start(context.Background(), params.Cnf, "--skip-grant-tables", "--skip-networking") + if err != nil { return nil, err } } @@ -403,3 +392,24 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) restoreDuration.Set(int64(time.Since(startTs).Seconds())) return manifest, nil } + +// scanLinesToLogger scans full lines from the given Reader and sends them to +// the given Logger until EOF. +func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) { + defer doneFunc() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + logger.Infof("%s: %s", prefix, line) + } + if err := scanner.Err(); err != nil { + // This is usually run in a background goroutine, so there's no point + // returning an error. Just log it. + logger.Warningf("error scanning lines from %s: %v", prefix, err) + } +} + +func FormatRFC3339(t time.Time) string { + return t.Format(time.RFC3339) +} diff --git a/go/vt/mysqlctl/backup_test.go b/go/vt/mysqlctl/backup_test.go index 08d5e31a116..d68af0e044a 100644 --- a/go/vt/mysqlctl/backup_test.go +++ b/go/vt/mysqlctl/backup_test.go @@ -17,13 +17,19 @@ limitations under the License. package mysqlctl import ( + "fmt" + "io" "os" "path" "reflect" "sort" + "sync" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/logutil" ) func TestFindFilesToBackupWithoutRedoLog(t *testing.T) { @@ -214,3 +220,26 @@ type forTest []FileEntry func (f forTest) Len() int { return len(f) } func (f forTest) Swap(i, j int) { f[i], f[j] = f[j], f[i] } func (f forTest) Less(i, j int) bool { return f[i].Base+f[i].Name < f[j].Base+f[j].Name } + +func TestScanLinesToLogger(t *testing.T) { + reader, writer := io.Pipe() + logger := logutil.NewMemoryLogger() + var wg sync.WaitGroup + + wg.Add(1) + go scanLinesToLogger("test", reader, logger, wg.Done) + + for i := range [100]int{} { + _, err := writer.Write([]byte(fmt.Sprintf("foobar %d\n", i))) + require.NoError(t, err) + } + + writer.Close() + wg.Wait() + + require.Equal(t, 100, len(logger.Events)) + + for i, event := range logger.Events { + require.Equal(t, fmt.Sprintf("test: foobar %d", i), event.Value) + } +} diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index 2b0b08f7e14..68f5db069df 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -99,6 +99,7 @@ type RestoreParams struct { // Returns the manifest of a backup if successful, otherwise returns an error type RestoreEngine interface { ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) + ShouldStartMySQLAfterRestore() bool } // BackupRestoreEngine is a combination of BackupEngine and RestoreEngine. diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index f4c0a0b5161..cd5ee811c5d 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -770,6 +770,11 @@ func (be *BuiltinBackupEngine) ShouldDrainForBackup() bool { return true } +// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed. +func (be *BuiltinBackupEngine) ShouldStartMySQLAfterRestore() bool { + return true +} + func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server, keyspace, shard string) (mysql.Position, error) { si, err := ts.GetShard(ctx, keyspace, shard) if err != nil { diff --git a/go/vt/mysqlctl/builtinbackupengine_test.go b/go/vt/mysqlctl/builtinbackupengine_test.go index b6837380db7..280de5ac18f 100644 --- a/go/vt/mysqlctl/builtinbackupengine_test.go +++ b/go/vt/mysqlctl/builtinbackupengine_test.go @@ -19,7 +19,6 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vttime" @@ -104,7 +103,7 @@ func TestExecuteBackup(t *testing.T) { // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: // "STOP SLAVE", "START SLAVE", in that order. - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} // mysqld.ShutdownTime = time.Minute diff --git a/go/vt/mysqlctl/fakebackupengine.go b/go/vt/mysqlctl/fakebackupengine.go new file mode 100644 index 00000000000..c0fce435d35 --- /dev/null +++ b/go/vt/mysqlctl/fakebackupengine.go @@ -0,0 +1,92 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "time" + + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" +) + +type FakeBackupEngine struct { + ExecuteBackupCalls []FakeBackupEngineExecuteBackupCall + ExecuteBackupDuration time.Duration + ExecuteBackupReturn FakeBackupEngineExecuteBackupReturn + ExecuteRestoreCalls []FakeBackupEngineExecuteRestoreCall + ExecuteRestoreDuration time.Duration + ExecuteRestoreReturn FakeBackupEngineExecuteRestoreReturn + ShouldDrainForBackupCalls int + ShouldDrainForBackupReturn bool +} + +type FakeBackupEngineExecuteBackupCall struct { + BackupParams BackupParams + BackupHandle backupstorage.BackupHandle +} + +type FakeBackupEngineExecuteBackupReturn struct { + Ok bool + Err error +} + +type FakeBackupEngineExecuteRestoreCall struct { + BackupHandle backupstorage.BackupHandle + RestoreParams RestoreParams +} + +type FakeBackupEngineExecuteRestoreReturn struct { + Manifest *BackupManifest + Err error +} + +func (be *FakeBackupEngine) ExecuteBackup( + ctx context.Context, + params BackupParams, + bh backupstorage.BackupHandle, +) (bool, error) { + be.ExecuteBackupCalls = append(be.ExecuteBackupCalls, FakeBackupEngineExecuteBackupCall{params, bh}) + + if be.ExecuteBackupDuration > 0 { + time.Sleep(be.ExecuteBackupDuration) + } + + return be.ExecuteBackupReturn.Ok, be.ExecuteBackupReturn.Err +} + +func (be *FakeBackupEngine) ExecuteRestore( + ctx context.Context, params RestoreParams, + bh backupstorage.BackupHandle, +) (*BackupManifest, error) { + be.ExecuteRestoreCalls = append(be.ExecuteRestoreCalls, FakeBackupEngineExecuteRestoreCall{bh, params}) + + // mark restore as in progress + if err := createStateFile(params.Cnf); err != nil { + return nil, err + } + + if be.ExecuteRestoreDuration > 0 { + time.Sleep(be.ExecuteRestoreDuration) + } + + return be.ExecuteRestoreReturn.Manifest, be.ExecuteRestoreReturn.Err +} + +func (be *FakeBackupEngine) ShouldDrainForBackup() bool { + be.ShouldDrainForBackupCalls = be.ShouldDrainForBackupCalls + 1 + return be.ShouldDrainForBackupReturn +} diff --git a/go/vt/mysqlctl/fakebackupstorage.go b/go/vt/mysqlctl/fakebackupstorage.go new file mode 100644 index 00000000000..a3adff0dbd0 --- /dev/null +++ b/go/vt/mysqlctl/fakebackupstorage.go @@ -0,0 +1,160 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "fmt" + "io" + + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" +) + +type FakeBackupHandle struct { + Dir string + NameV string + ReadOnly bool + Errors concurrency.AllErrorRecorder + + AbortBackupCalls []context.Context + AbortBackupReturn error + AddFileCalls []FakeBackupHandleAddFileCall + AddFileReturn FakeBackupHandleAddFileReturn + EndBackupCalls []context.Context + EndBackupReturn error + ReadFileCalls []FakeBackupHandleReadFileCall + ReadFileReturnF func(ctx context.Context, filename string) (io.ReadCloser, error) +} + +type FakeBackupHandleAddFileCall struct { + Ctx context.Context + Filename string + Filesize int64 +} + +type FakeBackupHandleAddFileReturn struct { + WriteCloser io.WriteCloser + Err error +} + +type FakeBackupHandleReadFileCall struct { + Ctx context.Context + Filename string +} + +func (fbh *FakeBackupHandle) RecordError(err error) { + fbh.Errors.RecordError(err) +} + +func (fbh *FakeBackupHandle) HasErrors() bool { + return fbh.Errors.HasErrors() +} + +func (fbh *FakeBackupHandle) Error() error { + return fbh.Errors.Error() +} + +func (fbh *FakeBackupHandle) Directory() string { + return fbh.Dir +} + +func (fbh *FakeBackupHandle) Name() string { + return fbh.NameV +} + +func (fbh *FakeBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { + fbh.AddFileCalls = append(fbh.AddFileCalls, FakeBackupHandleAddFileCall{ctx, filename, filesize}) + return fbh.AddFileReturn.WriteCloser, fbh.AddFileReturn.Err +} + +func (fbh *FakeBackupHandle) EndBackup(ctx context.Context) error { + fbh.EndBackupCalls = append(fbh.EndBackupCalls, ctx) + return fbh.EndBackupReturn +} + +func (fbh *FakeBackupHandle) AbortBackup(ctx context.Context) error { + fbh.AbortBackupCalls = append(fbh.AbortBackupCalls, ctx) + return fbh.AbortBackupReturn +} + +func (fbh *FakeBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) { + fbh.ReadFileCalls = append(fbh.ReadFileCalls, FakeBackupHandleReadFileCall{ctx, filename}) + if fbh.ReadFileReturnF == nil { + return nil, fmt.Errorf("FakeBackupHandle has not defined a ReadFileReturnF") + } + return fbh.ReadFileReturnF(ctx, filename) +} + +type FakeBackupStorage struct { + CloseCalls int + CloseReturn error + ListBackupsCalls []FakeBackupStorageListBackupsCall + ListBackupsReturn FakeBackupStorageListBackupsReturn + RemoveBackupCalls []FakeBackupStorageRemoveBackupCall + RemoveBackupReturn error + RemoveBackupReturne error + StartBackupCalls []FakeBackupStorageStartBackupCall + StartBackupReturn FakeBackupStorageStartBackupReturn +} + +type FakeBackupStorageListBackupsCall struct { + Ctx context.Context + Dir string +} + +type FakeBackupStorageListBackupsReturn struct { + BackupHandles []backupstorage.BackupHandle + Err error +} + +type FakeBackupStorageRemoveBackupCall struct { + Ctx context.Context + Dir string + Name string +} + +type FakeBackupStorageStartBackupCall struct { + Ctx context.Context + Dir string + Name string +} + +type FakeBackupStorageStartBackupReturn struct { + BackupHandle backupstorage.BackupHandle + Err error +} + +func (fbs *FakeBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) { + fbs.ListBackupsCalls = append(fbs.ListBackupsCalls, FakeBackupStorageListBackupsCall{ctx, dir}) + return fbs.ListBackupsReturn.BackupHandles, fbs.ListBackupsReturn.Err +} + +func (fbs *FakeBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) { + fbs.StartBackupCalls = append(fbs.StartBackupCalls, FakeBackupStorageStartBackupCall{ctx, dir, name}) + return fbs.StartBackupReturn.BackupHandle, fbs.StartBackupReturn.Err +} + +func (fbs *FakeBackupStorage) RemoveBackup(ctx context.Context, dir, name string) error { + fbs.RemoveBackupCalls = append(fbs.RemoveBackupCalls, FakeBackupStorageRemoveBackupCall{ctx, dir, name}) + return fbs.RemoveBackupReturn +} + +func (fbs *FakeBackupStorage) Close() error { + fbs.CloseCalls = fbs.CloseCalls + 1 + return fbs.CloseReturn +} diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go similarity index 92% rename from go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go rename to go/vt/mysqlctl/fakemysqldaemon.go index 3effa9309c2..5d087bc36ec 100644 --- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -14,9 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fakemysqldaemon +package mysqlctl import ( + "errors" "fmt" "reflect" "strings" @@ -30,7 +31,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/dbconnpool" - "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/tmutils" querypb "vitess.io/vitess/go/vt/proto/query" @@ -168,9 +168,16 @@ type FakeMysqlDaemon struct { // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_slave_enabled. SemiSyncReplicaEnabled bool - // TimeoutHook is a func that can be called at the beginning of any method to fake a timeout. - // all a test needs to do is make it { return context.DeadlineExceeded } + // GlobalReadLock is used to test if a lock has been acquired already or not + GlobalReadLock bool + + // TimeoutHook is a func that can be called at the beginning of + // any method to fake a timeout. + // All a test needs to do is make it { return context.DeadlineExceeded }. TimeoutHook func() error + + // Version is the version that will be returned by GetVersionString. + Version string } // NewFakeMysqlDaemon returns a FakeMysqlDaemon where mysqld appears @@ -181,6 +188,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon { db: db, Running: true, IOThreadRunning: true, + Version: "8.0.32", } if db != nil { result.appPool = dbconnpool.NewConnectionPool("AppConnPool", 5, time.Minute, 0) @@ -190,7 +198,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon { } // Start is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysqldArgs ...string) error { +func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error { if fmd.Running { return fmt.Errorf("fake mysql daemon already running") } @@ -208,7 +216,7 @@ func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysq } // Shutdown is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *mysqlctl.Mycnf, waitForMysqld bool) error { +func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *Mycnf, waitForMysqld bool) error { if !fmd.Running { return fmt.Errorf("fake mysql daemon not running") } @@ -231,17 +239,17 @@ func (fmd *FakeMysqlDaemon) RunMysqlUpgrade() error { } // ReinitConfig is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error { +func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *Mycnf) error { return nil } // RefreshConfig is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error { +func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *Mycnf) error { return nil } // Wait is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *mysqlctl.Mycnf) error { +func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *Mycnf) error { return nil } @@ -345,17 +353,22 @@ func (fmd *FakeMysqlDaemon) IsReadOnly() (bool, error) { return fmd.ReadOnly, nil } +// IsSuperReadOnly is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) IsSuperReadOnly(ctx context.Context) (bool, error) { + return fmd.SuperReadOnly, nil +} + // SetReadOnly is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) SetReadOnly(on bool) error { fmd.ReadOnly = on return nil } -// SetSuperReadOnly is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) SetSuperReadOnly(on bool) error { +// SetSuperReadOnly is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) { fmd.SuperReadOnly = on fmd.ReadOnly = on - return nil + return nil, nil } // StartReplication is part of the MysqlDaemon interface. @@ -467,6 +480,11 @@ func (fmd *FakeMysqlDaemon) Promote(hookExtraEnv map[string]string) (mysql.Posit return fmd.PromoteResult, nil } +// ExecuteSuperQuery is part of the MysqlDaemon interface +func (fmd *FakeMysqlDaemon) ExecuteSuperQuery(ctx context.Context, query string) error { + return fmd.ExecuteSuperQueryList(ctx, []string{query}) +} + // ExecuteSuperQueryList is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) ExecuteSuperQueryList(ctx context.Context, queryList []string) error { for _, query := range queryList { @@ -667,3 +685,23 @@ func (fmd *FakeMysqlDaemon) GetVersionString() string { func (fmd *FakeMysqlDaemon) GetVersionComment(ctx context.Context) string { return "" } + +// AcquireGlobalReadLock is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error { + if fmd.GlobalReadLock { + return errors.New("lock already acquired") + } + + fmd.GlobalReadLock = true + return nil +} + +// ReleaseGlobalReadLock is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error { + if fmd.GlobalReadLock { + fmd.GlobalReadLock = false + return nil + } + + return errors.New("no read locks acquired yet") +} diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index ec96eee7b2e..185aac327ac 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -70,8 +70,9 @@ type MysqlDaemon interface { ResetReplication(ctx context.Context) error PrimaryPosition() (mysql.Position, error) IsReadOnly() (bool, error) + IsSuperReadOnly(ctx context.Context) (bool, error) SetReadOnly(on bool) error - SetSuperReadOnly(on bool) error + SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) SetReplicationPosition(ctx context.Context, pos mysql.Position) error SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error WaitForReparentJournal(ctx context.Context, timeCreatedNS int64) error @@ -103,6 +104,9 @@ type MysqlDaemon interface { // GetVersionComment returns the version comment GetVersionComment(ctx context.Context) string + // ExecuteSuperQuery executes a single query, no result + ExecuteSuperQuery(ctx context.Context, query string) error + // ExecuteSuperQueryList executes a list of queries, no result ExecuteSuperQueryList(ctx context.Context, queryList []string) error @@ -115,6 +119,13 @@ type MysqlDaemon interface { // DisableBinlogPlayback disable playback of binlog events DisableBinlogPlayback() error + // AcquireGlobalReadLock acquires a global read lock and keeps the connection so + // as to release it with the function below. + AcquireGlobalReadLock(ctx context.Context) error + + // ReleaseGlobalReadLock release a lock acquired with the connection from the above function. + ReleaseGlobalReadLock(ctx context.Context) error + // Close will close this instance of Mysqld. It will wait for all dba // queries to be finished. Close() diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 45ec3b7bd73..a3df3172aa5 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -87,9 +87,10 @@ const maxLogFileSampleSize = 4096 // Mysqld is the object that represents a mysqld daemon running on this server. type Mysqld struct { - dbcfgs *dbconfigs.DBConfigs - dbaPool *dbconnpool.ConnectionPool - appPool *dbconnpool.ConnectionPool + dbcfgs *dbconfigs.DBConfigs + dbaPool *dbconnpool.ConnectionPool + appPool *dbconnpool.ConnectionPool + lockConn *dbconnpool.PooledDBConnection capabilities capabilitySet diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go new file mode 100644 index 00000000000..b07caa6da54 --- /dev/null +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -0,0 +1,573 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path" + "strings" + "sync" + "time" + + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/vterrors" +) + +var ( + // location to store the mysql shell backup + mysqlShellBackupLocation = "" + // flags passed to the mysql shell utility, used both on dump/restore + mysqlShellFlags = "--defaults-file=/dev/null --js -h localhost" + // flags passed to the Dump command, as a JSON string + mysqlShellDumpFlags = `{"threads": 4}` + // flags passed to the Load command, as a JSON string + mysqlShellLoadFlags = `{"threads": 4, "loadUsers": true, "updateGtidSet": "replace", "skipBinlog": true, "progressFile": ""}` + // drain a tablet when taking a backup + mysqlShellBackupShouldDrain = false + // disable redo logging and double write buffer + mysqlShellSpeedUpRestore = false + + mysqlShellBackupBinaryName = "mysqlsh" + + // use when checking if we need to create the directory on the local filesystem or not. + knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"} + + MySQLShellPreCheckError = errors.New("MySQLShellPreCheckError") + + // internal databases not backed up by MySQL Shell + internalDBs = []string{ + "information_schema", "mysql", "ndbinfo", "performance_schema", "sys", + } + // reserved MySQL users https://dev.mysql.com/doc/refman/8.0/en/reserved-accounts.html + reservedUsers = []string{ + "mysql.sys@localhost", "mysql.session@localhost", "mysql.infoschema@localhost", + } +) + +// MySQLShellBackupManifest represents a backup. +type MySQLShellBackupManifest struct { + // BackupManifest is an anonymous embedding of the base manifest struct. + // Note that the manifest itself doesn't fill the Position field, as we have + // no way of fetching that information from mysqlsh at the moment. + BackupManifest + + // Location of the backup directory + BackupLocation string + // Params are the parameters that backup was created with + Params string +} + +func init() { + BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{} + + for _, cmd := range []string{"vtcombo", "vttablet", "vtbackup", "vttestserver", "vtctldclient"} { + servenv.OnParseFor(cmd, registerMysqlShellBackupEngineFlags) + } +} + +func registerMysqlShellBackupEngineFlags(fs *pflag.FlagSet) { + fs.StringVar(&mysqlShellBackupLocation, "mysql-shell-backup-location", mysqlShellBackupLocation, "location where the backup will be stored") + fs.StringVar(&mysqlShellFlags, "mysql-shell-flags", mysqlShellFlags, "execution flags to pass to mysqlsh binary to be used during dump/load") + fs.StringVar(&mysqlShellDumpFlags, "mysql-shell-dump-flags", mysqlShellDumpFlags, "flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST") + fs.StringVar(&mysqlShellLoadFlags, "mysql-shell-load-flags", mysqlShellLoadFlags, "flags to pass to mysql shell load utility. This should be a JSON string") + fs.BoolVar(&mysqlShellBackupShouldDrain, "mysql-shell-should-drain", mysqlShellBackupShouldDrain, "decide if we should drain while taking a backup or continue to serving traffic") + fs.BoolVar(&mysqlShellSpeedUpRestore, "mysql-shell-speedup-restore", mysqlShellSpeedUpRestore, "speed up restore by disabling redo logging and double write buffer during the restore process") +} + +// MySQLShellBackupEngine encapsulates the logic to implement the restoration +// of a mysql-shell based backup. +type MySQLShellBackupEngine struct { +} + +const ( + mysqlShellBackupEngineName = "mysqlshell" + mysqlShellLockMessage = "Global read lock has been released" +) + +func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (result bool, finalErr error) { + params.Logger.Infof("Starting ExecuteBackup in %s", params.TabletAlias) + + location := path.Join(mysqlShellBackupLocation, bh.Directory(), bh.Name()) + + err := be.backupPreCheck(location) + if err != nil { + return false, vterrors.Wrap(err, "failed backup precheck") + } + + args := []string{} + if mysqlShellFlags != "" { + args = append(args, strings.Fields(mysqlShellFlags)...) + } + + args = append(args, "-e", fmt.Sprintf("util.dumpInstance(%q, %s)", + location, + mysqlShellDumpFlags, + )) + + // to be able to get the consistent GTID sets, we will acquire a global read lock before starting mysql shell. + // oncce we have the lock, we start it and wait unti it has acquired and release its global read lock, which + // should guarantee that both use and mysql shell are seeing the same executed GTID sets. + // after this we release the lock so that replication can continue. this usually should take just a few seconds. + params.Logger.Infof("acquiring a global read lock before fetching the executed GTID sets") + err = params.Mysqld.AcquireGlobalReadLock(ctx) + if err != nil { + return false, vterrors.Wrap(err, "failed to acquire read lock to start backup") + } + lockAcquired := time.Now() // we will report how long we hold the lock for + + // we need to release the global read lock in case the backup fails to start and + // the lock wasn't released by releaseReadLock() yet. context might be expired, + // so we pass a new one. + defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(context.Background()) }() + + posBeforeBackup, err := params.Mysqld.PrimaryPosition() + if err != nil { + return false, vterrors.Wrap(err, "failed to fetch position") + } + + cmd := exec.CommandContext(ctx, mysqlShellBackupBinaryName, args...) + + params.Logger.Infof("running %s", cmd.String()) + + cmdOut, err := cmd.StdoutPipe() + if err != nil { + return false, vterrors.Wrap(err, "cannot create stdout pipe") + } + cmdOriginalErr, err := cmd.StderrPipe() + if err != nil { + return false, vterrors.Wrap(err, "cannot create stderr pipe") + } + if err := cmd.Start(); err != nil { + return false, vterrors.Wrap(err, "can't start mysqlshell") + } + + pipeReader, pipeWriter := io.Pipe() + cmdErr := io.TeeReader(cmdOriginalErr, pipeWriter) + + cmdWg := &sync.WaitGroup{} + cmdWg.Add(3) + go releaseReadLock(ctx, pipeReader, params, cmdWg, lockAcquired) + go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done) + go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done) + + // Get exit status. + if err := cmd.Wait(); err != nil { + pipeWriter.Close() // make sure we close the writer so the goroutines above will complete. + return false, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") + } + + // close the pipeWriter and wait for the goroutines to have read all the logs + pipeWriter.Close() + cmdWg.Wait() + + // open the MANIFEST + params.Logger.Infof("Writing backup MANIFEST") + mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) + if err != nil { + return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) + } + defer closeFile(mwc, backupManifestFileName, params.Logger, &finalErr) + + // JSON-encode and write the MANIFEST + bm := &MySQLShellBackupManifest{ + // Common base fields + BackupManifest: BackupManifest{ + BackupMethod: mysqlShellBackupEngineName, + // the position is empty here because we have no way of capturing it from mysqlsh + // we will capture it when doing the restore as mysqlsh can replace the GTIDs with + // what it has stored in the backup. + Position: posBeforeBackup, + // PurgedPosition: posBeforeBackup, + BackupTime: FormatRFC3339(params.BackupTime.UTC()), + FinishedTime: FormatRFC3339(time.Now().UTC()), + // ServerUUID: serverUUID, + // TabletAlias: params.TabletAlias, + // Keyspace: params.Keyspace, + // Shard: params.Shard, + // MySQLVersion: mysqlVersion, + // UpgradeSafe: true, + }, + + // mysql shell backup specific fields + BackupLocation: location, + Params: mysqlShellLoadFlags, + } + + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) + } + if _, err := mwc.Write([]byte(data)); err != nil { + return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) + } + + params.Logger.Infof("Backup completed") + return true, nil +} + +func (be *MySQLShellBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) { + params.Logger.Infof("Calling ExecuteRestore for %s (DeleteBeforeRestore: %v)", params.DbName, params.DeleteBeforeRestore) + + shouldDeleteUsers, err := be.restorePreCheck(ctx, params) + if err != nil { + return nil, vterrors.Wrap(err, "failed restore precheck") + } + + var bm MySQLShellBackupManifest + if err := getBackupManifestInto(ctx, bh, &bm); err != nil { + return nil, err + } + + // mark restore as in progress + if err := createStateFile(params.Cnf); err != nil { + return nil, err + } + + // make sure semi-sync is disabled, otherwise we will wait forever for acknowledgements + err = params.Mysqld.SetSemiSyncEnabled(false, false) + if err != nil { + return nil, vterrors.Wrap(err, "disable semi-sync failed") + } + + params.Logger.Infof("restoring on an existing tablet, so dropping database %q", params.DbName) + + readonly, err := params.Mysqld.IsSuperReadOnly(ctx) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("checking if mysqld has super_read_only=enable: %v", err)) + } + + if readonly { + resetFunc, err := params.Mysqld.SetSuperReadOnly(ctx, false) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("unable to disable super-read-only: %v", err)) + } + + defer func() { + err := resetFunc() + if err != nil { + params.Logger.Errorf("Not able to set super_read_only to its original value after restore") + } + }() + } + + err = cleanupMySQL(ctx, params, shouldDeleteUsers) + if err != nil { + log.Errorf(err.Error()) + // time.Sleep(time.Minute * 2) + return nil, vterrors.Wrap(err, "error cleaning MySQL") + } + + // we need to get rid of all the current replication information on the host. + err = params.Mysqld.ResetReplication(ctx) + if err != nil { + return nil, vterrors.Wrap(err, "unable to reset replication") + } + + // this is required so we can load the backup generated by MySQL Shell. we will disable it afterwards. + err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=1") + if err != nil { + return nil, vterrors.Wrap(err, "unable to set local_infile=1") + } + + if mysqlShellSpeedUpRestore { + // disable redo logging and double write buffer if we are configured to do so. + err = params.Mysqld.ExecuteSuperQuery(ctx, "ALTER INSTANCE DISABLE INNODB REDO_LOG") + if err != nil { + return nil, vterrors.Wrap(err, "unable to disable REDO_LOG") + } + params.Logger.Infof("Disabled REDO_LOG") + + defer func() { // re-enable once we are done with the restore. + err := params.Mysqld.ExecuteSuperQuery(ctx, "ALTER INSTANCE ENABLE INNODB REDO_LOG") + if err != nil { + params.Logger.Errorf("unable to re-enable REDO_LOG: %v", err) + } else { + params.Logger.Infof("Disabled REDO_LOG") + } + }() + } + + // we need to disable SuperReadOnly otherwise we won't be able to restore the backup properly. + // once the backups is complete, we will restore it to its previous state. + resetFunc, err := be.handleSuperReadOnly(ctx, params) + if err != nil { + return nil, vterrors.Wrap(err, "unable to disable super-read-only") + } + defer resetFunc() + + args := []string{} + + if mysqlShellFlags != "" { + args = append(args, strings.Fields(mysqlShellFlags)...) + } + + args = append(args, "-e", fmt.Sprintf("util.loadDump(%q, %s)", + bm.BackupLocation, + mysqlShellLoadFlags, + )) + + cmd := exec.CommandContext(ctx, "mysqlsh", args...) + + params.Logger.Infof("running %s", cmd.String()) + + cmdOut, err := cmd.StdoutPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create stdout pipe") + } + cmdErr, err := cmd.StderrPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create stderr pipe") + } + if err := cmd.Start(); err != nil { + return nil, vterrors.Wrap(err, "can't start xbstream") + } + + cmdWg := &sync.WaitGroup{} + cmdWg.Add(2) + go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done) + go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done) + cmdWg.Wait() + + // Get the exit status. + if err := cmd.Wait(); err != nil { + return nil, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") + } + params.Logger.Infof("%s completed successfully", mysqlShellBackupBinaryName) + + // disable local_infile now that the restore is done. + err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=0") + if err != nil { + return nil, vterrors.Wrap(err, "unable to set local_infile=0") + } + params.Logger.Infof("set local_infile=0") + + params.Logger.Infof("Restore completed") + + return &bm.BackupManifest, nil +} + +// ShouldDrainForBackup satisfies the BackupEngine interface +// MySQL Shell backups can be taken while MySQL is running so we can control this via a flag. +func (be *MySQLShellBackupEngine) ShouldDrainForBackup() bool { + return mysqlShellBackupShouldDrain +} + +// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed. +// Since MySQL Shell operates on a live MySQL instance, there is no need to start it once the restore is completed +func (be *MySQLShellBackupEngine) ShouldStartMySQLAfterRestore() bool { + return false +} + +func (be *MySQLShellBackupEngine) backupPreCheck(location string) error { + if mysqlShellBackupLocation == "" { + return fmt.Errorf("%w: no backup location set via --mysql-shell-backup-location", MySQLShellPreCheckError) + } + + if mysqlShellFlags == "" || !strings.Contains(mysqlShellFlags, "--js") { + return fmt.Errorf("%w: at least the --js flag is required in the value of the flag --mysql-shell-flags", MySQLShellPreCheckError) + } + + // make sure the targe directory exists if the target location for the backup is not an object store + // (e.g. is the local filesystem) as MySQL Shell doesn't create the entire path beforehand: + isObjectStorage := false + for _, objStore := range knownObjectStoreParams { + if strings.Contains(mysqlShellDumpFlags, objStore) { + isObjectStorage = true + break + } + } + + if !isObjectStorage { + err := os.MkdirAll(location, 0o750) + if err != nil { + return fmt.Errorf("failure creating directory %s: %w", location, err) + } + } + + return nil +} + +func (be *MySQLShellBackupEngine) restorePreCheck(ctx context.Context, params RestoreParams) (shouldDeleteUsers bool, err error) { + if mysqlShellFlags == "" { + return shouldDeleteUsers, fmt.Errorf("%w: at least the --js flag is required in the value of the flag --mysql-shell-flags", MySQLShellPreCheckError) + } + + loadFlags := map[string]interface{}{} + err = json.Unmarshal([]byte(mysqlShellLoadFlags), &loadFlags) + if err != nil { + return false, fmt.Errorf("%w: unable to parse JSON of load flags", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["updateGtidSet"]; !ok || val != "replace" { + return false, fmt.Errorf("%w: mysql-shell needs to restore with updateGtidSet set to \"replace\" to work with Vitess", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["progressFile"]; !ok || val != "" { + return false, fmt.Errorf("%w: \"progressFile\" needs to be empty as vitess always starts a restore from scratch", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["skipBinlog"]; !ok || val != true { + return false, fmt.Errorf("%w: \"skipBinlog\" needs to set to true", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["loadUsers"]; ok && val == true { + shouldDeleteUsers = true + } + + return shouldDeleteUsers, nil +} + +func (be *MySQLShellBackupEngine) handleSuperReadOnly(ctx context.Context, params RestoreParams) (func(), error) { + readonly, err := params.Mysqld.IsSuperReadOnly(ctx) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("checking if mysqld has super_read_only=enable: %v", err)) + } + + params.Logger.Infof("Is Super Read Only: %v", readonly) + + if readonly { + resetFunc, err := params.Mysqld.SetSuperReadOnly(ctx, false) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("unable to disable super-read-only: %v", err)) + } + + return func() { + err := resetFunc() + if err != nil { + params.Logger.Errorf("Not able to set super_read_only to its original value after restore") + } + }, nil + } + + return func() {}, nil +} + +// releaseReadLock will keep reading the MySQL Shell STDERR waiting until the point it has acquired its lock +func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams, wg *sync.WaitGroup, lockAcquired time.Time) { + defer wg.Done() + + scanner := bufio.NewScanner(reader) + released := false + for scanner.Scan() { + line := scanner.Text() + + if !released { + + if !strings.Contains(line, mysqlShellLockMessage) { + continue + } + released = true + + params.Logger.Infof("mysql shell released its global read lock, doing the same") + + err := params.Mysqld.ReleaseGlobalReadLock(ctx) + if err != nil { + params.Logger.Errorf("unable to release global read lock: %v", err) + } + + params.Logger.Infof("global read lock released after %v", time.Since(lockAcquired)) + } + } + if err := scanner.Err(); err != nil { + params.Logger.Errorf("error reading from reader: %v", err) + } + + if !released { + params.Logger.Errorf("could not release global lock earlier") + } +} + +func cleanupMySQL(ctx context.Context, params RestoreParams, shouldDeleteUsers bool) error { + params.Logger.Infof("Cleaning up MySQL ahead of a restore") + result, err := params.Mysqld.FetchSuperQuery(ctx, "SHOW DATABASES") + if err != nil { + return err + } + + // drop all databases + for _, row := range result.Rows { + dbName := row[0].ToString() + if sliceContains(internalDBs, dbName) { + continue // not dropping internal DBs + } + + params.Logger.Infof("Dropping DB %q", dbName) + err = params.Mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", row[0].ToString())) + if err != nil { + return fmt.Errorf("error droppping database %q: %w", row[0].ToString(), err) + } + } + + if shouldDeleteUsers { + // get current user + var currentUser string + result, err = params.Mysqld.FetchSuperQuery(ctx, "SELECT user()") + if err != nil { + return fmt.Errorf("error fetching current user: %w", err) + } + + for _, row := range result.Rows { + currentUser = row[0].ToString() + } + + // drop all users except reserved ones + result, err = params.Mysqld.FetchSuperQuery(ctx, "SELECT user, host FROM mysql.user") + if err != nil { + return err + } + + for _, row := range result.Rows { + user := fmt.Sprintf("%s@%s", row[0].ToString(), row[1].ToString()) + + if user == currentUser { + continue // we don't drop the current user + } + if sliceContains(reservedUsers, user) { + continue // we skip reserved MySQL users + } + + params.Logger.Infof("Dropping User %q", user) + err = params.Mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf("DROP USER '%s'@'%s'", row[0].ToString(), row[1].ToString())) + if err != nil { + return fmt.Errorf("error droppping user %q: %w", user, err) + } + } + } + + return err +} + +func sliceContains[S ~[]E, E comparable](s S, v E) bool { + for _, item := range s { + if item == v { + return true + } + } + + return false +} diff --git a/go/vt/mysqlctl/mysqlshellbackupengine_test.go b/go/vt/mysqlctl/mysqlshellbackupengine_test.go new file mode 100644 index 00000000000..ceacab49d60 --- /dev/null +++ b/go/vt/mysqlctl/mysqlshellbackupengine_test.go @@ -0,0 +1,433 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/ioutil" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" +) + +func TestMySQLShellBackupBackupPreCheck(t *testing.T) { + originalLocation := mysqlShellBackupLocation + originalFlags := mysqlShellFlags + defer func() { + mysqlShellBackupLocation = originalLocation + mysqlShellFlags = originalFlags + }() + + engine := MySQLShellBackupEngine{} + tests := []struct { + name string + location string + flags string + err error + }{ + { + "empty flags", + "", + `{}`, + MySQLShellPreCheckError, + }, + { + "only location", + "/dev/null", + "", + MySQLShellPreCheckError, + }, + { + "only flags", + "", + "--js", + MySQLShellPreCheckError, + }, + { + "both values present but without --js", + "", + "-h localhost", + MySQLShellPreCheckError, + }, + { + "supported values", + t.TempDir(), + "--js -h localhost", + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + mysqlShellBackupLocation = tt.location + mysqlShellFlags = tt.flags + assert.ErrorIs(t, engine.backupPreCheck(path.Join(mysqlShellBackupLocation, "test")), tt.err) + }) + } + +} + +func TestMySQLShellBackupRestorePreCheck(t *testing.T) { + original := mysqlShellLoadFlags + defer func() { mysqlShellLoadFlags = original }() + + engine := MySQLShellBackupEngine{} + tests := []struct { + name string + flags string + err error + shouldDeleteUsers bool + }{ + { + "empty load flags", + `{}`, + MySQLShellPreCheckError, + false, + }, + { + "only updateGtidSet", + `{"updateGtidSet": "replace"}`, + MySQLShellPreCheckError, + false, + }, + { + "only progressFile", + `{"progressFile": ""}`, + MySQLShellPreCheckError, + false, + }, + { + "both values but unsupported values", + `{"updateGtidSet": "append", "progressFile": "/tmp/test1"}`, + MySQLShellPreCheckError, + false, + }, + { + "supported values", + `{"updateGtidSet": "replace", "progressFile": "", "skipBinlog": true, "loadUsers": false}`, + nil, + false, + }, + { + "should delete users", + `{"updateGtidSet": "replace", "progressFile": "", "skipBinlog": true, "loadUsers": true}`, + nil, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mysqlShellLoadFlags = tt.flags + shouldDeleteUsers, err := engine.restorePreCheck(context.Background(), RestoreParams{}) + assert.ErrorIs(t, err, tt.err) + assert.Equal(t, tt.shouldDeleteUsers, shouldDeleteUsers) + }) + } + +} + +func TestShouldDrainForBackupMySQLShell(t *testing.T) { + original := mysqlShellBackupShouldDrain + defer func() { mysqlShellBackupShouldDrain = original }() + + engine := MySQLShellBackupEngine{} + + mysqlShellBackupShouldDrain = false + + assert.False(t, engine.ShouldDrainForBackup()) + + mysqlShellBackupShouldDrain = true + + assert.True(t, engine.ShouldDrainForBackup()) +} + +func TestCleanupMySQL(t *testing.T) { + type userRecord struct { + user, host string + } + + tests := []struct { + name string + existingDBs []string + expectedDropDBs []string + currentUser string + existingUsers []userRecord + expectedDropUsers []string + shouldDeleteUsers bool + }{ + { + name: "testing only specific DBs", + existingDBs: []string{"_vt", "vt_test"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + }, + { + name: "testing with internal dbs", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + }, + { + name: "with users but without delete", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + existingUsers: []userRecord{ + {"test", "localhost"}, + {"app", "10.0.0.1"}, + }, + expectedDropUsers: []string{}, + shouldDeleteUsers: false, + }, + { + name: "with users and delete", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + existingUsers: []userRecord{ + {"test", "localhost"}, + {"app", "10.0.0.1"}, + }, + expectedDropUsers: []string{"'test'@'localhost'", "'app'@'10.0.0.1'"}, + shouldDeleteUsers: true, + }, + { + name: "with reserved users", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + existingUsers: []userRecord{ + {"mysql.sys", "localhost"}, + {"mysql.infoschema", "localhost"}, + {"mysql.session", "localhost"}, + {"test", "localhost"}, + {"app", "10.0.0.1"}, + }, + expectedDropUsers: []string{"'test'@'localhost'", "'app'@'10.0.0.1'"}, + shouldDeleteUsers: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysql := NewFakeMysqlDaemon(fakedb) + defer mysql.Close() + + databases := [][]sqltypes.Value{} + for _, db := range tt.existingDBs { + databases = append(databases, []sqltypes.Value{sqltypes.NewVarChar(db)}) + } + + users := [][]sqltypes.Value{} + for _, record := range tt.existingUsers { + users = append(users, []sqltypes.Value{sqltypes.NewVarChar(record.user), sqltypes.NewVarChar(record.host)}) + } + + mysql.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SHOW DATABASES": {Rows: databases}, + "SELECT user()": {Rows: [][]sqltypes.Value{{sqltypes.NewVarChar(tt.currentUser)}}}, + "SELECT user, host FROM mysql.user": {Rows: users}, + } + + for _, drop := range tt.expectedDropDBs { + mysql.ExpectedExecuteSuperQueryList = append(mysql.ExpectedExecuteSuperQueryList, + fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", drop), + ) + } + + if tt.shouldDeleteUsers { + for _, drop := range tt.expectedDropUsers { + mysql.ExpectedExecuteSuperQueryList = append(mysql.ExpectedExecuteSuperQueryList, + fmt.Sprintf("DROP USER %s", drop), + ) + } + } + + params := RestoreParams{ + Mysqld: mysql, + Logger: logutil.NewMemoryLogger(), + } + + err := cleanupMySQL(context.Background(), params, tt.shouldDeleteUsers) + require.NoError(t, err) + + require.Equal(t, len(tt.expectedDropDBs)+len(tt.expectedDropUsers), mysql.ExpectedExecuteSuperQueryCurrent, + "unexpected number of queries executed") + }) + } + +} + +// this is a helper to write files in a temporary directory +func generateTestFile(t *testing.T, name, contents string) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0700) + require.NoError(t, err) + defer f.Close() + _, err = f.WriteString(contents) + require.NoError(t, err) + require.NoError(t, f.Close()) +} + +// This tests if we are properly releasing the global read lock we acquire +// during ExecuteBackup(), even if the backup didn't succeed. +func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { + originalLocation := mysqlShellBackupLocation + originalBinary := mysqlShellBackupBinaryName + mysqlShellBackupLocation = "logical" + mysqlShellBackupBinaryName = path.Join(t.TempDir(), "test.sh") + + defer func() { // restore the original values. + mysqlShellBackupLocation = originalLocation + mysqlShellBackupBinaryName = originalBinary + }() + + logger := logutil.NewMemoryLogger() + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysql := NewFakeMysqlDaemon(fakedb) + defer mysql.Close() + + be := &MySQLShellBackupEngine{} + params := BackupParams{ + TabletAlias: "test", + Logger: logger, + Mysqld: mysql, + } + bs := FakeBackupStorage{ + StartBackupReturn: FakeBackupStorageStartBackupReturn{}, + } + + t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) { + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates mysql shell completing without any issues. + generateTestFile(t, mysqlShellBackupBinaryName, fmt.Sprintf("#!/bin/bash\n>&2 echo %s", mysqlShellLockMessage)) + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.NoError(t, err) + require.False(t, mysql.GlobalReadLock) // lock must be released. + + // check the manifest is valid. + var manifest MySQLShellBackupManifest + err = json.Unmarshal(manifestBuffer.Bytes(), &manifest) + require.NoError(t, err) + + require.Equal(t, mysqlShellBackupEngineName, manifest.BackupMethod) + + // did we notice the lock was release and did we release it ours as well? + require.Contains(t, logger.String(), "global read lock released after", + "failed to release the global lock after mysqlsh") + }) + + t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) { + mysql.GlobalReadLock = false // clear lock status. + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates mysqlshell completing, but we don't see the message that is released its lock. + generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 0") + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + // in this case the backup was successful, but even if we didn't see mysqlsh release its lock + // we make sure it is released at the end. + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.NoError(t, err) + require.False(t, mysql.GlobalReadLock) // lock must be released. + + // make sure we are at least logging the lock wasn't able to be released earlier. + require.Contains(t, logger.String(), "could not release global lock earlier", + "failed to log error message when unable to release lock during backup") + }) + + t.Run("lock released when backup fails", func(t *testing.T) { + mysql.GlobalReadLock = false // clear lock status. + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates the backup process failing. + generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 1") + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.ErrorContains(t, err, "mysqlshell failed") + require.False(t, mysql.GlobalReadLock) // lock must be released. + }) + +} + +func Test_sliceContains(t *testing.T) { + tests := []struct { + slice []any + value any + want bool + }{ + { + []any{"apple", "banana", "cherry"}, + "apple", + true, + }, + { + []any{"apple", "banana", "cherry"}, + "banana", + true, + }, + { + []any{"apple", "banana", "cherry"}, + "cherry", + true, + }, + { + []any{"apple", "banana", "cherry"}, + "dragonfruit", + false, + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + assert.Equal(t, tt.want, sliceContains(tt.slice, tt.value)) + }) + } +} diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go index 311828b4535..69799e7c717 100644 --- a/go/vt/mysqlctl/query.go +++ b/go/vt/mysqlctl/query.go @@ -17,6 +17,7 @@ limitations under the License. package mysqlctl import ( + "errors" "fmt" "strings" "time" @@ -228,6 +229,42 @@ func (mysqld *Mysqld) fetchStatuses(ctx context.Context, pattern string) (map[st return varMap, nil } +// ExecuteSuperQuery allows the user to execute a query as a super user. +func (mysqld *Mysqld) AcquireGlobalReadLock(ctx context.Context) error { + if mysqld.lockConn != nil { + return errors.New("lock already acquired") + } + + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return err + } + + err = mysqld.executeSuperQueryListConn(ctx, conn, []string{"FLUSH TABLES WITH READ LOCK"}) + if err != nil { + conn.Recycle() + return err + } + + mysqld.lockConn = conn + return nil +} + +func (mysqld *Mysqld) ReleaseGlobalReadLock(ctx context.Context) error { + if mysqld.lockConn == nil { + return errors.New("no read locks acquired yet") + } + + err := mysqld.executeSuperQueryListConn(ctx, mysqld.lockConn, []string{"UNLOCK TABLES"}) + if err != nil { + return err + } + + mysqld.lockConn.Recycle() + mysqld.lockConn = nil + return nil +} + const ( masterPasswordStart = " MASTER_PASSWORD = '" masterPasswordEnd = "',\n" diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 3c866019c63..28970e1362d 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -38,6 +38,8 @@ import ( "vitess.io/vitess/go/vt/log" ) +type ResetSuperReadOnlyFunc func() error + // WaitForReplicationStart waits until the deadline for replication to start. // This validates the current primary is correct and can be connected to. func WaitForReplicationStart(mysqld MysqlDaemon, replicaStartDeadline int) error { @@ -231,6 +233,23 @@ func (mysqld *Mysqld) IsReadOnly() (bool, error) { return false, nil } +// IsSuperReadOnly return true if the instance is super read only +func (mysqld *Mysqld) IsSuperReadOnly(ctx context.Context) (bool, error) { + qr, err := mysqld.FetchSuperQuery(ctx, "SELECT @@global.super_read_only") + if err != nil { + return false, err + } + + if len(qr.Rows) == 1 { + sro := qr.Rows[0][0].ToString() + if sro == "1" || sro == "ON" { + return true, nil + } + } + + return false, nil +} + // SetReadOnly set/unset the read_only flag func (mysqld *Mysqld) SetReadOnly(on bool) error { query := "SET GLOBAL read_only = " @@ -242,15 +261,52 @@ func (mysqld *Mysqld) SetReadOnly(on bool) error { return mysqld.ExecuteSuperQuery(context.TODO(), query) } -// SetSuperReadOnly set/unset the super_read_only flag -func (mysqld *Mysqld) SetSuperReadOnly(on bool) error { +// SetSuperReadOnly set/unset the super_read_only flag. +// Returns a function which is called to set super_read_only back to its original value. +func (mysqld *Mysqld) SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) { + // return function for switching `OFF` super_read_only + var resetFunc ResetSuperReadOnlyFunc + var disableFunc = func() error { + query := "SET GLOBAL super_read_only = 'OFF'" + err := mysqld.ExecuteSuperQuery(context.Background(), query) + return err + } + + // return function for switching `ON` super_read_only. + var enableFunc = func() error { + query := "SET GLOBAL super_read_only = 'ON'" + err := mysqld.ExecuteSuperQuery(context.Background(), query) + return err + } + + superReadOnlyEnabled, err := mysqld.IsSuperReadOnly(ctx) + if err != nil { + return nil, err + } + + // If non-idempotent then set the right call-back. + // We are asked to turn on super_read_only but original value is false, + // therefore return disableFunc, that can be used as defer by caller. + if on && !superReadOnlyEnabled { + resetFunc = disableFunc + } + // We are asked to turn off super_read_only but original value is true, + // therefore return enableFunc, that can be used as defer by caller. + if !on && superReadOnlyEnabled { + resetFunc = enableFunc + } + query := "SET GLOBAL super_read_only = " if on { - query += "ON" + query += "'ON'" } else { - query += "OFF" + query += "'OFF'" } - return mysqld.ExecuteSuperQuery(context.TODO(), query) + if err := mysqld.ExecuteSuperQuery(context.Background(), query); err != nil { + return nil, err + } + + return resetFunc, nil } // WaitSourcePos lets replicas wait to given replication position diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index e895ca4d96b..6fc5cf05c2a 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -729,23 +729,6 @@ func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql return replicationPosition, nil } -// scanLinesToLogger scans full lines from the given Reader and sends them to -// the given Logger until EOF. -func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) { - defer doneFunc() - - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - line := scanner.Text() - logger.Infof("%s: %s", prefix, line) - } - if err := scanner.Err(); err != nil { - // This is usually run in a background goroutine, so there's no point - // returning an error. Just log it. - logger.Warningf("error scanning lines from %s: %v", prefix, err) - } -} - func stripeFileName(baseFileName string, index int) string { return fmt.Sprintf("%s-%03d", baseFileName, index) } @@ -909,6 +892,11 @@ func (be *XtrabackupEngine) ShouldDrainForBackup() bool { return false } +// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed. +func (be *XtrabackupEngine) ShouldStartMySQLAfterRestore() bool { + return true +} + func init() { BackupRestoreEngineMap[xtrabackupEngineName] = &XtrabackupEngine{} } diff --git a/go/vt/vttablet/tabletmanager/rpc_query_test.go b/go/vt/vttablet/tabletmanager/rpc_query_test.go index f6167e24917..87a64b2d8b7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_query_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_query_test.go @@ -27,7 +27,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/vttablet/tabletservermock" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -38,7 +38,7 @@ func TestTabletManager_ExecuteFetchAsDba(t *testing.T) { cp := mysql.ConnParams{} db := fakesqldb.New(t) db.AddQueryPattern(".*", &sqltypes.Result{}) - daemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + daemon := mysqlctl.NewFakeMysqlDaemon(db) dbName := " escap`e me " tm := &TabletManager{ diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 2adde1a1bed..b6c54d6a526 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -357,7 +357,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string if setSuperReadOnly { // Setting super_read_only off so that we can run the DDL commands - if err := tm.MysqlDaemon.SetSuperReadOnly(false); err != nil { + if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil { if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) { log.Warningf("server does not know about super_read_only, continuing anyway...") } else { @@ -561,7 +561,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // idempotent. if setSuperReadOnly { // Setting super_read_only also sets read_only - if err := tm.MysqlDaemon.SetSuperReadOnly(true); err != nil { + if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, true); err != nil { if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) { log.Warningf("server does not know about super_read_only, continuing anyway...") } else { diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index 0aac8c971ec..d69a8414927 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo/memorytopo" ) @@ -46,7 +46,7 @@ func TestPromoteReplicaReplicationManagerSuccess(t *testing.T) { numTicksRan++ }) // Set the promotion lag to a second and then run PromoteReplica - tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon).PromoteLag = time.Second + tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon).PromoteLag = time.Second _, err := tm.PromoteReplica(ctx, false) require.NoError(t, err) // At the end we expect the replication manager to be stopped. @@ -68,7 +68,7 @@ func TestPromoteReplicaReplicationManagerFailure(t *testing.T) { require.True(t, tm.replManager.ticks.Running()) // Set the promotion lag to a second and then run PromoteReplica - tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon).PromoteError = fmt.Errorf("promote error") + tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon).PromoteError = fmt.Errorf("promote error") _, err := tm.PromoteReplica(ctx, false) require.Error(t, err) // At the end we expect the replication manager to be stopped. diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go index 56ebbbd32e1..af93b6b73fd 100644 --- a/go/vt/vttablet/tabletmanager/tm_init_test.go +++ b/go/vt/vttablet/tabletmanager/tm_init_test.go @@ -33,7 +33,7 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/logutil" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -379,7 +379,7 @@ func TestCheckPrimaryShip(t *testing.T) { tablet.Type = topodatapb.TabletType_REPLICA tablet.PrimaryTermStartTime = nil // Get the fakeMySQL and set it up to expect a set replication source command - fakeMysql := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon) + fakeMysql := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort)) fakeMysql.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE", @@ -638,7 +638,7 @@ func TestGetBuildTags(t *testing.T) { } } -func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaemon { +func newTestMysqlDaemon(t *testing.T, port int32) *mysqlctl.FakeMysqlDaemon { t.Helper() db := fakesqldb.New(t) @@ -659,7 +659,7 @@ func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaem db.AddQueryPattern("UPDATE _vt\\.(local|shard)_metadata SET db_name='.+' WHERE db_name=''", &sqltypes.Result{}) db.AddQueryPattern("INSERT INTO _vt\\.local_metadata \\(.+\\) VALUES \\(.+\\) ON DUPLICATE KEY UPDATE value ?= ?'.+'.*", &sqltypes.Result{}) - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(db) + mysqld := mysqlctl.NewFakeMysqlDaemon(db) mysqld.MysqlPort = sync2.NewAtomicInt32(port) return mysqld diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go index 48e2123554f..537580d4853 100644 --- a/go/vt/vttablet/tabletmanager/tm_state_test.go +++ b/go/vt/vttablet/tabletmanager/tm_state_test.go @@ -28,7 +28,7 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/faketopo" @@ -105,7 +105,7 @@ func TestStateDenyList(t *testing.T) { tm := newTestTM(t, ts, 1, "ks", "0") defer tm.Stop() - fmd := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon) + fmd := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) fmd.Schema = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ Name: "t1", diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index ebea9b8225f..4886d595d10 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -23,12 +23,12 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/mysqlctl" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/mysqlctl/tmutils" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -87,7 +87,7 @@ func TestControllerKeyRange(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) @@ -124,7 +124,7 @@ func TestControllerTables(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{ + mysqld := &mysqlctl.FakeMysqlDaemon{ MysqlPort: sync2.NewAtomicInt32(3306), Schema: &tabletmanagerdatapb.SchemaDefinition{ DatabaseSchema: "", @@ -219,7 +219,7 @@ func TestControllerOverrides(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) @@ -291,7 +291,7 @@ func TestControllerRetry(t *testing.T) { dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil) dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) @@ -350,7 +350,7 @@ func TestControllerStopPosition(t *testing.T) { dbClient.ExpectRequest("update _vt.vreplication set state='Stopped', message='Reached stopping position, done playing logs' where id=1", testDMLResponse, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 4c57f21dca1..0b61d732ce9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -32,7 +32,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" ) func TestEngineOpen(t *testing.T) { @@ -42,7 +42,7 @@ func TestEngineOpen(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) require.False(t, vre.IsOpen()) @@ -82,7 +82,7 @@ func TestEngineOpenRetry(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -143,7 +143,7 @@ func TestEngineExec(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} // Test Insert @@ -305,7 +305,7 @@ func TestEngineBadInsert(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -333,7 +333,7 @@ func TestEngineSelect(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -366,7 +366,7 @@ func TestWaitForPos(t *testing.T) { waitRetryTime = 10 * time.Millisecond dbClient := binlogplayer.NewMockDBClient(t) - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} dbClientFactory := func() binlogplayer.DBClient { return dbClient } vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -394,7 +394,7 @@ func TestWaitForPos(t *testing.T) { func TestWaitForPosError(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} dbClientFactory := func() binlogplayer.DBClient { return dbClient } vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -430,7 +430,7 @@ func TestWaitForPosError(t *testing.T) { func TestWaitForPosCancel(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} dbClientFactory := func() binlogplayer.DBClient { return dbClient } vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -474,7 +474,7 @@ func TestCreateDBAndTable(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} // Test Insert @@ -570,7 +570,7 @@ func TestGetDBClient(t *testing.T) { dbClientFactoryDba := func() binlogplayer.DBClient { return dbClientDba } dbClientFactoryFiltered := func() binlogplayer.DBClient { return dbClientFiltered } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactoryFiltered, dbClientFactoryDba, dbClientDba.DBName(), nil) shouldBeDbaClient := vre.getDBClient(true /*runAsAdmin*/) diff --git a/go/vt/vttablet/tabletmanager/vreplication/fuzz.go b/go/vt/vttablet/tabletmanager/vreplication/fuzz.go index 55c1e743dad..0fcfcce9660 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/fuzz.go +++ b/go/vt/vttablet/tabletmanager/vreplication/fuzz.go @@ -25,7 +25,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo/memorytopo" fuzz "github.com/AdaLogics/go-fuzz-headers" @@ -94,7 +94,7 @@ func FuzzEngine(data []byte) int { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(topoServer, "cell1", mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) diff --git a/go/vt/vttablet/tabletserver/repltracker/poller_test.go b/go/vt/vttablet/tabletserver/repltracker/poller_test.go index 3dc27c771ca..e0734118160 100644 --- a/go/vt/vttablet/tabletserver/repltracker/poller_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/poller_test.go @@ -23,12 +23,12 @@ import ( "github.com/stretchr/testify/assert" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" ) func TestPoller(t *testing.T) { poller := &poller{} - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld := mysqlctl.NewFakeMysqlDaemon(nil) poller.InitDBConfig(mysqld) mysqld.ReplicationStatusError = errors.New("err") diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go index 0695a079b82..33fe1a39146 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go @@ -25,7 +25,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -47,7 +47,7 @@ func TestReplTracker(t *testing.T) { Uid: 1, } target := &querypb.Target{} - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld := mysqlctl.NewFakeMysqlDaemon(nil) rt := NewReplTracker(env, alias) rt.InitDBConfig(target, mysqld) @@ -143,7 +143,7 @@ func TestStatusHeartbeatFallBack(t *testing.T) { Cell: "cell", Uid: 1, } - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld := mysqlctl.NewFakeMysqlDaemon(nil) mysqld.ReplicationLagSeconds = theCase.mysqldLag mysqld.Replicating = true mysqld.ReplicationStatusError = theCase.mysqldErr diff --git a/go/vt/wrangler/doc_test.md b/go/vt/wrangler/doc_test.md index 4fd445581da..c84a3720225 100644 --- a/go/vt/wrangler/doc_test.md +++ b/go/vt/wrangler/doc_test.md @@ -43,7 +43,7 @@ test the workflow state machine. There is no actual data being vreplicated. #### The fake MySQLDaemon -`go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go` +`go/vt/mysqlctl/fakemysqldaemon.go` Used to set primary positions to provide/validate gtids. diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 254e1813d8d..37aeeed6fa3 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -29,7 +29,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -75,7 +75,7 @@ type fakeTablet struct { // We also create the RPCServer, so users can register more services // before calling StartActionLoop(). Tablet *topodatapb.Tablet - FakeMysqlDaemon *fakemysqldaemon.FakeMysqlDaemon + FakeMysqlDaemon *mysqlctl.FakeMysqlDaemon RPCServer *grpc.Server // The following fields are created when we start the event loop for @@ -144,7 +144,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy } // create a FakeMysqlDaemon with the right information by default - fakeMysqlDaemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + fakeMysqlDaemon := mysqlctl.NewFakeMysqlDaemon(db) fakeMysqlDaemon.MysqlPort.Set(mysqlPort) return &fakeTablet{ diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go index 99c43015d3e..5c85e37d43a 100644 --- a/go/vt/wrangler/testlib/fake_tablet.go +++ b/go/vt/wrangler/testlib/fake_tablet.go @@ -34,7 +34,7 @@ import ( "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/grpctmserver" @@ -69,7 +69,7 @@ type FakeTablet struct { // We also create the RPCServer, so users can register more services // before calling StartActionLoop(). Tablet *topodatapb.Tablet - FakeMysqlDaemon *fakemysqldaemon.FakeMysqlDaemon + FakeMysqlDaemon *mysqlctl.FakeMysqlDaemon RPCServer *grpc.Server // The following fields are created when we start the event loop for @@ -159,7 +159,7 @@ func NewFakeTablet(t *testing.T, wr *wrangler.Wrangler, cell string, uid uint32, } // create a FakeMysqlDaemon with the right information by default - fakeMysqlDaemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + fakeMysqlDaemon := mysqlctl.NewFakeMysqlDaemon(db) fakeMysqlDaemon.MysqlPort.Set(mysqlPort) return &FakeTablet{