Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive Parallelism #1726

Merged
merged 37 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
59b767a
base CPU based logic for adapting parallelism
makalaaneesh Sep 25, 2024
c12aeb6
implement GetClusterMetrics
makalaaneesh Sep 25, 2024
6566cc0
max connections in pool, change requests
makalaaneesh Sep 26, 2024
d54e249
proper setting of max parallelism in the batch import pool as well
makalaaneesh Sep 26, 2024
d70f58f
new implementation wip
makalaaneesh Sep 27, 2024
103fd00
fix
makalaaneesh Sep 27, 2024
ee77137
error handling
makalaaneesh Sep 30, 2024
1605fc3
add flags
makalaaneesh Sep 30, 2024
6776cc4
minor fixes
makalaaneesh Sep 30, 2024
71f00e2
logs
makalaaneesh Sep 30, 2024
9774b95
small fix
makalaaneesh Sep 30, 2024
5325e2b
hide flags
makalaaneesh Sep 30, 2024
ff30b21
IsAdaptiveParallelismSupported
makalaaneesh Sep 30, 2024
8f02d75
minor fixes
makalaaneesh Sep 30, 2024
1d474cc
Merge branch 'main' into aneesh/adaptive-parallelism-impl
makalaaneesh Sep 30, 2024
a503a5e
fix static check
makalaaneesh Sep 30, 2024
5c3f317
fix fallf case
makalaaneesh Sep 30, 2024
2cdcdb7
docs
makalaaneesh Sep 30, 2024
183c880
tests
makalaaneesh Sep 30, 2024
e2ff70b
basic conn pool test
makalaaneesh Oct 1, 2024
f3a98ca
more tests
makalaaneesh Oct 1, 2024
c54af7d
mintest
makalaaneesh Oct 1, 2024
ac693db
bug fix; random test
makalaaneesh Oct 1, 2024
886103d
test changes
makalaaneesh Oct 1, 2024
50fc8c4
review comments
makalaaneesh Oct 8, 2024
b532e90
review comments 2
makalaaneesh Oct 8, 2024
33dfc27
conn_pool comment
makalaaneesh Oct 8, 2024
60668e3
var name change
makalaaneesh Oct 8, 2024
e0713ef
Merge branch 'main' into aneesh/adaptive-parallelism-impl
makalaaneesh Oct 8, 2024
5677cb9
empty
makalaaneesh Oct 8, 2024
d4176d8
gather assessment tar
makalaaneesh Oct 8, 2024
5230261
Merge branch 'main' into aneesh/adaptive-parallelism-impl
makalaaneesh Oct 8, 2024
15efe8d
retar gather-assessments
makalaaneesh Oct 8, 2024
a1cc3f4
conn pool review comment
makalaaneesh Oct 9, 2024
446a94b
nullstring
makalaaneesh Oct 9, 2024
a202ec2
Merge branch 'main' into aneesh/adaptive-parallelism-impl
makalaaneesh Oct 9, 2024
5cfce37
update tar
makalaaneesh Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions yb-voyager/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func validateImportFlags(cmd *cobra.Command, importerRole string) error {
case SOURCE_DB_IMPORTER_ROLE:
getSourceDBPassword(cmd)
}
validateParallelismFlags()
return nil
}

Expand Down Expand Up @@ -345,6 +346,13 @@ func registerFlagsForTarget(cmd *cobra.Command) {
"number of cores N and use N/4 as parallel jobs. "+
"Otherwise, it fall back to using twice the number of nodes in the cluster. "+
"Any value less than 1 reverts to the default calculation.")
BoolVar(cmd.Flags(), &tconf.EnableAdaptiveParallelism, "enable-adaptive-parallelism", false,
"Adapt parallelism based on the resource usage (CPU, memory) of the target YugabyteDB cluster")
cmd.Flags().MarkHidden("enable-adaptive-parallelism") // not officially released
cmd.Flags().IntVar(&tconf.MaxParallelism, "adaptive-parallelism-max", 0,
"number of max parallel jobs to use while importing data when adaptive parallelism is enabled."+
"By default, voyager will try if it can determine the total number of cores N and use N/2 as the max parallel jobs. ")
cmd.Flags().MarkHidden("adaptive-parallelism-max") // not officially released
}

func registerFlagsForSourceReplica(cmd *cobra.Command) {
Expand Down Expand Up @@ -387,3 +395,16 @@ func validateFFDBSchemaFlag() {
utils.ErrExit("Error: --source-replica-db-schema flag is mandatory for import data to source-replica")
}
}

