Skip to content

Commit

Permalink
add progress to data migration generate and apply
Browse files Browse the repository at this point in the history
Show percentage progress for data migration script generation, and show
the count of scripts applied when applying data migration scripts.

It's odd to have to call bar.Abort() on error. See
vbauerster/mpb#130
  • Loading branch information
kalensk committed Jun 10, 2023
1 parent 1deb931 commit b4581b1
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 66 deletions.
44 changes: 34 additions & 10 deletions cli/commanders/data_migration_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"

"github.com/fatih/color"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
"golang.org/x/xerrors"

"github.com/greenplum-db/gpupgrade/idl"
Expand Down Expand Up @@ -58,28 +59,36 @@ func ApplyDataMigrationScripts(nonInteractive bool, gphome string, port int, log
}
}()

var wg sync.WaitGroup
progressBar := mpb.New()
errChan := make(chan error, len(scriptDirsToRun))
outputChan := make(chan []byte, len(scriptDirsToRun))

fmt.Printf("\nApplying data migration scripts...\n")
for _, scriptDir := range scriptDirsToRun {
wg.Add(1)
scriptDirEntries, err := utils.System.ReadDirFS(utils.System.DirFS(scriptDir), ".")
if err != nil {
return err
}

go func(gphome string, port int, scriptDir string) {
defer wg.Done()
bar := progressBar.New(int64(countScripts(scriptDirEntries)),
mpb.NopStyle(),
mpb.PrependDecorators(
decor.Name(" "+filepath.Base(scriptDir), decor.WCSyncSpaceR),
decor.CountersNoUnit(" %d/%d scripts applied")))

output, err := ApplyDataMigrationScriptSubDir(gphome, port, utils.System.DirFS(scriptDir), scriptDir)
go func(gphome string, port int, scriptDir string, bar *mpb.Bar) {
output, err := ApplyDataMigrationScriptSubDir(gphome, port, utils.System.DirFS(scriptDir), scriptDir, bar)
if err != nil {
errChan <- err
bar.Abort(false)
return
}

outputChan <- output
}(gphome, port, scriptDir)
}(gphome, port, scriptDir, bar)
}

wg.Wait()
progressBar.Wait()
close(errChan)
close(outputChan)

Expand Down Expand Up @@ -114,7 +123,20 @@ Logs:
return nil
}

