Skip to content

Commit

Permalink
Merge branch 'master' into rewrite_auto_inc
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Sep 4, 2023
2 parents a314592 + be5ce3d commit 34ef3f1
Show file tree
Hide file tree
Showing 95 changed files with 2,088 additions and 1,334 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6937,13 +6937,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "608e5c393dcf7fa07a7a360333816dc479b05bad6ad489a4643c9a096e47f5d9",
strip_prefix = "github.com/tikv/client-go/[email protected].20230811033710-8a214402da13",
sha256 = "9cf5877cb0b43d73140e280ad9c80dccd9684e89659a358ee75702469368cf95",
strip_prefix = "github.com/tikv/client-go/[email protected].20230829002846-295094e5b534",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
],
)
go_repository(
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
splitKeys [][]byte
bufPool *membuf.Pool

iter *MergeKVIter
Expand All @@ -48,6 +49,9 @@ type Engine struct {
dupDetectOpt common.DupDetectOpt
ts uint64

totalKVSize int64
totalKVLength int64

importedKVSize *atomic.Int64
importedKVCount *atomic.Int64
}
Expand All @@ -62,6 +66,8 @@ func NewExternalEngine(
duplicateDB *pebble.DB,
dupDetectOpt common.DupDetectOpt,
ts uint64,
totalKVSize int64,
totakKVLength int64,
) common.Engine {
return &Engine{
storage: storage,
Expand All @@ -73,6 +79,8 @@ func NewExternalEngine(
duplicateDB: duplicateDB,
dupDetectOpt: dupDetectOpt,
ts: ts,
totalKVSize: totalKVSize,
totalKVLength: totakKVLength,
importedKVSize: atomic.NewInt64(0),
importedKVCount: atomic.NewInt64(0),
}
Expand Down Expand Up @@ -169,6 +177,42 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
return iter, nil
}

// KVStatistics returns the total kv size and total kv length.
func (e *Engine) KVStatistics() (totalKVSize int64, totalKVLength int64) {
return e.totalKVSize, e.totalKVLength
}

// ImportedStatistics returns the imported kv size and imported kv length.
func (e *Engine) ImportedStatistics() (importedKVSize int64, importedKVLength int64) {
return e.importedKVSize.Load(), e.importedKVCount.Load()
}

// ID is the identifier of an engine.
func (e *Engine) ID() string {
return "external"
}

// SplitRanges split the ranges by split keys provided by external engine.
func (e *Engine) SplitRanges(
startKey, endKey []byte,
_, _ int64,
_ log.Logger,
) ([]common.Range, error) {
splitKeys := e.splitKeys
ranges := make([]common.Range, 0, len(splitKeys)+1)
ranges = append(ranges, common.Range{Start: startKey})
for i := 0; i < len(splitKeys); i++ {
ranges[len(ranges)-1].End = splitKeys[i]
var endK []byte
if i < len(splitKeys)-1 {
endK = splitKeys[i+1]
}
ranges = append(ranges, common.Range{Start: splitKeys[i], End: endK})
}
ranges[len(ranges)-1].End = endKey
return ranges, nil
}

// Close releases the resources of the engine.
func (e *Engine) Close() error {
if e.iter == nil {
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,13 +673,13 @@ func (m *DupeDetector) splitLocalDupTaskByKeys(
if err != nil {
return nil, errors.Trace(err)
}
ranges := splitRangeBySizeProps(Range{start: task.StartKey, end: task.EndKey}, sizeProps, sizeLimit, keysLimit)
ranges := splitRangeBySizeProps(common.Range{Start: task.StartKey, End: task.EndKey}, sizeProps, sizeLimit, keysLimit)
newDupTasks := make([]dupTask, 0, len(ranges))
for _, r := range ranges {
newDupTasks = append(newDupTasks, dupTask{
KeyRange: tidbkv.KeyRange{
StartKey: r.start,
EndKey: r.end,
StartKey: r.Start,
EndKey: r.End,
},
tableID: task.tableID,
indexInfo: task.indexInfo,
Expand Down
39 changes: 37 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ type engineMeta struct {

type syncedRanges struct {
sync.Mutex
ranges []Range
ranges []common.Range
}

func (r *syncedRanges) add(g Range) {
func (r *syncedRanges) add(g common.Range) {
r.Lock()
r.ranges = append(r.ranges, g)
r.Unlock()
Expand Down Expand Up @@ -275,6 +275,41 @@ func (e *Engine) TotalMemorySize() int64 {
return memSize
}

// KVStatistics returns the total kv size and total kv length.
func (e *Engine) KVStatistics() (totalKVSize int64, totalKVLength int64) {
return e.TotalSize.Load(), e.Length.Load()
}

// ImportedStatistics returns the imported kv size and imported kv length.
func (e *Engine) ImportedStatistics() (importedKVSize int64, importedKVLength int64) {
return e.importedKVSize.Load(), e.importedKVCount.Load()
}

// ID is the identifier of an engine.
func (e *Engine) ID() string {
return e.UUID.String()
}

// SplitRanges gets size properties from pebble and split ranges according to size/keys limit.
func (e *Engine) SplitRanges(
startKey, endKey []byte,
sizeLimit, keysLimit int64,
logger log.Logger,
) ([]common.Range, error) {
sizeProps, err := getSizePropertiesFn(logger, e.getDB(), e.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
}

ranges := splitRangeBySizeProps(
common.Range{Start: startKey, End: endKey},
sizeProps,
sizeLimit,
keysLimit,
)
return ranges, nil
}

type rangeOffsets struct {
Size uint64
Keys uint64
Expand Down
Loading

0 comments on commit 34ef3f1

Please sign in to comment.