Skip to content

Commit

Permalink
backend: add local kv storage backend to get rid of importer (pingcap…
Browse files Browse the repository at this point in the history
…#326)

* backend: add local backend, try to move importer's sort kv and split & scatter into lightning

* address comment

* use sync.Map

* write to every peer

* add retry on write and ingest

* fix split region

* udpate leveldb config

* use badger

* use pebble

* update pebble config

* restrict concurrency

* use workerPool to restrict concurrency for all engines

* make range concurrency as config

* flush db at close engine

* wip: use sstable to split range

* fast split ranges by encode key and file size

* fix split bug

* use bigEndian to split range

* fix split region bug & ingest lost write meta bug

* add send-kv-pairs config

* fix duplicate write error

* fix checksum in small table

* use go routine to write tikv and ingest

* fix

* update sort

* update sort

* fix

* fix

* fix retry

* update

* fix concurrency bug

* fix retry bug

* fix iter.Next

* try fix checksum mismatch

* fix deadlock

* update

* fix

* optimize memory usage

* add checkpoint for local mode

* fix checkpoint

* do not write chunk checkpoint in local mode

* fix remote checkpoint

* fix checkpoint

* manually destroy checkpoint

* only flush index if checkpoint is on

* remove some useless code

* format code

* fix unit test

* fix test

* fix

* fix tls

* fix review comment

* add c comment for local.Close

* add some comment

* fix local backend checkpoint

* fix unit test

* refine some test with local backend

* address comment

* fix close engine

* checkpoint integrateion_test for local backend

* try fix

* return nil if engine not exist in CloseEngine and ImportEngine

* address comment

* test localbackend checkpoint

* adjust config to save coverage

* fix review comments

* change ParseIndexKey method

* remove saveCpChan  channel buf

* add test to save coverage

* test ingest failed

* inject failpoint before dataengine importer

* fix review comments

* fix test format

Co-authored-by: luancheng <[email protected]>
  • Loading branch information
glorv and 3pointer authored Jun 10, 2020
1 parent 18b82fb commit cc190c1
Show file tree
Hide file tree
Showing 40 changed files with 2,164 additions and 441 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
bin/
test_*/
*.log
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

GO := go
GOBUILD := GO111MODULE=on CGO_ENABLED=0 $(GO) build
GOBUILD := GO111MODULE=on CGO_ENABLED=1 $(GO) build
GOTEST := GO111MODULE=on CGO_ENABLED=1 $(GO) test -p 3

ARCH := "`uname -s`"
Expand Down
51 changes: 36 additions & 15 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,6 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}
defer target.Close()

importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
defer importer.Close()

targetTables, err := cpdb.DestroyErrorCheckpoint(ctx, tableName)
if err != nil {
return errors.Trace(err)
Expand All @@ -219,15 +213,42 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}
}

for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
lastErr = err
} else {
closedEngine.Cleanup(ctx)
if cfg.TikvImporter.Backend == "importer" {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
defer importer.Close()

for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
lastErr = err
} else {
closedEngine.Cleanup(ctx)
}
}
}
}
// For importer backend, engine was stored in importer's memory, we can retrieve it from alive importer process.
// But in local backend, if we want to use common API `UnsafeCloseEngine` and `Cleanup`,
// we need either lightning process alive or engine map persistent.
// both of them seems unnecessary if we only need to do is cleanup specify engine directory.
// so we didn't choose to use common API.
if cfg.TikvImporter.Backend == "local" {
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := kv.MakeUUID(table.TableName, engineID)
file := kv.LocalFile{Uuid: eID}
err := file.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while cleanup engine:", err)
lastErr = err
}
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ func main() {
// The time need of `encode kv data and write` step reduce from 52m4s to 37m30s when change
// GOGC from 100 to 500, the total time needed reduce near 15m too.
// The cost of this is the memory of lightnin at runtime grow from about 200M to 700M, but it's acceptable.
//
// So we set the gc percentage as 500 default to reduce the GC frequency instead of 100.
gogc := os.Getenv("GOGC")
if gogc == "" {
old := debug.SetGCPercent(500)
log.L().Debug("set gc percentage", zap.Int("old", old), zap.Int("new", 500))
//
// Local mode need much more memory than importer/tidb mode, if the gc percentage is too high,
// lightning memory usage will also be high.
if cfg.TikvImporter.Backend != config.BackendLocal {
gogc := os.Getenv("GOGC")
if gogc == "" {
old := debug.SetGCPercent(500)
log.L().Debug("set gc percentage", zap.Int("old", old), zap.Int("new", 500))
}
}

err := app.GoServe()
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,30 @@ go 1.13
require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/cockroachdb/pebble v0.0.0-20200601233547-7956a7440a70
github.com/coreos/go-semver v0.3.0
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 // indirect
github.com/go-bindata/go-bindata v3.1.2+incompatible // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.3
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf // indirect
github.com/joho/sqltocsv v0.0.0-20190824231449-5650f27fd5b6
github.com/pingcap/br v0.0.0-20200521085655-53201addd4ad
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200522094936-3b720a0512a6
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
github.com/pingcap/tidb v1.1.0-beta.0.20200527030457-572bba0499e1
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/satori/go.uuid v1.2.0
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/zap v1.15.0
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/text v0.3.2
Expand Down
Loading

0 comments on commit cc190c1

Please sign in to comment.