Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add diagnosis rule to detect cluster critical errors #14743

Merged
merged 5 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions executor/cluster_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
pmodel "github.com/prometheus/common/model"
"google.golang.org/grpc"
)
Expand All @@ -60,11 +59,11 @@ func (s *testClusterReaderSuite) TearDownSuite(c *C) {
}

func (s *testClusterReaderSuite) TestMetricTableData(c *C) {
failPoint := "github.com/pingcap/tidb/executor/mockMetricRetrieverQueryPromQL"
c.Assert(failpoint.Enable(failPoint, "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable(failPoint), IsNil)
}()
fpName := "github.com/pingcap/tidb/executor/mockMetricsPromData"
c.Assert(failpoint.Enable(fpName, "return"), IsNil)
defer func() { c.Assert(failpoint.Disable(fpName), IsNil) }()

// mock prometheus data
matrix := pmodel.Matrix{}
metric := map[pmodel.LabelName]pmodel.LabelValue{
"instance": "127.0.0.1:10080",
Expand All @@ -76,25 +75,46 @@ func (s *testClusterReaderSuite) TestMetricTableData(c *C) {
Value: pmodel.SampleValue(0.1),
}
matrix = append(matrix, &pmodel.SampleStream{Metric: metric, Values: []pmodel.SamplePair{v1}})
ctx := context.WithValue(context.Background(), "__mockMetricsData", matrix)

ctx := context.WithValue(context.Background(), "__mockMetricsPromData", matrix)
ctx = failpoint.WithHook(ctx, func(ctx context.Context, fpname string) bool {
return fpname == failPoint
return fpname == fpName
})

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use metric_schema")
rs, err := tk.Se.Execute(ctx, "select * from tidb_query_duration;")
c.Assert(err, IsNil)
result := tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("execute sql fail"))
result.Check(testutil.RowsWithSep("|",
"2019-12-23 20:11:35.000000|127.0.0.1:10080| 0.9|0.1|The quantile of TiDB query durations(second)"))

rs, err = tk.Se.Execute(ctx, "select time,instance,quantile,value from tidb_query_duration where quantile in (0.85, 0.95);")
c.Assert(err, IsNil)
result = tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("execute sql fail"))
result.Check(testkit.Rows(
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.85 0.1",
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.95 0.1"))
cases := []struct {
sql string
exp []string
}{
{
sql: "select time,instance,quantile,value from tidb_query_duration;",
exp: []string{
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.9 0.1",
},
},
{
sql: "select time,instance,quantile,value from tidb_query_duration where quantile in (0.85, 0.95);",
exp: []string{
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.85 0.1",
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.95 0.1",
},
},
{
sql: "select time,instance,quantile,value from tidb_query_duration where quantile=0.5",
exp: []string{
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.5 0.1",
},
},
}

for _, cas := range cases {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for _, cas := range cases {
for _, case := range cases {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case is a keyword.

rs, err := tk.Se.Execute(ctx, cas.sql)
c.Assert(err, IsNil)
result := tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("sql: %s", cas.sql))
result.Check(testkit.Rows(cas.exp...))
}
}

func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
Expand Down Expand Up @@ -406,7 +426,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
tmpDir string
logFile string
}
// typ => testServer
// tp => testServer
testServers := map[string]*testServer{}

// create gRPC servers
Expand Down
139 changes: 110 additions & 29 deletions executor/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"strings"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
Expand All @@ -32,7 +34,7 @@ import (
type (
// inspectionResult represents a abnormal diagnosis result
inspectionResult struct {
typ string
tp string
instance string
// represents the diagnostics item, e.g: `ddl.lease` `raftstore.cpuusage`
item string
Expand All @@ -43,6 +45,8 @@ type (
detail string
}

inspectionName string

inspectionFilter struct{ set.StringSet }

inspectionRule interface {
Expand All @@ -51,14 +55,37 @@ type (
}
)

func (n inspectionName) name() string {
return string(n)
}

func (f inspectionFilter) enable(name string) bool {
return len(f.StringSet) == 0 || f.Exist(name)
}

type (
// configInspection is used to check whether a same configuration item has a
// different value between different instance in the cluster
configInspection struct{ inspectionName }

// versionInspection is used to check whether the same component has different
// version in the cluster
versionInspection struct{ inspectionName }

// currentLoadInspection is used to check the current load of memory/disk/cpu
// have reached a high-level threshold
currentLoadInspection struct{ inspectionName }

// criticalErrorInspection is used to check are there some critical errors
// occurred in the past
criticalErrorInspection struct{ inspectionName }
)

var inspectionRules = []inspectionRule{
&configInspection{},
&versionInspection{},
&currentLoadInspection{},
&configInspection{inspectionName: "config"},
&versionInspection{inspectionName: "version"},
&currentLoadInspection{inspectionName: "current-load"},
&criticalErrorInspection{inspectionName: "critical-error"},
}

type inspectionRetriever struct {
Expand Down Expand Up @@ -118,7 +145,7 @@ func (e *inspectionRetriever) retrieve(ctx context.Context, sctx sessionctx.Cont
finalRows = append(finalRows, types.MakeDatums(
name,
result.item,
result.typ,
result.tp,
result.instance,
result.actual,
result.expected,
Expand All @@ -130,12 +157,6 @@ func (e *inspectionRetriever) retrieve(ctx context.Context, sctx sessionctx.Cont
return finalRows, nil
}

type configInspection struct{}

func (configInspection) name() string {
return "config"
}

func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// check the configuration consistent
sql := "select type, `key`, count(distinct value) as c from inspection_schema.cluster_config group by type, `key` having c > 1"
Expand All @@ -148,7 +169,7 @@ func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filt
for _, row := range rows {
if filter.enable(row.GetString(1)) {
results = append(results, inspectionResult{
typ: row.GetString(0),
tp: row.GetString(0),
instance: "",
item: row.GetString(1), // key
actual: "inconsistent",
Expand All @@ -162,12 +183,6 @@ func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filt
return results
}

type versionInspection struct{}

func (versionInspection) name() string {
return "version"
}

func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// check the configuration consistent
sql := "select type, count(distinct git_hash) as c from inspection_schema.cluster_info group by type having c > 1;"
Expand All @@ -181,7 +196,7 @@ func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, fil
for _, row := range rows {
if filter.enable(name) {
results = append(results, inspectionResult{
typ: row.GetString(0),
tp: row.GetString(0),
instance: "",
item: name,
actual: "inconsistent",
Expand All @@ -194,26 +209,20 @@ func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, fil
return results
}

type currentLoadInspection struct{}

func (currentLoadInspection) name() string {
return "current-load"
}

func (currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
var commonResult = func(item string, expected string, row chunk.Row) inspectionResult {
var commonResult = func(item, expected string, row chunk.Row) inspectionResult {
return inspectionResult{
typ: row.GetString(0),
tp: row.GetString(0),
instance: row.GetString(1),
item: item,
actual: row.GetString(2),
expected: expected,
severity: "warning",
}
}
var diskResult = func(item string, expected string, row chunk.Row) inspectionResult {
var diskResult = func(item, expected string, row chunk.Row) inspectionResult {
return inspectionResult{
typ: row.GetString(0),
tp: row.GetString(0),
instance: row.GetString(1),
item: item,
actual: row.GetString(3),
Expand Down Expand Up @@ -282,3 +291,75 @@ func (currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Context,
}
return results
}

func (criticalErrorInspection) inspect(ctx context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// TODO: specify the `begin` and `end` time of metric query
var rules = []struct {
tp string
item string
tbl string
}{
{tp: "tidb", item: "failed-query-opm", tbl: "tidb_failed_query_opm"},
{tp: "tikv", item: "critical-error", tbl: "tikv_critical_error"},
{tp: "tidb", item: "panic-count", tbl: "tidb_panic_count"},
{tp: "tidb", item: "binlog-error", tbl: "tidb_binlog_error_count"},
{tp: "tidb", item: "pd-cmd-failed", tbl: "pd_cmd_fail_ops"},
{tp: "tidb", item: "ticlient-region-error", tbl: "tidb_kv_region_error_ops"},
{tp: "tidb", item: "lock-resolve", tbl: "tidb_lock_resolver_ops"},
{tp: "tikv", item: "scheduler-is-busy", tbl: "tikv_scheduler_is_busy"},
{tp: "tikv", item: "coprocessor-is-busy", tbl: "tikv_coprocessor_is_busy"},
{tp: "tikv", item: "channel-is-full", tbl: "tikv_channel_full_total"},
{tp: "tikv", item: "coprocessor-error", tbl: "tikv_coprocessor_request_error"},
{tp: "tidb", item: "schema-lease-error", tbl: "tidb_schema_lease_error_opm"},
{tp: "tidb", item: "txn-retry-error", tbl: "tidb_transaction_retry_error_ops"},
{tp: "tikv", item: "grpc-errors", tbl: "tikv_grpc_errors"},
}

var results []inspectionResult
for _, rule := range rules {
if filter.enable(rule.item) {
def, ok := infoschema.MetricTableMap[rule.tbl]
if !ok {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("metrics table: %s not fouund", rule.tbl))
continue
}
sql := fmt.Sprintf("select `%[1]s`, max(value) as max_value from `%[2]s`.`%[3]s` group by `%[1]s` having max_value > 0.0",
strings.Join(def.Labels, "`,`"), util.MetricSchemaName.L, rule.tbl)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQLWithContext(ctx, sql)
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("execute '%s' failed: %v", sql, err))
continue
}
for _, row := range rows {
var actual, detail string
if rest := def.Labels[1:]; len(rest) > 0 {
pairs := make([]string, 0, len(rest))
// `i+1` and `1+len(rest)` means skip the first field `instance`
for i, label := range rest {
pairs = append(pairs, fmt.Sprintf("`%s`='%s'", label, row.GetString(i+1)))
}
// TODO: find a better way to construct the `actual` field
actual = fmt.Sprintf("{%s}=%.2f", strings.Join(pairs, ","), row.GetFloat64(1+len(rest)))
detail = fmt.Sprintf("select * from `%s`.`%s` where `instance`='%s' and %s",
util.MetricSchemaName.L, rule.tbl, row.GetString(0), strings.Join(pairs, " and "))
} else {
actual = fmt.Sprintf("%.2f", row.GetFloat64(1))
detail = fmt.Sprintf("select * from `%s`.`%s` where `instance`='%s'",
util.MetricSchemaName.L, rule.tbl, row.GetString(0))
}
result := inspectionResult{
tp: rule.tp,
// NOTE: all tables which can be inspected here whose first label must be `instance`
instance: row.GetString(0),
item: rule.item,
actual: actual,
expected: "0",
severity: "warning",
detail: detail,
}
results = append(results, result)
}
}
}
return results
}
Loading