diff --git a/replication/backup_test.go b/replication/backup_test.go index abefd3f8..a8ee4465 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -3,6 +3,8 @@ package replication import ( "context" "os" + "path" + "sync" "time" "github.com/stretchr/testify/require" @@ -32,18 +34,67 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { done := make(chan bool) + // Start the backup in a goroutine go func() { err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) require.NoError(t.T(), err) done <- true }() + + // Wait for the backup to complete or timeout failTimeout := 5 * timeout ctx, cancel := context.WithTimeout(context.Background(), failTimeout) defer cancel() select { case <-done: + // Backup completed; now verify the backup files + files, err := os.ReadDir(binlogDir) + require.NoError(t.T(), err) + require.NotEmpty(t.T(), files, "No binlog files were backed up") + + for _, file := range files { + fileInfo, err := os.Stat(path.Join(binlogDir, file.Name())) + require.NoError(t.T(), err) + require.NotZero(t.T(), fileInfo.Size(), "Binlog file %s is empty", file.Name()) + } + return case <-ctx.Done(): t.T().Fatal("time out error") } } + +type CountingEventHandler struct { + count int + mutex sync.Mutex +} + +func (h *CountingEventHandler) HandleEvent(e *BinlogEvent) error { + h.mutex.Lock() + h.count++ + h.mutex.Unlock() + return nil +} + +func (t *testSyncerSuite) TestBackupEventHandlerInvocation() { + t.setupTest(mysql.MySQLFlavor) + + // Define binlogDir and timeout + binlogDir := "./var" + os.RemoveAll(binlogDir) + timeout := 2 * time.Second + + // Set up the CountingEventHandler + handler := &CountingEventHandler{} + t.b.SetEventHandler(handler) + + // Start the backup + err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) + require.NoError(t.T(), err) + + // Verify that events were handled + handler.mutex.Lock() + eventCount := handler.count + handler.mutex.Unlock() + require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler") +} diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 1796b63c..f1e70813 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -925,10 +925,10 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, rawD // getCurrentGtidSet returns a clone of the current GTID set. func (b *BinlogSyncer) getCurrentGtidSet() GTIDSet { - if b.currGset != nil { - return b.currGset.Clone() - } - return nil + if b.currGset != nil { + return b.currGset.Clone() + } + return nil } // LastConnectionID returns last connectionID. diff --git a/replication/replication_test.go b/replication/replication_test.go index 3a64df74..7413543e 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -79,6 +79,9 @@ func (t *testSyncerSuite) testSync(s *BinlogStreamer) { } }() + handler := &CountingEventHandler{} + t.b.SetEventHandler(handler) + // use mixed format t.testExecute("SET SESSION binlog_format = 'MIXED'") @@ -258,6 +261,11 @@ func (t *testSyncerSuite) testSync(s *BinlogStreamer) { "2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456")`) t.wg.Wait() + + handler.mutex.Lock() + eventCount := handler.count + handler.mutex.Unlock() + require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler") } func (t *testSyncerSuite) setupTest(flavor string) { @@ -463,3 +471,50 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec() { require.NoError(t.T(), err) } } + +func (t *testSyncerSuite) TestGTIDSetHandling() { + t.setupTest(mysql.MySQLFlavor) + + // Ensure GTID mode is enabled + r, err := t.c.Execute("SELECT @@gtid_mode") + require.NoError(t.T(), err) + modeOn, _ := r.GetString(0, 0) + if modeOn != "ON" { + t.T().Skip("GTID mode is not ON") + } + + // Start syncing with an empty GTID set + set, _ := mysql.ParseMysqlGTIDSet("") + s, err := t.b.StartSyncGTID(set) + require.NoError(t.T(), err) + + // Perform some transactions + t.testExecute("CREATE TABLE test_gtid (id INT PRIMARY KEY)") + t.testExecute("INSERT INTO test_gtid VALUES (1)") + t.testExecute("COMMIT") + + // Read events and verify GTID sets + t.wg.Add(1) + go func() { + defer t.wg.Done() + for { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + e, err := s.GetEvent(ctx) + cancel() + if err == context.DeadlineExceeded { + return + } + require.NoError(t.T(), err) + + // Check GTID set in XIDEvent or QueryEvent + switch e.Event.(type) { + case *XIDEvent, *QueryEvent: + if !t.b.cfg.DiscardGTIDSet { + gtidSet := t.b.getCurrentGtidSet() + require.NotNil(t.T(), gtidSet, "GTID set should not be nil") + } + } + } + }() + t.wg.Wait() +}