Skip to content

Commit

Permalink
add SyncMode and support for synchronous and asynchronous backup hand…
Browse files Browse the repository at this point in the history
…ling
  • Loading branch information
Dylan Terry committed Oct 2, 2024
1 parent 4fa8f67 commit 591a1c7
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 113 deletions.
49 changes: 38 additions & 11 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
// If set to 0, a default very long timeout (30 days) is used instead.
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) {
func (b *BinlogSyncer) StartBackupWithHandler(
p Position,
timeout time.Duration,
handler func(binlogFilename string) (io.WriteCloser, error),
) (retErr error) {
if timeout == 0 {
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
Expand All @@ -47,8 +50,10 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
handler: handler,
}

// Set the event handler in BinlogSyncer
b.SetEventHandler(backupHandler)
if b.cfg.SyncMode == SyncModeSync {
// Set the event handler in BinlogSyncer for synchronous mode
b.SetEventHandler(backupHandler)
}

// Start syncing
s, err := b.StartSync(p)
Expand All @@ -70,13 +75,33 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
if b.cfg.SyncMode == SyncModeSync {
// Synchronous mode: wait for completion or error
select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
}
} else {
// Asynchronous mode: consume events from the streamer
for {
select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
case e := <-s.ch:
err = backupHandler.HandleEvent(e)
if err != nil {
return errors.Trace(err)
}
}
}
}
}

Expand All @@ -87,6 +112,7 @@ type BackupEventHandler struct {
file *os.File
mutex sync.Mutex
fsyncedChan chan struct{}
eventCount int // eventCount used for testing
}

func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
Expand Down Expand Up @@ -148,6 +174,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
return errors.Trace(err)
}
}
h.eventCount++

return nil
}
Loading

0 comments on commit 591a1c7

Please sign in to comment.