Skip to content

Commit

Permalink
lighting: hotfix lightning for DBaaS (#30927)
Browse files Browse the repository at this point in the history
* lightning: fix gcs max key limit (#30393)


Co-authored-by: Jianjun Liao <[email protected]>
  • Loading branch information
glorv and Leavrth authored Dec 22, 2021
1 parent 4a1b2e9 commit 8b61e5d
Show file tree
Hide file tree
Showing 17 changed files with 473 additions and 50 deletions.
7 changes: 6 additions & 1 deletion br/pkg/lightning/common/storage_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@ import (
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"golang.org/x/sys/unix"
)

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
var stat unix.Statfs_t
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})

var stat unix.Statfs_t
err = unix.Statfs(dir, &stat)
if err != nil {
return size, errors.Annotatef(err, "cannot get disk capacity at %s", dir)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/common/storage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)

var (
Expand All @@ -32,6 +33,10 @@ var (

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})
r, _, e := getDiskFreeSpaceExW.Call(
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))),
uintptr(unsafe.Pointer(&size.Available)),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
127 changes: 105 additions & 22 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -38,15 +44,13 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table/tables"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"

"go.uber.org/zap"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -454,33 +458,31 @@ func (rc *Controller) localResource(sourceSize int64) error {
if err != nil {
return errors.Trace(err)
}
localAvailable := storageSize.Available
localAvailable := int64(storageSize.Available)

var message string
var passed bool
switch {
case localAvailable > uint64(sourceSize):
case localAvailable > sourceSize:
message = fmt.Sprintf("local disk resources are rich, estimate sorted data size %s, local available is %s",
units.BytesSize(float64(sourceSize)), units.BytesSize(float64(localAvailable)))
passed = true
case int64(rc.cfg.TikvImporter.DiskQuota) > localAvailable:
message = fmt.Sprintf("local disk space may not enough to finish import, estimate sorted data size is %s,"+
" but local available is %s, please set `tikv-importer.disk-quota` to a smaller value than %s"+
" or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(localAvailable)))
passed = false
log.L().Error(message)
default:
if int64(rc.cfg.TikvImporter.DiskQuota) > int64(localAvailable) {
message = fmt.Sprintf("local disk space may not enough to finish import"+
"estimate sorted data size is %s, but local available is %s,"+
"you need a smaller number for tikv-importer.disk-quota (%s) to finish imports",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = false
log.L().Error(message)
} else {
message = fmt.Sprintf("local disk space may not enough to finish import, "+
"estimate sorted data size is %s, but local available is %s,"+
"we will use disk-quota (size: %s) to finish imports, which may slow down import",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = true
log.L().Warn(message)
}
message = fmt.Sprintf("local disk space may not enough to finish import, "+
"estimate sorted data size is %s, but local available is %s,"+
"we will use disk-quota (size: %s) to finish imports, which may slow down import",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = true
log.L().Warn(message)
}
rc.checkTemplate.Collect(Critical, passed, message)
return nil
Expand Down Expand Up @@ -870,3 +872,84 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
return nil
}
db, _ := rc.tidbGlue.GetDB()

tableCount := 0
for _, db := range rc.dbMetas {
tableCount += len(db.Tables)
}

var lock sync.Mutex
tableNames := make([]string, 0)
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
// skip tables that have checkpoint
if rc.cfg.Checkpoint.Enable {
_, err := rc.checkpointsDB.Get(gCtx, tblName)
switch {
case err == nil:
continue
case errors.IsNotFound(err):
default:
return errors.Trace(err)
}
}

hasData, err1 := tableContainsData(gCtx, db, tblName)
if err1 != nil {
return err1
}
if hasData {
lock.Lock()
tableNames = append(tableNames, tblName)
lock.Unlock()
}
}
return nil
})
}
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
}

if len(tableNames) > 0 {
// sort the failed names
sort.Strings(tableNames)
msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
rc.checkTemplate.Collect(Critical, false, msg)
}
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
query := "select 1 from " + tableName + " limit 1"
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)

switch {
case err == sql.ErrNoRows:
return false, nil
case err != nil:
return false, errors.Trace(err)
default:
return true, nil
}
}
Loading

0 comments on commit 8b61e5d

Please sign in to comment.