Skip to content

Commit

Permalink
Adaptive Parallelism in import-data (#1726)
Browse files Browse the repository at this point in the history
import-data will periodically fetch resource usage metrics from the target YugabyteDB and adapt the parallelism used (i.e. the parallel-jobs value) accordingly. The aim is to keep the cluster stable throughout while making data ingestion as efficient as possible.
  • Loading branch information
makalaaneesh authored Oct 9, 2024
1 parent 42064d8 commit b465788
Show file tree
Hide file tree
Showing 14 changed files with 666 additions and 13 deletions.
21 changes: 21 additions & 0 deletions yb-voyager/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func validateImportFlags(cmd *cobra.Command, importerRole string) error {
case SOURCE_DB_IMPORTER_ROLE:
getSourceDBPassword(cmd)
}
validateParallelismFlags()
return nil
}

Expand Down Expand Up @@ -345,6 +346,13 @@ func registerFlagsForTarget(cmd *cobra.Command) {
"number of cores N and use N/4 as parallel jobs. "+
"Otherwise, it fall back to using twice the number of nodes in the cluster. "+
"Any value less than 1 reverts to the default calculation.")
BoolVar(cmd.Flags(), &tconf.EnableYBAdaptiveParallelism, "enable-adaptive-parallelism", false,
"Adapt parallelism based on the resource usage (CPU, memory) of the target YugabyteDB cluster")
cmd.Flags().MarkHidden("enable-adaptive-parallelism") // not officially released
cmd.Flags().IntVar(&tconf.MaxParallelism, "adaptive-parallelism-max", 0,
"number of max parallel jobs to use while importing data when adaptive parallelism is enabled."+
"By default, voyager will try if it can determine the total number of cores N and use N/2 as the max parallel jobs. ")
cmd.Flags().MarkHidden("adaptive-parallelism-max") // not officially released
}

func registerFlagsForSourceReplica(cmd *cobra.Command) {
Expand Down Expand Up @@ -387,3 +395,16 @@ func validateFFDBSchemaFlag() {
utils.ErrExit("Error: --source-replica-db-schema flag is mandatory for import data to source-replica")
}
}

// validateParallelismFlags rejects parallelism flag combinations that
// contradict each other. Violations are reported through utils.ErrExit.
//
// Two combinations are refused:
//   - an explicit --parallel-jobs value while adaptive parallelism is enabled;
//   - --adaptive-parallelism-max while adaptive parallelism is disabled.
func validateParallelismFlags() {
	adaptive := tconf.EnableYBAdaptiveParallelism

	// A fixed job count conflicts with a job count that is tuned at runtime.
	if adaptive && tconf.Parallelism > 0 {
		utils.ErrExit("Error: --parallel-jobs flag cannot be used with --enable-adaptive-parallelism flag")
	}

	// The upper bound only has meaning when adaptive parallelism is on.
	if !adaptive && tconf.MaxParallelism > 0 {
		utils.ErrExit("Error: --adaptive-parallelism-max flag can only be used with --enable-adaptive-parallelism true")
	}
}
17 changes: 17 additions & 0 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/spf13/cobra"
"golang.org/x/exp/slices"

"github.com/yugabyte/yb-voyager/yb-voyager/src/adaptiveparallelism"
"github.com/yugabyte/yb-voyager/yb-voyager/src/callhome"
"github.com/yugabyte/yb-voyager/yb-voyager/src/cp"
"github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
Expand Down Expand Up @@ -436,6 +437,13 @@ func importData(importFileTasks []*ImportFileTask) {
utils.ErrExit("Failed to initialize the target DB connection pool: %s", err)
}
utils.PrintAndLog("Using %d parallel jobs.", tconf.Parallelism)
if tconf.EnableYBAdaptiveParallelism {
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
if !ok {
utils.ErrExit("adaptive parallelism is only supported if target DB is YugabyteDB")
}
go adaptiveparallelism.AdaptParallelism(yb)
}