func ApplyDataMigrationScriptSubDir(gphome string, port int, scriptDirFS fs.FS, scriptDir string) ([]byte, error) {
// countScripts reports how many of the given directory entries are SQL
// scripts. An entry counts when its name carries the exact ".sql" extension;
// the match is case-sensitive and only the entry name is inspected.
func countScripts(entries []fs.DirEntry) int {
	total := 0
	for i := range entries {
		if filepath.Ext(entries[i].Name()) == ".sql" {
			total++
		}
	}

	return total
}

func ApplyDataMigrationScriptSubDir(gphome string, port int, scriptDirFS fs.FS, scriptDir string, bar *mpb.Bar) ([]byte, error) {
entries, err := utils.System.ReadDirFS(scriptDirFS, ".")
if err != nil {
return nil, err
Expand All @@ -130,13 +152,15 @@ func ApplyDataMigrationScriptSubDir(gphome string, port int, scriptDirFS fs.FS,
continue
}

fmt.Printf(" %s\n", entry.Name())
log.Printf(" %s\n", entry.Name())
output, err := applySQLFile(gphome, port, "postgres", filepath.Join(scriptDir, entry.Name()), "-v", "ON_ERROR_STOP=1", "--echo-queries")
if err != nil {
return nil, err
}

outputs = append(outputs, output...)

bar.Increment()
}

return outputs, nil
Expand Down
27 changes: 18 additions & 9 deletions cli/commanders/data_migration_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"testing"
"testing/fstest"

"github.com/vbauerster/mpb/v8"

"github.com/greenplum-db/gpupgrade/cli/commanders"
"github.com/greenplum-db/gpupgrade/idl"
"github.com/greenplum-db/gpupgrade/step"
Expand Down Expand Up @@ -48,16 +50,21 @@ func TestApplyDataMigrationScripts(t *testing.T) {
t.Run("prints stats specific message for stats phase", func(t *testing.T) {
d := commanders.BufferStandardDescriptors(t)

currentScriptDir := testutils.GetTempDir(t, "")
defer testutils.MustRemoveAll(t, currentScriptDir)
resetStdin := testutils.SetStdin(t, "a\n")
defer resetStdin()

scriptDirFS := fstest.MapFS{
"migration_postgres_generate_stats.sql": {},
"migration_template1_generate_stats.sql": {},
}

utils.System.DirFS = func(dir string) fs.FS {
return currentDirFS
return scriptDirFS
}
defer utils.ResetSystemFunctions()

resetStdin := testutils.SetStdin(t, "a\n")
defer resetStdin()
commanders.SetPsqlFileCommand(exectest.NewCommand(commanders.SuccessScript))
defer commanders.ResetPsqlFileCommand()

err := commanders.ApplyDataMigrationScripts(false, "", 0, logDir, currentDirFS, currentScriptDir, idl.Step_stats)
if err != nil {
Expand Down Expand Up @@ -155,14 +162,16 @@ func TestApplyDataMigrationScripts(t *testing.T) {

func TestApplyDataMigrationScriptSubDir(t *testing.T) {
scriptSubDir := "/home/gpupgrade/data-migration/current/initialize/unique_primary_foreign_key_constraint"
progressBar := mpb.New()
bar := progressBar.AddBar(int64(100))

t.Run("errors when failing to read current script directory", func(t *testing.T) {
utils.System.ReadDirFS = func(fsys fs.FS, name string) ([]fs.DirEntry, error) {
return nil, os.ErrPermission
}
defer utils.ResetSystemFunctions()

output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fstest.MapFS{}, scriptSubDir)
output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fstest.MapFS{}, scriptSubDir, bar)
if !errors.Is(err, os.ErrPermission) {
t.Errorf("got error %#v want %#v", err, os.ErrPermission)
}
Expand All @@ -173,7 +182,7 @@ func TestApplyDataMigrationScriptSubDir(t *testing.T) {
})

t.Run("errors when no directories are in the current script directory", func(t *testing.T) {
output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fstest.MapFS{}, scriptSubDir)
output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fstest.MapFS{}, scriptSubDir, bar)
expected := fmt.Sprintf("No SQL files found in %q.", scriptSubDir)
if !strings.Contains(err.Error(), expected) {
t.Errorf("got error %#v, want %#v", err, expected)
Expand All @@ -194,7 +203,7 @@ func TestApplyDataMigrationScriptSubDir(t *testing.T) {
"drop_postgres_indexes.bash": {},
}

output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fsys, scriptSubDir)
output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fsys, scriptSubDir, bar)
if err != nil {
t.Errorf("unexpected err %#v", err)
}
Expand All @@ -212,7 +221,7 @@ func TestApplyDataMigrationScriptSubDir(t *testing.T) {
"migration_postgres_gen_drop_constraint_2_primary_unique.sql": {},
}

output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fsys, scriptSubDir)
output, err := commanders.ApplyDataMigrationScriptSubDir("", 0, fsys, scriptSubDir, bar)
var exitError *exec.ExitError
if !errors.As(err, &exitError) {
t.Errorf("got %T, want %T", err, exitError)
Expand Down
108 changes: 85 additions & 23 deletions cli/commanders/data_migration_generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
"golang.org/x/xerrors"

"github.com/greenplum-db/gpupgrade/greenplum"
Expand Down Expand Up @@ -70,30 +72,33 @@ func GenerateDataMigrationScripts(nonInteractive bool, gphome string, port int,
return err
}

databases, err := GetDatabases(db)
databases, err := GetDatabases(db, utils.System.DirFS(seedDir))
if err != nil {
return err
}

var wg sync.WaitGroup
errChan := make(chan error, len(databases))
fmt.Printf("\nGenerating data migration scripts for %d databases...\n", len(databases))
progressBar := mpb.New()
errChan := make(chan error, len(databases))

for _, database := range databases {
wg.Add(1)

go func(database DatabaseName, gphome string, port int, seedDir string, outputDir string) {
defer wg.Done()
bar := progressBar.New(int64(database.NumSeedScripts),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(" "+database.Datname, decor.WCSyncSpaceR)),
mpb.AppendDecorators(decor.NewPercentage("%d")))

err = GenerateScriptsPerDatabase(database, gphome, port, seedDir, outputDir)
go func(database DatabaseInfo, gphome string, port int, seedDir string, outputDir string, bar *mpb.Bar) {
err = GenerateScriptsPerDatabase(database, gphome, port, seedDir, outputDir, bar)
if err != nil {
errChan <- err
bar.Abort(false)
return
}
}(database, gphome, port, seedDir, outputDir)

}(database, gphome, port, seedDir, outputDir, bar)
}

wg.Wait()
progressBar.Wait()
close(errChan)

var errs error
Expand Down Expand Up @@ -229,7 +234,7 @@ Select: `)
}
}

func GenerateScriptsPerDatabase(database DatabaseName, gphome string, port int, seedDir string, outputDir string) error {
func GenerateScriptsPerDatabase(database DatabaseInfo, gphome string, port int, seedDir string, outputDir string, bar *mpb.Bar) error {
output, err := executeSQLCommand(gphome, port, database.Datname, `CREATE LANGUAGE plpythonu;`)
if err != nil && !strings.Contains(err.Error(), "already exists") {
return err
Expand Down Expand Up @@ -259,17 +264,17 @@ func GenerateScriptsPerDatabase(database DatabaseName, gphome string, port int,

for _, phase := range MigrationScriptPhases {
wg.Add(1)
fmt.Printf(" Generating %q scripts for %s\n", phase, database.Datname)
log.Printf(" Generating %q scripts for %s\n", phase, database.Datname)

go func(phase idl.Step, database DatabaseName, gphome string, port int, seedDir string, outputDir string) {
go func(phase idl.Step, database DatabaseInfo, gphome string, port int, seedDir string, outputDir string, bar *mpb.Bar) {
defer wg.Done()

err = GenerateScriptsPerPhase(phase, database, gphome, port, seedDir, utils.System.DirFS(seedDir), outputDir)
err = GenerateScriptsPerPhase(phase, database, gphome, port, seedDir, utils.System.DirFS(seedDir), outputDir, bar)
if err != nil {
errChan <- err
return
}
}(phase, database, gphome, port, seedDir, outputDir)
}(phase, database, gphome, port, seedDir, outputDir, bar)
}

wg.Wait()
Expand Down Expand Up @@ -298,7 +303,7 @@ func isGlobalScript(script string, database string) bool {
return database != "postgres" && (script == "gen_alter_gphdfs_roles.sql" || script == "generate_cluster_stats.sh")
}

func GenerateScriptsPerPhase(phase idl.Step, database DatabaseName, gphome string, port int, seedDir string, seedDirFS fs.FS, outputDir string) error {
func GenerateScriptsPerPhase(phase idl.Step, database DatabaseInfo, gphome string, port int, seedDir string, seedDirFS fs.FS, outputDir string, bar *mpb.Bar) error {
scriptDirs, err := fs.ReadDir(seedDirFS, phase.String())
if err != nil {
return err
Expand Down Expand Up @@ -335,6 +340,8 @@ func GenerateScriptsPerPhase(phase idl.Step, database DatabaseName, gphome strin
}
}

bar.Increment() // Increment for seed scripts run rather than actual scripts generated

if len(scriptOutput) == 0 {
continue
}
Expand Down Expand Up @@ -367,26 +374,34 @@ func GenerateScriptsPerPhase(phase idl.Step, database DatabaseName, gphome strin
return nil
}

type DatabaseName struct {
Datname string
QuotedDatname string
// DatabaseInfo describes one database returned by GetDatabases, together with
// the number of seed scripts counted for it.
type DatabaseInfo struct {
// Datname is the datname column of pg_database.
Datname string
// QuotedDatname is quote_ident(datname) as returned by the same query.
QuotedDatname string
// NumSeedScripts is the result of countSeedScripts for this database; it is
// used as the total for the database's progress bar during generation.
NumSeedScripts int
}

func GetDatabases(db *sql.DB) ([]DatabaseName, error) {
func GetDatabases(db *sql.DB, seedDirFS fs.FS) ([]DatabaseInfo, error) {
rows, err := db.Query(`SELECT datname, quote_ident(datname) AS quoted_datname FROM pg_database WHERE datname != 'template0';`)
if err != nil {
return nil, err
}
defer rows.Close()

var databases []DatabaseName
var databases []DatabaseInfo
for rows.Next() {
var database DatabaseName
err := rows.Scan(&database.Datname, &database.QuotedDatname)
var database DatabaseInfo
err = rows.Scan(&database.Datname, &database.QuotedDatname)
if err != nil {
return nil, xerrors.Errorf("pg_database: %w", err)
}

numSeedScripts, err := countSeedScripts(database.Datname, seedDirFS)
if err != nil {
return nil, err
}

database.NumSeedScripts = numSeedScripts

databases = append(databases, database)
}

Expand All @@ -397,3 +412,50 @@ func GetDatabases(db *sql.DB) ([]DatabaseName, error) {

return databases, nil
}

// countSeedScripts walks seedDirFS and returns the number of seed scripts that
// apply to the given database.
//
// The layout it expects is <phase>/<script dir>/<seed script>. Only top-level
// directories whose name matches a migration phase (see isPhase) are visited.
// Every entry inside a phase directory is read as a directory of seed scripts;
// no IsDir check is made on it. Scripts that isGlobalScript reports as global
// for this database are left out of the count.
//
// The first error from reading any directory is returned with a count of 0.
func countSeedScripts(database string, seedDirFS fs.FS) (int, error) {
	topLevel, err := utils.System.ReadDirFS(seedDirFS, ".")
	if err != nil {
		return 0, err
	}

	total := 0
	for _, top := range topLevel {
		// Skip anything that is not a phase directory.
		if !(top.IsDir() && isPhase(top.Name())) {
			continue
		}

		scriptDirs, err := fs.ReadDir(seedDirFS, top.Name())
		if err != nil {
			return 0, err
		}

		for _, scriptDir := range scriptDirs {
			scriptDirPath := filepath.Join(top.Name(), scriptDir.Name())

			scripts, err := utils.System.ReadDirFS(seedDirFS, scriptDirPath)
			if err != nil {
				return 0, err
			}

			for _, script := range scripts {
				if !isGlobalScript(script.Name(), database) {
					total++
				}
			}
		}
	}

	return total, nil
}

// isPhase reports whether input equals the String() form of any entry in
// MigrationScriptPhases.
func isPhase(input string) bool {
	matched := false
	for _, candidate := range MigrationScriptPhases {
		if candidate.String() == input {
			matched = true
			break
		}
	}

	return matched
}
Loading

0 comments on commit b4581b1

Please sign in to comment.