From 31797fff22c92bbcdb02f9c70d6950988614732a Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Fri, 18 Oct 2024 05:42:45 -0700 Subject: [PATCH 1/3] fix releasing the global read lock when mysqlshell backup fails Signed-off-by: Renan Rangel --- go/ioutil/writer.go | 14 ++ go/vt/mysqlctl/fakemysqldaemon.go | 17 ++- go/vt/mysqlctl/mysqlshellbackupengine.go | 14 +- go/vt/mysqlctl/mysqlshellbackupengine_test.go | 134 ++++++++++++++++++ 4 files changed, 175 insertions(+), 4 deletions(-) diff --git a/go/ioutil/writer.go b/go/ioutil/writer.go index 4aac07ba501..6cfe38f7ea8 100644 --- a/go/ioutil/writer.go +++ b/go/ioutil/writer.go @@ -22,6 +22,7 @@ wrappers around WriteCloser and Writer. package ioutil import ( + "bytes" "io" "time" ) @@ -87,3 +88,16 @@ func NewMeteredWriter(tw io.Writer, fns ...func(int, time.Duration)) MeteredWrit func (tw *meteredWriter) Write(p []byte) (int, error) { return tw.meter.measure(tw.Writer.Write, p) } + +// MemoryBuffer implements io.WriteCloser using an in-memory buffer. +type MemoryBuffer struct { + *bytes.Buffer +} + +func (m MemoryBuffer) Close() error { + return nil +} + +func NewMemoryBuffer() MemoryBuffer { + return MemoryBuffer{bytes.NewBuffer(nil)} +} diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index bc14823ddcd..ef62f0ccb85 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -185,6 +185,9 @@ type FakeMysqlDaemon struct { // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_replica_enabled. SemiSyncReplicaEnabled bool + // GlobalReadLock is used to test if a lock has been acquired already or not + GlobalReadLock bool + // TimeoutHook is a func that can be called at the beginning of // any method to fake a timeout. // All a test needs to do is make it { return context.DeadlineExceeded }. @@ -772,10 +775,20 @@ func (fmd *FakeMysqlDaemon) HostMetrics(ctx context.Context, cnf *Mycnf) (*mysql // AcquireGlobalReadLock is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error { - return errors.New("not implemented") + if fmd.GlobalReadLock { + return errors.New("lock already acquired") + } + + fmd.GlobalReadLock = true + return nil } // ReleaseGlobalReadLock is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error { - return errors.New("not implemented") + if fmd.GlobalReadLock { + fmd.GlobalReadLock = false + return nil + } + + return errors.New("no read locks acquired yet") } diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go index 0c41056b621..65950aced5a 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -56,6 +56,8 @@ var ( // disable redo logging and double write buffer mysqlShellSpeedUpRestore = false + mysqlShellBackupBinaryName = "mysqlsh" + // use when checking if we need to create the directory on the local filesystem or not. knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"} @@ -107,8 +109,8 @@ type MySQLShellBackupEngine struct { } const ( - mysqlShellBackupBinaryName = "mysqlsh" mysqlShellBackupEngineName = "mysqlshell" + mysqlShellLockMessage = "Global read lock has been released" ) func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (result BackupResult, finalErr error) { @@ -152,6 +154,10 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back } lockAcquired := time.Now() // we will report how long we hold the lock for + // we need to release the global read lock in case the backup fails to start and + // the lock wasn't released by releaseReadLock() yet + defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(ctx) }() + posBeforeBackup, err := params.Mysqld.PrimaryPosition(ctx) if err != nil { return BackupUnusable, vterrors.Wrap(err, "failed to fetch position") @@ -503,7 +509,7 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams, if !released { - if !strings.Contains(line, "Global read lock has been released") { + if !strings.Contains(line, mysqlShellLockMessage) { continue } released = true @@ -521,6 +527,10 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams, if err := scanner.Err(); err != nil { params.Logger.Errorf("error reading from reader: %v", err) } + + if !released { + params.Logger.Errorf("could not release global lock earlier") + } } func cleanupMySQL(ctx context.Context, params RestoreParams, shouldDeleteUsers bool) error { diff --git a/go/vt/mysqlctl/mysqlshellbackupengine_test.go b/go/vt/mysqlctl/mysqlshellbackupengine_test.go index a0d0c8d7a1d..6960f44ad94 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine_test.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine_test.go @@ -18,13 +18,17 @@ package mysqlctl import ( "context" + "encoding/json" "fmt" + "os" "path" + "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/ioutil" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/logutil" @@ -307,3 +311,133 @@ func TestCleanupMySQL(t *testing.T) { } } + +// this is a helper to write files in a temporary directory +func generateTestFile(t *testing.T, name, contents string) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0700) + require.NoError(t, err) + defer f.Close() + _, err = f.WriteString(contents) + require.NoError(t, err) + require.NoError(t, f.Close()) +} + +// This tests if we are properly releasing the global read lock we acquire +// during ExecuteBackup(), even if the backup didn't succeed. +func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { + originalLocation := mysqlShellBackupLocation + originalBinary := mysqlShellBackupBinaryName + mysqlShellBackupLocation = "logical" + mysqlShellBackupBinaryName = path.Join(t.TempDir(), "test.sh") + + defer func() { // restore the original values. + mysqlShellBackupLocation = originalLocation + mysqlShellBackupBinaryName = originalBinary + }() + + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysql := NewFakeMysqlDaemon(fakedb) + defer mysql.Close() + + be := &MySQLShellBackupEngine{} + params := BackupParams{ + TabletAlias: "test", + Mysqld: mysql, + } + bs := FakeBackupStorage{ + StartBackupReturn: FakeBackupStorageStartBackupReturn{}, + } + + t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) { + logger := logutil.NewMemoryLogger() + params.Logger = logger + manifestBuffer := ioutil.NewMemoryBuffer() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates mysql shell completing without any issues. + generateTestFile(t, mysqlShellBackupBinaryName, fmt.Sprintf("#!/bin/bash\n>&2 echo %s", mysqlShellLockMessage)) + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.NoError(t, err) + require.False(t, mysql.GlobalReadLock) // lock must be released. + + // check the manifest is valid. + var manifest MySQLShellBackupManifest + err = json.Unmarshal(manifestBuffer.Bytes(), &manifest) + require.NoError(t, err) + + require.Equal(t, mysqlShellBackupEngineName, manifest.BackupMethod) + + // did we notice the lock was release and did we release it ours as well? + errorLogged := false + for _, event := range logger.Events { + if strings.Contains(event.Value, "global read lock released after") { + errorLogged = true + } + } + + assert.True(t, errorLogged, "failed to release the global lock after mysqlsh") + }) + + t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) { + mysql.GlobalReadLock = false // clear lock status. + logger := logutil.NewMemoryLogger() + params.Logger = logger + manifestBuffer := ioutil.NewMemoryBuffer() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates mysqlshell completing, but we don't see the message that is released its lock. + generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 0") + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + // in this case the backup was successful, but even if we didn't see mysqlsh release its lock + // we make sure it is released at the end. + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.NoError(t, err) + require.False(t, mysql.GlobalReadLock) // lock must be released. + + // make sure we are at least logging the lock wasn't able to be released earlier. + errorLogged := false + for _, event := range logger.Events { + if strings.Contains(event.Value, "could not release global lock earlier") { + errorLogged = true + } + } + + assert.True(t, errorLogged, "failed to log error message when unable to release lock during backup") + }) + + t.Run("lock released when backup fails", func(t *testing.T) { + mysql.GlobalReadLock = false // clear lock status. + logger := logutil.NewMemoryLogger() + params.Logger = logger + manifestBuffer := ioutil.NewMemoryBuffer() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates the backup process failing. + generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 1") + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.ErrorContains(t, err, "mysqlshell failed") + require.False(t, mysql.GlobalReadLock) // lock must be released. + }) + +} From a221b2e37a9105f35bb1382481a192b2fd88a765 Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Fri, 18 Oct 2024 08:23:58 -0700 Subject: [PATCH 2/3] fix goroutine leak Signed-off-by: Renan Rangel --- go/vt/mysqlctl/mysqlshellbackupengine.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go index 65950aced5a..8c347a008f6 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -190,6 +190,7 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back // Get exit status. if err := cmd.Wait(); err != nil { + pipeWriter.Close() // make sure we close the writer so the goroutines above will complete. return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") } From cfcd5114c1ba235da48c587d538bd2488b2f9c75 Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Fri, 18 Oct 2024 10:36:44 -0700 Subject: [PATCH 3/3] PR feedback Signed-off-by: Renan Rangel --- go/ioutil/writer.go | 10 ++--- go/vt/mysqlctl/mysqlshellbackupengine.go | 5 ++- go/vt/mysqlctl/mysqlshellbackupengine_test.go | 38 ++++++------------- 3 files changed, 20 insertions(+), 33 deletions(-) diff --git a/go/ioutil/writer.go b/go/ioutil/writer.go index 6cfe38f7ea8..759ae1b7ab6 100644 --- a/go/ioutil/writer.go +++ b/go/ioutil/writer.go @@ -89,15 +89,15 @@ func (tw *meteredWriter) Write(p []byte) (int, error) { return tw.meter.measure(tw.Writer.Write, p) } -// MemoryBuffer implements io.WriteCloser using an in-memory buffer. -type MemoryBuffer struct { +// BytesBufferWriter implements io.WriteCloser using an in-memory buffer. +type BytesBufferWriter struct { *bytes.Buffer } -func (m MemoryBuffer) Close() error { +func (m BytesBufferWriter) Close() error { return nil } -func NewMemoryBuffer() MemoryBuffer { - return MemoryBuffer{bytes.NewBuffer(nil)} +func NewBytesBufferWriter() BytesBufferWriter { + return BytesBufferWriter{bytes.NewBuffer(nil)} } diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go index 8c347a008f6..b7405ce7eaa 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -155,8 +155,9 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back lockAcquired := time.Now() // we will report how long we hold the lock for // we need to release the global read lock in case the backup fails to start and - // the lock wasn't released by releaseReadLock() yet - defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(ctx) }() + // the lock wasn't released by releaseReadLock() yet. context might be expired, + // so we pass a new one. + defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(context.Background()) }() posBeforeBackup, err := params.Mysqld.PrimaryPosition(ctx) if err != nil { diff --git a/go/vt/mysqlctl/mysqlshellbackupengine_test.go b/go/vt/mysqlctl/mysqlshellbackupengine_test.go index 6960f44ad94..67f27b5382e 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine_test.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine_test.go @@ -22,7 +22,6 @@ import ( "fmt" "os" "path" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -335,6 +334,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { mysqlShellBackupBinaryName = originalBinary }() + logger := logutil.NewMemoryLogger() fakedb := fakesqldb.New(t) defer fakedb.Close() mysql := NewFakeMysqlDaemon(fakedb) @@ -343,6 +343,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { be := &MySQLShellBackupEngine{} params := BackupParams{ TabletAlias: "test", + Logger: logger, Mysqld: mysql, } bs := FakeBackupStorage{ @@ -350,9 +351,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { } t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) { - logger := logutil.NewMemoryLogger() - params.Logger = logger - manifestBuffer := ioutil.NewMemoryBuffer() + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ Dir: t.TempDir(), AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, @@ -376,21 +376,14 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { require.Equal(t, mysqlShellBackupEngineName, manifest.BackupMethod) // did we notice the lock was release and did we release it ours as well? - errorLogged := false - for _, event := range logger.Events { - if strings.Contains(event.Value, "global read lock released after") { - errorLogged = true - } - } - - assert.True(t, errorLogged, "failed to release the global lock after mysqlsh") + require.Contains(t, logger.String(), "global read lock released after", + "failed to release the global lock after mysqlsh") }) t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) { mysql.GlobalReadLock = false // clear lock status. - logger := logutil.NewMemoryLogger() - params.Logger = logger - manifestBuffer := ioutil.NewMemoryBuffer() + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ Dir: t.TempDir(), AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, @@ -409,21 +402,14 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { require.False(t, mysql.GlobalReadLock) // lock must be released. // make sure we are at least logging the lock wasn't able to be released earlier. - errorLogged := false - for _, event := range logger.Events { - if strings.Contains(event.Value, "could not release global lock earlier") { - errorLogged = true - } - } - - assert.True(t, errorLogged, "failed to log error message when unable to release lock during backup") + require.Contains(t, logger.String(), "could not release global lock earlier", + "failed to log error message when unable to release lock during backup") }) t.Run("lock released when backup fails", func(t *testing.T) { mysql.GlobalReadLock = false // clear lock status. - logger := logutil.NewMemoryLogger() - params.Logger = logger - manifestBuffer := ioutil.NewMemoryBuffer() + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ Dir: t.TempDir(), AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer},