Skip to content

Commit

Permalink
Merge branch 'master' into paging-unistore
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Jun 14, 2022
2 parents 68f94d1 + e1f9e0a commit eae342b
Show file tree
Hide file tree
Showing 96 changed files with 6,404 additions and 5,000 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ For more details and latest updates, see [TiDB docs](https://docs.pingcap.com/ti

TiDB Cloud is the fully-managed service of TiDB, currently available on AWS and GCP.

Quickly check out TiDB Cloud with [a free trial](https://tidbcloud.com/signup).
Quickly check out TiDB Cloud with [a free trial](https://tidbcloud.com/free-trial).

See [TiDB Cloud Quick Start Guide](https://docs.pingcap.com/tidbcloud/tidb-cloud-quickstart).

Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//br/pkg/lightning/config",
"//br/pkg/lightning/glue",
"//br/pkg/lightning/log",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/restore",
"//br/pkg/lightning/tikv",
Expand All @@ -25,9 +26,12 @@ go_library(
"//br/pkg/storage",
"//br/pkg/utils",
"//br/pkg/version/build",
"//util/promutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_prometheus_client_golang//prometheus/promhttp",
"@com_github_shurcool_httpgzip//:httpgzip",
"@org_uber_go_zap//:zap",
Expand Down
24 changes: 16 additions & 8 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,17 +340,23 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
return nil, err
}

openCounter := metric.ImporterEngineCounter.WithLabelValues("open")
openCounter.Inc()
if m, ok := metric.FromContext(ctx); ok {
openCounter := m.ImporterEngineCounter.WithLabelValues("open")
openCounter.Inc()
}

logger.Info("open engine")

failpoint.Inject("FailIfEngineCountExceeds", func(val failpoint.Value) {
closedCounter := metric.ImporterEngineCounter.WithLabelValues("closed")
openCount := metric.ReadCounter(openCounter)
closedCount := metric.ReadCounter(closedCounter)
if injectValue := val.(int); openCount-closedCount > float64(injectValue) {
panic(fmt.Sprintf("forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d", openCount, closedCount, injectValue))
if m, ok := metric.FromContext(ctx); ok {
closedCounter := m.ImporterEngineCounter.WithLabelValues("closed")
openCounter := m.ImporterEngineCounter.WithLabelValues("open")
openCount := metric.ReadCounter(openCounter)

closedCount := metric.ReadCounter(closedCounter)
if injectValue := val.(int); openCount-closedCount > float64(injectValue) {
panic(fmt.Sprintf("forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d", openCount, closedCount, injectValue))
}
}
})

Expand Down Expand Up @@ -380,7 +386,9 @@ func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tab
func (engine *OpenedEngine) Close(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
closedEngine, err := engine.unsafeClose(ctx, cfg)
if err == nil {
metric.ImporterEngineCounter.WithLabelValues("closed").Inc()
if m, ok := metric.FromContext(ctx); ok {
m.ImporterEngineCounter.WithLabelValues("closed").Inc()
}
}
return closedEngine, err
}
Expand Down
12 changes: 9 additions & 3 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ type tableKVEncoder struct {
genCols []genCol
// convert auto id for shard rowid or auto random id base on row id generated by lightning
autoIDFn autoIDConverter
metrics *metric.Metrics
}

func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error) {
metric.KvEncoderCounter.WithLabelValues("open").Inc()
func NewTableKVEncoder(tbl table.Table, options *SessionOptions, metrics *metric.Metrics) (Encoder, error) {
if metrics != nil {
metrics.KvEncoderCounter.WithLabelValues("open").Inc()
}
meta := tbl.Meta()
cols := tbl.Cols()
se := newSession(options)
Expand Down Expand Up @@ -108,6 +111,7 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -188,7 +192,9 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C

func (kvcodec *tableKVEncoder) Close() {
kvcodec.se.Close()
metric.KvEncoderCounter.WithLabelValues("closed").Inc()
if kvcodec.metrics != nil {
kvcodec.metrics.KvEncoderCounter.WithLabelValues("close").Inc()
}
}

// RowArrayMarshaler wraps a slice of types.Datum for logging the content into zap.
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestEncode(t *testing.T) {
strictMode, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
}, nil)
require.NoError(t, err)
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1}, "1.csv", 1234)
require.Regexp(t, "failed to cast value as tinyint\\(4\\) for column `c1` \\(#1\\):.*overflows tinyint", err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestEncode(t *testing.T) {
mockMode, err := NewTableKVEncoder(mockTbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567891,
})
}, nil)
require.NoError(t, err)
_, err = mockMode.Encode(logger, rowsWithPk2, 2, []int{0, 1}, "1.csv", 1234)
require.EqualError(t, err, "mock error")
Expand All @@ -131,7 +131,7 @@ func TestEncode(t *testing.T) {
SQLMode: mysql.ModeNone,
Timestamp: 1234567892,
SysVars: map[string]string{"tidb_row_format_version": "1"},
})
}, nil)
require.NoError(t, err)
pairs, err = noneMode.Encode(logger, rows, 1, []int{0, 1}, "1.csv", 1234)
require.NoError(t, err)
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestDecodeIndex(t *testing.T) {
strictMode, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
}, nil)
require.NoError(t, err)
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1, -1}, "1.csv", 123)
data := pairs.(*KvPairs)
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
SQLMode: mysql.ModeNone,
Timestamp: 1234567892,
SysVars: map[string]string{"tidb_row_format_version": "2"},
})
}, nil)
require.NoError(t, err)
pairs, err := noneMode.Encode(logger, rows, 1, []int{0, 1}, "1.csv", 1234)
require.NoError(t, err)
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestEncodeTimestamp(t *testing.T) {
"tidb_row_format_version": "1",
"time_zone": "+08:00",
},
})
}, nil)
require.NoError(t, err)
pairs, err := encoder.Encode(logger, nil, 70, []int{-1, 1}, "1.csv", 1234)
require.NoError(t, err)
Expand All @@ -320,7 +320,7 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
SysVars: map[string]string{
"tidb_row_format_version": "2",
},
})
}, nil)
require.NoError(t, err)

strDatumForID := types.NewStringDatum("1")
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {
SysVars: map[string]string{
"tidb_row_format_version": "2",
},
})
}, nil)
require.NoError(t, err)

realRowID := encoder.(*tableKVEncoder).autoIDFn(rowID)
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
Timestamp: 1234567893,
SysVars: map[string]string{"tidb_row_format_version": "2"},
AutoRandomSeed: 456,
})
}, nil)
require.NoError(t, err)
logger := log.Logger{Logger: zap.NewNop()}
pairs, err := encoder.Encode(logger, []types.Datum{types.NewStringDatum("")}, 70, []int{-1, 0}, "1.csv", 1234)
Expand Down Expand Up @@ -482,7 +482,7 @@ func TestShardRowId(t *testing.T) {
Timestamp: 1234567893,
SysVars: map[string]string{"tidb_row_format_version": "2"},
AutoRandomSeed: 456,
})
}, nil)
require.NoError(t, err)
logger := log.Logger{Logger: zap.NewNop()}
keyMap := make(map[int64]struct{}, 16)
Expand Down Expand Up @@ -636,7 +636,7 @@ func SetUpTest(b *testing.B) *benchSQL2KVSuite {
// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tableInfo)
require.NoError(b, err)
encoder, err := NewTableKVEncoder(tbl, &SessionOptions{SysVars: map[string]string{"tidb_row_format_version": "2"}})
encoder, err := NewTableKVEncoder(tbl, &SessionOptions{SysVars: map[string]string{"tidb_row_format_version": "2"}}, nil)
require.NoError(b, err)
logger := log.Logger{Logger: zap.NewNop()}

Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ type local struct {
importClientFactory ImportClientFactory

bufferPool *membuf.Pool
metrics *metric.Metrics
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
Expand Down Expand Up @@ -334,6 +335,9 @@ func NewLocalBackend(
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
}
if m, ok := metric.FromContext(ctx); ok {
local.metrics = m
}
if err = local.checkMultiIngestSupport(ctx); err != nil {
return backend.MakeBackend(nil), common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}
Expand Down Expand Up @@ -1247,7 +1251,9 @@ loopWrite:
engine.importedKVSize.Add(rangeStats.totalBytes)
engine.importedKVCount.Add(rangeStats.count)
engine.finishedRanges.add(finishedRange)
metric.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
if local.metrics != nil {
local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
}
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -1685,7 +1691,7 @@ func (local *local) MakeEmptyRows() kv.Rows {
}

func (local *local) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return kv.NewTableKVEncoder(tbl, options)
return kv.NewTableKVEncoder(tbl, options, local.metrics)
}

func engineSSTDir(storeDir string, engineUUID uuid.UUID) string {
Expand Down
57 changes: 47 additions & 10 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/restore"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
Expand All @@ -51,6 +52,9 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/util/promutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shurcooL/httpgzip"
"go.uber.org/zap"
Expand All @@ -69,6 +73,9 @@ type Lightning struct {
serverLock sync.Mutex
status restore.LightningStatus

promFactory promutil.Factory
promRegistry promutil.Registry

cancelLock sync.Mutex
curTask *config.Config
cancel context.CancelFunc // for per task context, which maybe different from lightning context
Expand All @@ -94,12 +101,16 @@ func New(globalCfg *config.GlobalConfig) *Lightning {

redact.InitRedact(globalCfg.Security.RedactInfoLog)

promFactory := promutil.NewDefaultFactory()
promRegistry := promutil.NewDefaultRegistry()
ctx, shutdown := context.WithCancel(context.Background())
return &Lightning{
globalCfg: globalCfg,
globalTLS: tls,
ctx: ctx,
shutdown: shutdown,
globalCfg: globalCfg,
globalTLS: tls,
ctx: ctx,
shutdown: shutdown,
promFactory: promFactory,
promRegistry: promRegistry,
}
}

Expand Down Expand Up @@ -181,7 +192,16 @@ func httpHandleWrapper(h http.HandlerFunc) http.HandlerFunc {
func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux := http.NewServeMux()
mux.Handle("/", http.RedirectHandler("/web/", http.StatusFound))
mux.Handle("/metrics", promhttp.Handler())

registry := l.promRegistry
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector())
if gatherer, ok := registry.(prometheus.Gatherer); ok {
handler := promhttp.InstrumentMetricHandler(
registry, promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}),
)
mux.Handle("/metrics", handler)
}

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down Expand Up @@ -242,8 +262,12 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu
failpoint.Inject("SetTaskID", func(val failpoint.Value) {
taskCfg.TaskID = int64(val.(int))
})

return l.run(taskCtx, taskCfg, &options{glue: glue})
o := &options{
glue: glue,
promFactory: l.promFactory,
promRegistry: l.promRegistry,
}
return l.run(taskCtx, taskCfg, o)
}

func (l *Lightning) RunServer() error {
Expand All @@ -260,7 +284,10 @@ func (l *Lightning) RunServer() error {
if err != nil {
return err
}
o := &options{}
o := &options{
promFactory: l.promFactory,
promRegistry: l.promRegistry,
}
err = l.run(context.Background(), task, o)
if err != nil && !common.IsContextCanceledError(err) {
restore.DeliverPauser.Pause() // force pause the progress on error
Expand All @@ -280,7 +307,10 @@ func (l *Lightning) RunServer() error {
// - WithCheckpointStorage: caller has opened an external storage for lightning and want to save checkpoint
// in it. Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config
func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error {
o := &options{}
o := &options{
promFactory: l.promFactory,
promRegistry: l.promRegistry,
}
for _, opt := range opts {
opt(o)
}
Expand Down Expand Up @@ -331,7 +361,14 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti

utils.LogEnvVariables()

ctx, cancel := context.WithCancel(taskCtx)
metrics := metric.NewMetrics(o.promFactory)
metrics.RegisterTo(o.promRegistry)
defer func() {
metrics.UnregisterFrom(o.promRegistry)
}()

ctx := metric.NewContext(taskCtx, metrics)
ctx, cancel := context.WithCancel(ctx)
l.cancelLock.Lock()
l.cancel = cancel
l.curTask = taskCfg
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/lightning_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func TestRun(t *testing.T) {
path, _ := filepath.Abs(".")
ctx := context.Background()
invalidGlue := glue.NewExternalTiDBGlue(nil, 0)
o := &options{glue: invalidGlue}
o := &options{
glue: invalidGlue,
promRegistry: lightning.promRegistry,
promFactory: lightning.promFactory,
}
err = lightning.run(ctx, &config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: "file://" + filepath.ToSlash(path),
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/metric/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/metric",
visibility = ["//visibility:public"],
deps = [
"//util/promutil",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
],
Expand All @@ -16,7 +17,9 @@ go_test(
srcs = ["metric_test.go"],
deps = [
":metric",
"//util/promutil",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit eae342b

Please sign in to comment.