targetDBVersion := tdb.GetVersion()
fmt.Printf("%s version: %s\n", tconf.TargetDBType, targetDBVersion)
Expand Down Expand Up @@ -483,6 +491,14 @@ func importData(importFileTasks []*ImportFileTask) {
utils.PrintAndLog("Tables to import: %v", importFileTasksToTableNames(pendingTasks))
prepareTableToColumns(pendingTasks) //prepare the tableToColumns map
poolSize := tconf.Parallelism * 2
if tconf.EnableYBAdaptiveParallelism {
// in case of adaptive parallelism, we need to use maxParalllelism * 2
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
if !ok {
utils.ErrExit("adaptive parallelism is only supported if target DB is YugabyteDB")
}
poolSize = yb.GetNumMaxConnectionsInPool() * 2
}
progressReporter := NewImportDataProgressReporter(bool(disablePb))

if importerRole == TARGET_DB_IMPORTER_ROLE {
Expand All @@ -494,6 +510,7 @@ func importData(importFileTasks []*ImportFileTask) {
// The code can produce `poolSize` number of batches at a time. But, it can consume only
// `parallelism` number of batches at a time.
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
log.Infof("created batch import pool of size: %d", poolSize)

totalProgressAmount := getTotalProgressAmount(task)
progressReporter.ImportFileStarted(task, totalProgressAmount)
Expand Down
1 change: 1 addition & 0 deletions yb-voyager/cmd/importDataFileCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func checkImportDataFileFlags(cmd *cobra.Command) {
getTargetPassword(cmd)
validateTargetPortRange()
validateTargetSchemaFlag()
validateParallelismFlags()
}

func checkFileFormat() {
Expand Down
2 changes: 1 addition & 1 deletion yb-voyager/cmd/importDataState.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (s *ImportDataState) GetImportedEventsStatsForTableList(tableNameTupList []
return nil, fmt.Errorf("error in getting import stats from target db: %w", err)
}
defer rows.Close()

for rows.Next() {
var eventCounter tgtdb.EventCounter
var tableName string
Expand Down
3 changes: 3 additions & 0 deletions yb-voyager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ require (
)

require (
github.com/fergusstrange/embedded-postgres v1.29.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
)
Expand Down
6 changes: 6 additions & 0 deletions yb-voyager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fergusstrange/embedded-postgres v1.29.0 h1:Uv8hdhoiaNMuH0w8UuGXDHr60VoAQPFdgx7Qf3bzXJM=
github.com/fergusstrange/embedded-postgres v1.29.0/go.mod h1:t/MLs0h9ukYM6FSt99R7InCHs1nW0ordoVCcnzmpTYw=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -1430,6 +1432,8 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linode/linodego v1.4.0/go.mod h1:PVsRxSlOiJyvG4/scTszpmZDTdgS+to3X6eS8pRrWI8=
github.com/linode/linodego v1.12.0/go.mod h1:NJlzvlNtdMRRkXb0oN6UWzUkj6t+IBsyveHgZ5Ppjyk=
github.com/linuxkit/virtsock v0.0.0-20201010232012-f8cee7dfc7a3/go.mod h1:3r6x7q95whyfWQpmGZTu3gk3v2YkMi05HEzl7Tf7YEo=
Expand Down Expand Up @@ -1880,6 +1884,8 @@ github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgk
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.1.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
106 changes: 106 additions & 0 deletions yb-voyager/src/adaptiveparallelism/adaptive_parallelism.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright (c) YugabyteDB, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptiveparallelism

import (
	"fmt"
	"math"
	"strconv"
	"time"

	"github.com/davecgh/go-spew/spew"
	log "github.com/sirupsen/logrus"
)

const (
	// CPU_USAGE_USER is the key under which a node's user-mode CPU usage is
	// looked up in its metric map. The value is parsed as a float.
	CPU_USAGE_USER = "cpu_usage_user"

	// CPU_USAGE_SYSTEM is the key under which a node's system-mode CPU usage
	// is looked up in its metric map. The value is parsed as a float.
	CPU_USAGE_SYSTEM = "cpu_usage_system"

	// MAX_CPU_THRESHOLD is the CPU usage, in percent, that the busiest node
	// may reach before parallelism is reduced instead of increased.
	MAX_CPU_THRESHOLD = 70

	// ADAPTIVE_PARALLELISM_FREQUENCY is how long the adaptation loop sleeps
	// before each round of fetching metrics and adjusting parallelism.
	ADAPTIVE_PARALLELISM_FREQUENCY = 10 * time.Second
)

// TargetYugabyteDBWithConnectionPool is the slice of the target database that
// the adaptive-parallelism loop depends on: a source of per-node metrics and a
// connection pool whose size can be inspected and changed.
type TargetYugabyteDBWithConnectionPool interface {
	// IsAdaptiveParallelismSupported reports whether the target can be used
	// with adaptive parallelism. AdaptParallelism returns an error without
	// starting its loop when this is false.
	IsAdaptiveParallelismSupported() bool
	// GetClusterMetrics returns the current metrics of every node, keyed first
	// by node UUID and then by metric name. Values are strings; the CPU
	// metrics read by this package are parsed as floats.
	GetClusterMetrics() (map[string]map[string]string, error) // node_uuid:metric_name:metric_value
	// GetNumConnectionsInPool returns the current size of the connection pool.
	GetNumConnectionsInPool() int
	// GetNumMaxConnectionsInPool returns the upper limit for the pool size.
	// NOTE(review): presumably UpdateNumConnectionsInPool refuses to grow
	// beyond this value — confirm against the implementation.
	GetNumMaxConnectionsInPool() int
	// UpdateNumConnectionsInPool changes the pool size by the given delta
	// (positive to grow, negative to shrink), not to an absolute value.
	UpdateNumConnectionsInPool(int) error // (delta)
}

// AdaptParallelism runs the adaptive-parallelism control loop against yb.
//
// It returns an error immediately when yb reports that adaptive parallelism is
// not supported. Otherwise it never returns: it sleeps for
// ADAPTIVE_PARALLELISM_FREQUENCY, performs one adjustment round, and repeats.
// A failed round is logged as a warning and does not stop the loop, so callers
// are expected to run this in its own goroutine.
func AdaptParallelism(yb TargetYugabyteDBWithConnectionPool) error {
	if supported := yb.IsAdaptiveParallelismSupported(); !supported {
		return fmt.Errorf("adaptive parallelism not supported in target YB database")
	}

	for {
		// Wait first, then sample, so the first adjustment happens only after
		// one full interval has elapsed.
		time.Sleep(ADAPTIVE_PARALLELISM_FREQUENCY)

		if adjustErr := fetchClusterMetricsAndUpdateParallelism(yb); adjustErr != nil {
			log.Warnf("adaptive: error updating parallelism: %v", adjustErr)
		}
	}
}

// fetchClusterMetricsAndUpdateParallelism performs one adaptation round: it
// fetches the cluster metrics from yb, derives the highest CPU usage across
// all nodes, and changes the connection-pool size by one step.
//
// When the highest CPU usage exceeds MAX_CPU_THRESHOLD the pool shrinks by
// one, otherwise it grows by one. The pool is left untouched (and nil is
// returned) when it is already at the bound it would have to cross: one
// connection when shrinking, yb.GetNumMaxConnectionsInPool() when growing.
//
// It returns an error when the metrics cannot be fetched or parsed, or when
// the pool rejects the size change.
func fetchClusterMetricsAndUpdateParallelism(yb TargetYugabyteDBWithConnectionPool) error {
	// NOTE(review): assumes the pool needs at least one connection to make
	// progress — confirm against the pool implementation.
	const minParallelism = 1

	clusterMetrics, err := yb.GetClusterMetrics()
	if err != nil {
		return fmt.Errorf("getting cluster metrics: %w", err)
	}
	// Dump the metrics only after a successful fetch, so a failed fetch does
	// not log an empty dump ahead of the real error.
	log.Infof("adaptive: clusterMetrics: %v", spew.Sdump(clusterMetrics)) // TODO: move to debug?

	// get max CPU
	// Note that right now, voyager ingests data into the target in parallel,
	// but one table at a time. Therefore, in cases where there is a single tablet for a table,
	// either due to pre-split or colocated table, it is possible that the load on the cluster
	// will be uneven. Nevertheless, we still want to ensure that the cluster is not overloaded,
	// therefore we use the max CPU usage across all nodes in the cluster.
	maxCpuUsage, err := getMaxCpuUsageInCluster(clusterMetrics)
	if err != nil {
		return fmt.Errorf("getting max cpu usage in cluster: %w", err)
	}
	log.Infof("adaptive: max cpu usage in cluster = %d", maxCpuUsage)

	current := yb.GetNumConnectionsInPool()

	if maxCpuUsage > MAX_CPU_THRESHOLD {
		if current <= minParallelism {
			log.Infof("adaptive: found CPU usage = %d > %d, but parallelism is already at the minimum (%d), not changing it", maxCpuUsage, MAX_CPU_THRESHOLD, current)
			return nil
		}
		log.Infof("adaptive: found CPU usage = %d > %d, reducing parallelism to %d", maxCpuUsage, MAX_CPU_THRESHOLD, current-1)
		if err := yb.UpdateNumConnectionsInPool(-1); err != nil {
			return fmt.Errorf("updating parallelism with -1: %w", err)
		}
		return nil
	}

	// Growing past the configured maximum can never succeed; skip the attempt
	// rather than announcing a change that will not happen.
	if limit := yb.GetNumMaxConnectionsInPool(); current >= limit {
		log.Infof("adaptive: found CPU usage = %d <= %d, but parallelism is already at the maximum (%d), not changing it", maxCpuUsage, MAX_CPU_THRESHOLD, limit)
		return nil
	}
	log.Infof("adaptive: found CPU usage = %d <= %d, increasing parallelism to %d", maxCpuUsage, MAX_CPU_THRESHOLD, current+1)
	if err := yb.UpdateNumConnectionsInPool(1); err != nil {
		return fmt.Errorf("updating parallelism with +1 : %w", err)
	}
	return nil
}

// getMaxCpuUsageInCluster returns the highest CPU usage, in whole percent,
// found on any node in clusterMetrics.
//
// clusterMetrics maps a node identifier to that node's metrics. For each node
// the CPU_USAGE_USER and CPU_USAGE_SYSTEM entries are parsed as floats, summed
// and multiplied by 100. The result is rounded to the nearest integer rather
// than truncated, so that float representation error (e.g. 0.29*100 ==
// 28.999999999999996) does not under-report usage by a full percent.
//
// It returns -1 and an error when clusterMetrics is empty (no data must not be
// read as "0% CPU", which would make the caller raise parallelism blindly),
// when a metric is missing or not parsable, or when a node's usage is NaN or
// infinite.
func getMaxCpuUsageInCluster(clusterMetrics map[string]map[string]string) (int, error) {
	if len(clusterMetrics) == 0 {
		return -1, fmt.Errorf("no node metrics available to compute cpu usage")
	}

	maxCpuPct := 0
	for nodeID, nodeMetrics := range clusterMetrics {
		cpuUsageUser, err := strconv.ParseFloat(nodeMetrics[CPU_USAGE_USER], 64)
		if err != nil {
			return -1, fmt.Errorf("parsing cpu usage user as float for node %q: %w", nodeID, err)
		}
		cpuUsageSystem, err := strconv.ParseFloat(nodeMetrics[CPU_USAGE_SYSTEM], 64)
		if err != nil {
			return -1, fmt.Errorf("parsing cpu usage system as float for node %q: %w", nodeID, err)
		}

		// ParseFloat accepts "NaN" and "Inf"; converting those to int is not
		// well defined, so reject them explicitly.
		cpuUsage := cpuUsageUser + cpuUsageSystem
		if math.IsNaN(cpuUsage) || math.IsInf(cpuUsage, 0) {
			return -1, fmt.Errorf("non-finite cpu usage %v reported for node %q", cpuUsage, nodeID)
		}

		cpuUsagePct := int(math.Round(cpuUsage * 100))
		maxCpuPct = max(maxCpuPct, cpuUsagePct)
	}
	return maxCpuPct, nil
}
106 changes: 106 additions & 0 deletions yb-voyager/src/adaptiveparallelism/adaptive_parallelism_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright (c) YugabyteDB, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptiveparallelism

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

// dummyTargetYugabyteDB is an in-memory stand-in for a target database with a
// connection pool. It models a two-node cluster whose CPU readings are fixed
// by the test, and a pool whose size is a plain counter.
type dummyTargetYugabyteDB struct {
	size          int     // current pool size
	maxSize       int     // value reported as the pool's maximum size
	cpuUsageUser1 float64 // node1 user CPU
	cpuUsageSys1  float64 // node1 system CPU
	cpuUsageUser2 float64 // node2 user CPU
	cpuUsageSys2  float64 // node2 system CPU
}

// IsAdaptiveParallelismSupported always reports true.
func (db *dummyTargetYugabyteDB) IsAdaptiveParallelismSupported() bool {
	return true
}

// GetClusterMetrics returns the configured CPU readings for "node1" and
// "node2", formatted as decimal strings. It never returns an error.
func (db *dummyTargetYugabyteDB) GetClusterMetrics() (map[string]map[string]string, error) {
	format := func(v float64) string {
		return strconv.FormatFloat(v, 'f', -1, 64)
	}

	metrics := map[string]map[string]string{
		"node1": {
			"cpu_usage_user":   format(db.cpuUsageUser1),
			"cpu_usage_system": format(db.cpuUsageSys1),
		},
		"node2": {
			"cpu_usage_user":   format(db.cpuUsageUser2),
			"cpu_usage_system": format(db.cpuUsageSys2),
		},
	}
	return metrics, nil
}

// GetNumConnectionsInPool returns the current pool size.
func (db *dummyTargetYugabyteDB) GetNumConnectionsInPool() int {
	return db.size
}

// GetNumMaxConnectionsInPool returns the configured maximum pool size.
func (db *dummyTargetYugabyteDB) GetNumMaxConnectionsInPool() int {
	return db.maxSize
}

// UpdateNumConnectionsInPool applies delta to the pool size without any
// bounds checking and always succeeds.
func (db *dummyTargetYugabyteDB) UpdateNumConnectionsInPool(delta int) error {
	db.size = db.size + delta
	return nil
}

// TestMaxCpuUsage checks that the busiest node determines the reported usage.
func TestMaxCpuUsage(t *testing.T) {
	// node1: 0.5 + 0.1 -> 60%; node2: 0.1 + 0.1 -> 20%.
	db := &dummyTargetYugabyteDB{size: 3, maxSize: 6}
	db.cpuUsageUser1, db.cpuUsageSys1 = 0.5, 0.1
	db.cpuUsageUser2, db.cpuUsageSys2 = 0.1, 0.1

	// The dummy's GetClusterMetrics always returns a nil error.
	metrics, _ := db.GetClusterMetrics()

	got, err := getMaxCpuUsageInCluster(metrics)
	assert.NoError(t, err)
	assert.Equal(t, 60, got)
}

// TestIncreaseParallelism checks that the pool grows by one connection when
// every node is at or below the CPU threshold.
func TestIncreaseParallelism(t *testing.T) {
	// Both nodes: 0.5 + 0.1 -> 60%, which does not exceed the threshold.
	db := &dummyTargetYugabyteDB{size: 3, maxSize: 6}
	db.cpuUsageUser1, db.cpuUsageSys1 = 0.5, 0.1
	db.cpuUsageUser2, db.cpuUsageSys2 = 0.5, 0.1

	updateErr := fetchClusterMetricsAndUpdateParallelism(db)

	assert.NoErrorf(t, updateErr, "failed to fetch cluster metrics and update parallelism")
	assert.Equal(t, 4, db.GetNumConnectionsInPool())
}

// TestDecreaseParallelismBasedOnCpu checks that the pool shrinks by one
// connection as soon as a single node exceeds the CPU threshold.
func TestDecreaseParallelismBasedOnCpu(t *testing.T) {
	db := &dummyTargetYugabyteDB{size: 3, maxSize: 6}
	// node1: 0.8 + 0.1 -> 90%, above threshold.
	db.cpuUsageUser1, db.cpuUsageSys1 = 0.8, 0.1
	// node2: 0.5 + 0.1 -> 60%, below threshold.
	db.cpuUsageUser2, db.cpuUsageSys2 = 0.5, 0.1

	updateErr := fetchClusterMetricsAndUpdateParallelism(db)

	assert.NoErrorf(t, updateErr, "failed to fetch cluster metrics and update parallelism")
	assert.Equal(t, 2, db.GetNumConnectionsInPool())
}
Binary file modified yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz
Binary file not shown.
Loading

0 comments on commit b465788

Please sign in to comment.