Skip to content

Commit

Permalink
Add tests to verify ACKs are sent *after* fsync and correct event han…
Browse files Browse the repository at this point in the history
…dling

- Updated `backup_test.go` to ensure binlog files are written and fsynced before ACKs are sent by checking that backup files exist and have non-zero sizes after the backup process completes
- Implemented `CountingEventHandler` in `backup_test.go` to confirm that the `EventHandler` is invoked during backups, ensuring events are processed as expected
- Added `TestGTIDSetHandling` in `replication_test.go` to verify that GTIDSets are correctly handled after refactoring
- Modified `testSync` method in `replication_test.go` to use a custom `EventHandler` and verify that events are processed correctly before ACKs are sent
  • Loading branch information
Dylan Terry committed Sep 30, 2024
1 parent 9d327ec commit b0273aa
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 4 deletions.
51 changes: 51 additions & 0 deletions replication/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package replication
import (
"context"
"os"
"path"
"sync"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -32,18 +34,67 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {

done := make(chan bool)

// Start the backup in a goroutine
go func() {
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)
done <- true
}()

// Wait for the backup to complete or timeout
failTimeout := 5 * timeout
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
defer cancel()
select {
case <-done:
// Backup completed; now verify the backup files
files, err := os.ReadDir(binlogDir)
require.NoError(t.T(), err)
require.NotEmpty(t.T(), files, "No binlog files were backed up")

for _, file := range files {
fileInfo, err := os.Stat(path.Join(binlogDir, file.Name()))
require.NoError(t.T(), err)
require.NotZero(t.T(), fileInfo.Size(), "Binlog file %s is empty", file.Name())
}

return
case <-ctx.Done():
t.T().Fatal("time out error")
}
}

type CountingEventHandler struct {
count int
mutex sync.Mutex
}

func (h *CountingEventHandler) HandleEvent(e *BinlogEvent) error {
h.mutex.Lock()
h.count++
h.mutex.Unlock()
return nil
}

func (t *testSyncerSuite) TestBackupEventHandlerInvocation() {
t.setupTest(mysql.MySQLFlavor)

// Define binlogDir and timeout
binlogDir := "./var"
os.RemoveAll(binlogDir)
timeout := 2 * time.Second

// Set up the CountingEventHandler
handler := &CountingEventHandler{}
t.b.SetEventHandler(handler)

// Start the backup
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)

// Verify that events were handled
handler.mutex.Lock()
eventCount := handler.count
handler.mutex.Unlock()
require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler")
}
8 changes: 4 additions & 4 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,10 +925,10 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, rawD

// getCurrentGtidSet returns a clone of the current GTID set.
func (b *BinlogSyncer) getCurrentGtidSet() GTIDSet {
if b.currGset != nil {
return b.currGset.Clone()
}
return nil
if b.currGset != nil {
return b.currGset.Clone()
}
return nil
}

// LastConnectionID returns last connectionID.
Expand Down
55 changes: 55 additions & 0 deletions replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (t *testSyncerSuite) testSync(s *BinlogStreamer) {
}
}()

handler := &CountingEventHandler{}
t.b.SetEventHandler(handler)

// use mixed format
t.testExecute("SET SESSION binlog_format = 'MIXED'")

Expand Down Expand Up @@ -258,6 +261,11 @@ func (t *testSyncerSuite) testSync(s *BinlogStreamer) {
"2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456")`)

t.wg.Wait()

handler.mutex.Lock()
eventCount := handler.count
handler.mutex.Unlock()
require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler")
}

func (t *testSyncerSuite) setupTest(flavor string) {
Expand Down Expand Up @@ -463,3 +471,50 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec() {
require.NoError(t.T(), err)
}
}

func (t *testSyncerSuite) TestGTIDSetHandling() {
t.setupTest(mysql.MySQLFlavor)

// Ensure GTID mode is enabled
r, err := t.c.Execute("SELECT @@gtid_mode")
require.NoError(t.T(), err)
modeOn, _ := r.GetString(0, 0)
if modeOn != "ON" {
t.T().Skip("GTID mode is not ON")
}

// Start syncing with an empty GTID set
set, _ := mysql.ParseMysqlGTIDSet("")
s, err := t.b.StartSyncGTID(set)
require.NoError(t.T(), err)

// Perform some transactions
t.testExecute("CREATE TABLE test_gtid (id INT PRIMARY KEY)")
t.testExecute("INSERT INTO test_gtid VALUES (1)")
t.testExecute("COMMIT")

// Read events and verify GTID sets
t.wg.Add(1)
go func() {
defer t.wg.Done()
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
e, err := s.GetEvent(ctx)
cancel()
if err == context.DeadlineExceeded {
return
}
require.NoError(t.T(), err)

// Check GTID set in XIDEvent or QueryEvent
switch e.Event.(type) {
case *XIDEvent, *QueryEvent:
if !t.b.cfg.DiscardGTIDSet {
gtidSet := t.b.getCurrentGtidSet()
require.NotNil(t.T(), gtidSet, "GTID set should not be nil")
}
}
}
}()
t.wg.Wait()
}

0 comments on commit b0273aa

Please sign in to comment.