Skip to content

Commit

Permalink
Add basic tests to migrator (#1168)
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored Sep 6, 2022
1 parent 05c7ed5 commit 1df37c2
Show file tree
Hide file tree
Showing 2 changed files with 374 additions and 91 deletions.
209 changes: 118 additions & 91 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package logic

import (
"context"
"errors"
"fmt"
"io"
"math"
Expand All @@ -22,6 +23,10 @@ import (
"github.com/github/gh-ost/go/sql"
)

var (
ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.")
)

type ChangelogState string

const (
Expand Down Expand Up @@ -223,28 +228,22 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
case Migrated, ReadMigrationRangeValues:
// no-op event
case GhostTableMigrated:
{
this.ghostTableMigrated <- true
}
this.ghostTableMigrated <- true
case AllEventsUpToLockProcessed:
{
var applyEventFunc tableWriteFunc = func() error {
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
// at this point we know all events up to lock have been read from the streamer,
// because the streamer works sequentially. So those events are either already handled,
// or have event functions in applyEventsQueue.
// So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter.
go func() {
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}()
var applyEventFunc tableWriteFunc = func() error {
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
// at this point we know all events up to lock have been read from the streamer,
// because the streamer works sequentially. So those events are either already handled,
// or have event functions in applyEventsQueue.
// So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter.
go func() {
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}()
default:
{
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
this.migrationContext.Log.Infof("Handled changelog state %s", changelogState)
return nil
Expand All @@ -268,13 +267,13 @@ func (this *Migrator) listenOnPanicAbort() {
this.migrationContext.Log.Fatale(err)
}

// validateStatement validates the `alter` statement meets criteria.
// validateAlterStatement validates the `alter` statement meets criteria.
// At this time this means:
// - column renames are approved
// - no table rename allowed
func (this *Migrator) validateStatement() (err error) {
func (this *Migrator) validateAlterStatement() (err error) {
if this.parser.IsRenameTable() {
return fmt.Errorf("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.")
return ErrMigratorUnsupportedRenameAlter
}
if this.parser.HasNonTrivialRenames() && !this.migrationContext.SkipRenamedColumns {
this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames()
Expand Down Expand Up @@ -352,7 +351,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
return err
}
if err := this.validateStatement(); err != nil {
if err := this.validateAlterStatement(); err != nil {
return err
}

Expand Down Expand Up @@ -903,72 +902,49 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
}
}

// printStatus prints the progress status, and optionally additionally detailed
// dump of configuration.
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)

elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}
var progressPct float64
if rowsEstimate == 0 {
progressPct = 100.0
} else {
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
}
// we take the opportunity to update migration context with progressPct
this.migrationContext.SetProgressPct(progressPct)
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
if rule == ForcePrintStatusAndHintRule {
shouldPrintMigrationStatusHint = true
}
if rule == ForcePrintStatusOnlyRule {
shouldPrintMigrationStatusHint = false
}
if shouldPrintMigrationStatusHint {
this.printMigrationStatusHint(writers...)
// getProgressPercent returns an estimate of migration progess as a percent.
func (this *Migrator) getProgressPercent(rowsEstimate int64) (progressPct float64) {
progressPct = 100.0
if rowsEstimate > 0 {
progressPct *= float64(this.migrationContext.GetTotalRowsCopied()) / float64(rowsEstimate)
}
return progressPct
}

var etaSeconds float64 = math.MaxFloat64
var etaDuration = time.Duration(base.ETAUnknown)
// getMigrationETA returns the estimated duration of the migration
func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration time.Duration) {
duration = time.Duration(base.ETAUnknown)
progressPct := this.getProgressPercent(rowsEstimate)
if progressPct >= 100.0 {
etaDuration = 0
duration = 0
} else if progressPct >= 0.1 {
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration = time.Duration(etaSeconds) * time.Second
duration = time.Duration(etaSeconds) * time.Second
} else {
etaDuration = 0
duration = 0
}
}
this.migrationContext.SetETADuration(etaDuration)
var eta string
switch etaDuration {

switch duration {
case 0:
eta = "due"
case time.Duration(base.ETAUnknown):
eta = "N/A"
default:
eta = base.PrettifyDurationOutput(etaDuration)
eta = base.PrettifyDurationOutput(duration)
}

state := "migrating"
return eta, duration
}

// getMigrationStateAndETA returns the state and eta of the migration.
func (this *Migrator) getMigrationStateAndETA(rowsEstimate int64) (state, eta string, etaDuration time.Duration) {
eta, etaDuration = this.getMigrationETA(rowsEstimate)
state = "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
state = "counting rows"
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
Expand All @@ -977,27 +953,78 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason)
}
return state, eta, etaDuration
}

var shouldPrintStatus bool
if rule == HeuristicPrintStatusRule {
if elapsedSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 180 {
shouldPrintStatus = (elapsedSeconds%5 == 0)
} else if elapsedSeconds <= 180 {
shouldPrintStatus = (elapsedSeconds%5 == 0)
} else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 {
shouldPrintStatus = (elapsedSeconds%5 == 0)
} else {
shouldPrintStatus = (elapsedSeconds%30 == 0)
}
// shouldPrintStatus returns true when the migrator is due to print status info.
func (this *Migrator) shouldPrintStatus(rule PrintStatusRule, elapsedSeconds int64, etaDuration time.Duration) (shouldPrint bool) {
if rule != HeuristicPrintStatusRule {
return true
}

etaSeconds := etaDuration.Seconds()
if elapsedSeconds <= 60 {
shouldPrint = true
} else if etaSeconds <= 60 {
shouldPrint = true
} else if etaSeconds <= 180 {
shouldPrint = (elapsedSeconds%5 == 0)
} else if elapsedSeconds <= 180 {
shouldPrint = (elapsedSeconds%5 == 0)
} else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 {
shouldPrint = (elapsedSeconds%5 == 0)
} else {
// Not heuristic
shouldPrintStatus = true
shouldPrint = (elapsedSeconds%30 == 0)
}

return shouldPrint
}

// shouldPrintMigrationStatus returns true when the migrator is due to print the migration status hint
func (this *Migrator) shouldPrintMigrationStatusHint(rule PrintStatusRule, elapsedSeconds int64) (shouldPrint bool) {
if elapsedSeconds%600 == 0 {
shouldPrint = true
} else if rule == ForcePrintStatusAndHintRule {
shouldPrint = true
}
return shouldPrint
}

// printStatus prints the progress status, and optionally additionally detailed
// dump of configuration.
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)

elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}

// we take the opportunity to update migration context with progressPct
progressPct := this.getProgressPercent(rowsEstimate)
this.migrationContext.SetProgressPct(progressPct)

// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
if this.shouldPrintMigrationStatusHint(rule, elapsedSeconds) {
this.printMigrationStatusHint(writers...)
}
if !shouldPrintStatus {

// Get state + ETA
state, eta, etaDuration := this.getMigrationStateAndETA(rowsEstimate)
this.migrationContext.SetETADuration(etaDuration)

if !this.shouldPrintStatus(rule, elapsedSeconds, etaDuration) {
return
}

Expand All @@ -1016,7 +1043,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
)
this.applier.WriteChangelog(
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
status,
state,
)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, status)
Expand Down
Loading

0 comments on commit 1df37c2

Please sign in to comment.