func validateParallelismFlags() {
if tconf.EnableAdaptiveParallelism {
if tconf.Parallelism > 0 {
utils.ErrExit("Error: --parallel-jobs flag cannot be used with --enable-adaptive-parallelism flag")
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved
}
}
if tconf.MaxParallelism > 0 {
if !tconf.EnableAdaptiveParallelism {
utils.ErrExit("Error: --adaptive-parallelism-max flag can only be used with --enable-adaptive-parallelism true")
}
}
}
14 changes: 14 additions & 0 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/spf13/cobra"
"golang.org/x/exp/slices"

"github.com/yugabyte/yb-voyager/yb-voyager/src/adaptiveparallelism"
"github.com/yugabyte/yb-voyager/yb-voyager/src/callhome"
"github.com/yugabyte/yb-voyager/yb-voyager/src/cp"
"github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
Expand Down Expand Up @@ -436,6 +437,13 @@ func importData(importFileTasks []*ImportFileTask) {
utils.ErrExit("Failed to initialize the target DB connection pool: %s", err)
}
utils.PrintAndLog("Using %d parallel jobs.", tconf.Parallelism)
if tconf.EnableAdaptiveParallelism {
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
priyanshi-yb marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
utils.ErrExit("adaptive parallelism is only supported if target DB is YugabyteDB")
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved
}
go adaptiveparallelism.AdaptParallelism(yb)
}

targetDBVersion := tdb.GetVersion()
fmt.Printf("%s version: %s\n", tconf.TargetDBType, targetDBVersion)
Expand Down Expand Up @@ -483,6 +491,11 @@ func importData(importFileTasks []*ImportFileTask) {
utils.PrintAndLog("Tables to import: %v", importFileTasksToTableNames(pendingTasks))
prepareTableToColumns(pendingTasks) //prepare the tableToColumns map
poolSize := tconf.Parallelism * 2
if tconf.EnableAdaptiveParallelism {
// in case of adaptive parallelism, we need to use maxParallelism * 2
yb := tdb.(*tgtdb.TargetYugabyteDB)
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved
poolSize = yb.GetNumMaxConnectionsInPool() * 2
}
progressReporter := NewImportDataProgressReporter(bool(disablePb))

if importerRole == TARGET_DB_IMPORTER_ROLE {
Expand All @@ -494,6 +507,7 @@ func importData(importFileTasks []*ImportFileTask) {
// The code can produce `poolSize` number of batches at a time. But, it can consume only
// `parallelism` number of batches at a time.
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
log.Info("created batch import pool of size: ", poolSize)
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved

totalProgressAmount := getTotalProgressAmount(task)
progressReporter.ImportFileStarted(task, totalProgressAmount)
Expand Down
1 change: 1 addition & 0 deletions yb-voyager/cmd/importDataFileCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func checkImportDataFileFlags(cmd *cobra.Command) {
getTargetPassword(cmd)
validateTargetPortRange()
validateTargetSchemaFlag()
validateParallelismFlags()
}

func checkFileFormat() {
Expand Down
2 changes: 1 addition & 1 deletion yb-voyager/cmd/importDataState.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (s *ImportDataState) GetImportedEventsStatsForTableList(tableNameTupList []
return nil, fmt.Errorf("error in getting import stats from target db: %w", err)
}
defer rows.Close()

for rows.Next() {
var eventCounter tgtdb.EventCounter
var tableName string
Expand Down
3 changes: 3 additions & 0 deletions yb-voyager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ require (
)

require (
github.com/fergusstrange/embedded-postgres v1.29.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
)
Expand Down
6 changes: 6 additions & 0 deletions yb-voyager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fergusstrange/embedded-postgres v1.29.0 h1:Uv8hdhoiaNMuH0w8UuGXDHr60VoAQPFdgx7Qf3bzXJM=
github.com/fergusstrange/embedded-postgres v1.29.0/go.mod h1:t/MLs0h9ukYM6FSt99R7InCHs1nW0ordoVCcnzmpTYw=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -1430,6 +1432,8 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linode/linodego v1.4.0/go.mod h1:PVsRxSlOiJyvG4/scTszpmZDTdgS+to3X6eS8pRrWI8=
github.com/linode/linodego v1.12.0/go.mod h1:NJlzvlNtdMRRkXb0oN6UWzUkj6t+IBsyveHgZ5Ppjyk=
github.com/linuxkit/virtsock v0.0.0-20201010232012-f8cee7dfc7a3/go.mod h1:3r6x7q95whyfWQpmGZTu3gk3v2YkMi05HEzl7Tf7YEo=
Expand Down Expand Up @@ -1880,6 +1884,8 @@ github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgk
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.1.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
108 changes: 108 additions & 0 deletions yb-voyager/src/adaptiveparallelism/adaptive_parallelism.go
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright (c) YugabyteDB, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptiveparallelism

import (
"fmt"
"strconv"
"time"

"github.com/davecgh/go-spew/spew"
log "github.com/sirupsen/logrus"
)

// Names of the per-node metrics read from the cluster metrics map.
const (
	CPU_USAGE_USER   = "cpu_usage_user"
	CPU_USAGE_SYSTEM = "cpu_usage_system"
)

// Tuning parameters of the adaptation loop.
const (
	// MAX_CPU_THRESHOLD is the CPU usage percentage above which parallelism
	// is reduced.
	MAX_CPU_THRESHOLD = 70
	// ADAPTIVE_PARALLELISM_FREQUENCY is the pause between two consecutive
	// adaptation steps.
	ADAPTIVE_PARALLELISM_FREQUENCY = 10 * time.Second
)

// TargetYugabyteDBWithConnectionPool is the subset of target-database
// behaviour the adaptive parallelism loop depends on: a capability check,
// a source of per-node metrics, and a resizable connection pool.
type TargetYugabyteDBWithConnectionPool interface {
	// IsAdaptiveParallelismSupported reports whether the target can be used
	// with adaptive parallelism at all.
	IsAdaptiveParallelismSupported() bool

	// GetClusterMetrics returns the metrics of every node, keyed as
	// node_uuid -> metric_name -> metric_value.
	GetClusterMetrics() (map[string]map[string]string, error)

	// GetNumMaxConnectionsInPool returns the upper limit of the pool size.
	GetNumMaxConnectionsInPool() int

	// GetNumConnectionsInPool returns the current pool size.
	GetNumConnectionsInPool() int

	// UpdateNumConnectionsInPool changes the pool size by the given delta
	// (positive to grow, negative to shrink).
	UpdateNumConnectionsInPool(int) error
}

func AdaptParallelism(yb TargetYugabyteDBWithConnectionPool) error {
priyanshi-yb marked this conversation as resolved.
Show resolved Hide resolved
if !yb.IsAdaptiveParallelismSupported() {
return fmt.Errorf("adaptive parallelism not supported in target YB database")
}
for {
time.Sleep(ADAPTIVE_PARALLELISM_FREQUENCY)
err := fetchClusterMetricsAndUpdateParallelism(yb)
if err != nil {
log.Warnf("adaptive: error updating parallelism: %v", err)
}
}
}

func fetchClusterMetricsAndUpdateParallelism(yb TargetYugabyteDBWithConnectionPool) error {
clusterMetrics, err := yb.GetClusterMetrics()
log.Infof("adaptive: clusterMetrics: %v", spew.Sdump(clusterMetrics)) // TODO: move to debug?
if err != nil {
return fmt.Errorf("getting cluster metrics: %w", err)
}

// get max CPU
// Note that right now, voyager ingests data into the target in parallel,
// but one table at a time. Therefore, in cases where there is a single tablet for a table,
// either due to pre-split or colocated table, it is possible that the load on the cluster
// will be uneven. Nevertheless, we still want to ensure that the cluster is not overloaded,
// therefore we use the max CPU usage across all nodes in the cluster.
maxCpuUsage, err := getMaxCpuUsageInCluster(clusterMetrics)
if err != nil {
return fmt.Errorf("getting max cpu usage in cluster: %w", err)
}
log.Infof("adaptive: max cpu usage in cluster = %d", maxCpuUsage)

if maxCpuUsage > MAX_CPU_THRESHOLD {
log.Infof("adaptive: found CPU usage = %d > %d, reducing parallelism to %d", maxCpuUsage, MAX_CPU_THRESHOLD, yb.GetNumConnectionsInPool()-1)
err = yb.UpdateNumConnectionsInPool(-1)
if err != nil {
return fmt.Errorf("updating parallelism with -1: %w", err)
}
} else {
log.Infof("adaptive: found CPU usage = %d <= %d, increasing parallelism to %d", maxCpuUsage, MAX_CPU_THRESHOLD, yb.GetNumConnectionsInPool()+1)
err := yb.UpdateNumConnectionsInPool(1)
if err != nil {
return fmt.Errorf("updating parallelism with +1 : %w", err)
}
}
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func getMaxCpuUsageInCluster(clusterMetrics map[string]map[string]string) (int, error) {
var maxCpuPct int
for _, nodeMetrics := range clusterMetrics {
cpuUsageUser, err := strconv.ParseFloat(nodeMetrics[CPU_USAGE_USER], 64)
if err != nil {
return -1, fmt.Errorf("parsing cpu usage user as float: %w", err)
}
cpuUsageSystem, err := strconv.ParseFloat(nodeMetrics[CPU_USAGE_SYSTEM], 64)
if err != nil {
return -1, fmt.Errorf("parsing cpu usage system as float: %w", err)
}

cpuUsagePct := int((cpuUsageUser + cpuUsageSystem) * 100)
if cpuUsagePct > maxCpuPct {
maxCpuPct = cpuUsagePct
}
makalaaneesh marked this conversation as resolved.
Show resolved Hide resolved
}
return maxCpuPct, nil
}
106 changes: 106 additions & 0 deletions yb-voyager/src/adaptiveparallelism/adaptive_parallelism_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright (c) YugabyteDB, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptiveparallelism

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

// dummyTargetYugabyteDB is an in-memory test double for the adaptive
// parallelism loop: it tracks a pool size and serves fixed CPU figures for two
// nodes ("node1" and "node2").
type dummyTargetYugabyteDB struct {
	size, maxSize               int
	cpuUsageUser1, cpuUsageSys1 float64
	cpuUsageUser2, cpuUsageSys2 float64
}

// IsAdaptiveParallelismSupported always reports true for the test double.
func (d *dummyTargetYugabyteDB) IsAdaptiveParallelismSupported() bool {
	return true
}

// GetClusterMetrics returns the configured user/system CPU values of both
// nodes, formatted as decimal strings. It never fails.
func (d *dummyTargetYugabyteDB) GetClusterMetrics() (map[string]map[string]string, error) {
	format := func(v float64) string {
		return strconv.FormatFloat(v, 'f', -1, 64)
	}
	metrics := map[string]map[string]string{
		"node1": {
			"cpu_usage_user":   format(d.cpuUsageUser1),
			"cpu_usage_system": format(d.cpuUsageSys1),
		},
		"node2": {
			"cpu_usage_user":   format(d.cpuUsageUser2),
			"cpu_usage_system": format(d.cpuUsageSys2),
		},
	}
	return metrics, nil
}

// GetNumConnectionsInPool returns the current simulated pool size.
func (d *dummyTargetYugabyteDB) GetNumConnectionsInPool() int {
	return d.size
}

// GetNumMaxConnectionsInPool returns the configured maximum pool size.
func (d *dummyTargetYugabyteDB) GetNumMaxConnectionsInPool() int {
	return d.maxSize
}

// UpdateNumConnectionsInPool adds delta to the simulated pool size without
// any bounds checking. It never fails.
func (d *dummyTargetYugabyteDB) UpdateNumConnectionsInPool(delta int) error {
	d.size = d.size + delta
	return nil
}

// TestMaxCpuUsage verifies that the busiest node determines the result:
// node1 runs at 0.5 + 0.1 and node2 at 0.1 + 0.1, so the expected maximum is
// 60 percent.
func TestMaxCpuUsage(t *testing.T) {
	yb := &dummyTargetYugabyteDB{
		size:          3,
		maxSize:       6,
		cpuUsageUser1: 0.5,
		cpuUsageSys1:  0.1,
		cpuUsageUser2: 0.1,
		cpuUsageSys2:  0.1,
	}

	// Assert the fetch error instead of discarding it, so a broken test
	// double is reported here rather than as a confusing mismatch below.
	clusterMetrics, err := yb.GetClusterMetrics()
	assert.NoError(t, err)

	maxCpuUsage, err := getMaxCpuUsageInCluster(clusterMetrics)
	assert.NoError(t, err)
	assert.Equal(t, 60, maxCpuUsage)
}

// TestIncreaseParallelism checks that one adaptation step grows the pool by
// one connection (3 -> 4) when both nodes report 0.5 user + 0.1 system CPU.
func TestIncreaseParallelism(t *testing.T) {
	pool := &dummyTargetYugabyteDB{
		size:    3,
		maxSize: 6,
		// node1
		cpuUsageUser1: 0.5,
		cpuUsageSys1:  0.1,
		// node2
		cpuUsageUser2: 0.5,
		cpuUsageSys2:  0.1,
	}

	stepErr := fetchClusterMetricsAndUpdateParallelism(pool)

	assert.NoErrorf(t, stepErr, "failed to fetch cluster metrics and update parallelism")
	assert.Equal(t, 4, pool.GetNumConnectionsInPool())
}

// TestDecreaseParallelismBasedOnCpu checks that one adaptation step shrinks
// the pool by one connection (3 -> 2) when a single node (node1, 0.8 user +
// 0.1 system) is above the CPU threshold.
func TestDecreaseParallelismBasedOnCpu(t *testing.T) {
	pool := &dummyTargetYugabyteDB{
		size:    3,
		maxSize: 6,
		// node1
		cpuUsageUser1: 0.8, // above threshold
		cpuUsageSys1:  0.1,
		// node2
		cpuUsageUser2: 0.5,
		cpuUsageSys2:  0.1,
	}

	stepErr := fetchClusterMetricsAndUpdateParallelism(pool)

	assert.NoErrorf(t, stepErr, "failed to fetch cluster metrics and update parallelism")
	assert.Equal(t, 2, pool.GetNumConnectionsInPool())
}
Binary file modified yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz
Binary file not shown.
Loading
Loading