Skip to content

Commit

Permalink
topsql: add label for the table and schema (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Sep 13, 2024
1 parent 71c1639 commit f915f04
Show file tree
Hide file tree
Showing 24 changed files with 770 additions and 1,326 deletions.
154 changes: 151 additions & 3 deletions component/subscriber/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package subscriber

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"reflect"
"strconv"
"sync"
"time"

"github.com/pingcap/ng-monitoring/component/domain"
"github.com/pingcap/ng-monitoring/component/subscriber/model"
"github.com/pingcap/ng-monitoring/component/topology"
"github.com/pingcap/ng-monitoring/config"
"github.com/pingcap/ng-monitoring/config/pdvariable"
"github.com/pingcap/ng-monitoring/utils"
"go.uber.org/zap"

"github.com/pingcap/log"
)
Expand All @@ -26,13 +34,19 @@ type Manager struct {
topoSubscriber topology.Subscriber
cfgSubscriber config.Subscriber
varSubscriber pdvariable.Subscriber
httpCli *http.Client

do *domain.Domain
schemaCache *sync.Map
schemaVersion int64

subscribeController SubscribeController
}

func NewManager(
ctx context.Context,
wg *sync.WaitGroup,
do *domain.Domain,
varSubscriber pdvariable.Subscriber,
topoSubscriber topology.Subscriber,
cfgSubscriber config.Subscriber,
Expand All @@ -42,29 +56,35 @@ func NewManager(
ctx: ctx,
wg: wg,

scrapers: make(map[topology.Component]Scraper),
scrapers: make(map[topology.Component]Scraper),
schemaCache: &sync.Map{},

varSubscriber: varSubscriber,
topoSubscriber: topoSubscriber,
cfgSubscriber: cfgSubscriber,

do: do,
prevEnabled: subscribeController.IsEnabled(),
subscribeController: subscribeController,
}
}

func (m *Manager) Run() {
defer m.clearScrapers()

ticker := time.NewTicker(30 * time.Second)
for {
select {
case getCfg := <-m.cfgSubscriber:
m.subscribeController.UpdateConfig(getCfg())
m.httpCli = m.subscribeController.NewHTTPClient()
case getVars := <-m.varSubscriber:
m.subscribeController.UpdatePDVariable(getVars())
case getTopology := <-m.topoSubscriber:
m.components = getTopology()
m.subscribeController.UpdateTopology(getTopology())
case <-ticker.C:
m.updateSchemaCache()
continue
case <-m.ctx.Done():
return
}
Expand All @@ -87,6 +107,134 @@ func (m *Manager) Run() {
}
}

func (m *Manager) updateSchemaCache() {
if !m.subscribeController.IsEnabled() {
// clear cache
m.schemaCache.Range(func(k, v interface{}) bool {
m.schemaCache.Delete(k)
return true
})
m.schemaVersion = 0
return
}
if m.do == nil {
return
}

ectx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
etcdCli, err := m.do.GetEtcdClient()
defer cancel()
if err != nil {
log.Error("failed to get etcd client", zap.Error(err))
return
}
resp, err := etcdCli.Get(ectx, model.SchemaVersionPath)
if err != nil || len(resp.Kvs) != 1 {
if resp != nil && len(resp.Kvs) == 0 {
return
}
log.Warn("failed to get tidb schema version", zap.Error(err))
return
}
schemaVersion, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
if err != nil {
log.Warn("failed to get tidb schema version", zap.Error(err))
return
}
if schemaVersion == m.schemaVersion {
return
}
log.Info("schema version changed", zap.Int64("old", m.schemaVersion), zap.Int64("new", schemaVersion))
m.tryUpdateSchemaCache(schemaVersion)
}

type getConfig interface {
GetConfig() *config.Config
}

func (m *Manager) requestDB(path string, v interface{}) error {
schema := "http"
if sc, ok := m.subscribeController.(getConfig); ok && sc.GetConfig().Security.GetTLSConfig() != nil {
schema = "https"
}
for _, compt := range m.components {
if compt.Name != topology.ComponentTiDB {
continue
}

url := fmt.Sprintf("%s://%s:%d%s", schema, compt.IP, compt.StatusPort, path)
resp, err := m.httpCli.Get(url)
if err != nil {
log.Error("request failed", zap.Error(err))
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Error("request failed", zap.String("status", resp.Status))
continue
}
if err := json.NewDecoder(resp.Body).Decode(v); err != nil {
log.Error("decode response failed", zap.Error(err))
continue
}
return nil
}
return fmt.Errorf("all request failed")
}

func (m *Manager) tryUpdateSchemaCache(schemaVersion int64) {
// get all database info
var dbInfos []*model.DBInfo
if err := m.requestDB("/schema", &dbInfos); err != nil {
return
}

// get all table info
updateSuccess := true
for _, db := range dbInfos {
if db.State == model.StateNone {
continue
}
var tableInfos []*model.TableInfo
encodeName := url.PathEscape(db.Name.O)
if err := m.requestDB(fmt.Sprintf("/schema/%s?id_name_only=true", encodeName), &tableInfos); err != nil {
updateSuccess = false
continue
}
log.Info("update table info", zap.String("db", db.Name.O), zap.Reflect("table-info", tableInfos))
if len(tableInfos) == 0 {
continue
}
for _, table := range tableInfos {
indices := make(map[int64]string, len(table.Indices))
for _, index := range table.Indices {
indices[index.ID] = index.Name.O
}
detail := &model.TableDetail{
Name: table.Name.O,
DB: db.Name.O,
ID: table.ID,
Indices: indices,
}
m.schemaCache.Store(table.ID, detail)
if partition := table.GetPartitionInfo(); partition != nil {
for _, partitionDef := range partition.Definitions {
detail := &model.TableDetail{
Name: fmt.Sprintf("%s/%s", table.Name.O, partitionDef.Name.O),
DB: db.Name.O,
ID: partitionDef.ID,
Indices: indices,
}
m.schemaCache.Store(partitionDef.ID, detail)
}
}
}
}
if updateSuccess {
m.schemaVersion = schemaVersion
}
}

func (m *Manager) updateScrapers() {
// clean up closed scrapers
for component, scraper := range m.scrapers {
Expand All @@ -109,7 +257,7 @@ func (m *Manager) updateScrapers() {

// set up incoming scrapers
for i := range in {
scraper := m.subscribeController.NewScraper(m.ctx, in[i])
scraper := m.subscribeController.NewScraper(m.ctx, in[i], m.schemaCache)
m.scrapers[in[i]] = scraper

if !isNil(scraper) {
Expand Down
9 changes: 8 additions & 1 deletion component/subscriber/mock_sub_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 101 additions & 0 deletions component/subscriber/model/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package model

// SchemaState is the state for schema elements.
type SchemaState byte

const (
// StateNone means this schema element is absent and can't be used.
StateNone SchemaState = iota
// StateDeleteOnly means we can only delete items for this schema element.
StateDeleteOnly
// StateWriteOnly means we can use any write operation on this schema element,
// but outer can't read the changed data.
StateWriteOnly
// StateWriteReorganization means we are re-organizing whole data after write only state.
StateWriteReorganization
// StateDeleteReorganization means we are re-organizing whole data after delete only state.
StateDeleteReorganization
// StatePublic means this schema element is ok for all write and read operations.
StatePublic
)

const (
SchemaVersionPath = "/tidb/ddl/global_schema_version"
)

// CIStr is case insensitive string.
type CIStr struct {
O string `json:"O"` // Original string.
L string `json:"L"` // Lower case string.
}

// DBInfo provides meta data describing a DB.
type DBInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"db_name"`
State SchemaState `json:"state"`
}

// IndexInfo provides meta data describing a DB index.
// It corresponds to the statement `CREATE INDEX Name ON Table (Column);`
// See https://dev.mysql.com/doc/refman/5.7/en/create-index.html
type IndexInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"idx_name"`
}

// PartitionDefinition defines a single partition.
type PartitionDefinition struct {
ID int64 `json:"id"`
Name CIStr `json:"name"`
}

// PartitionInfo provides table partition info.
type PartitionInfo struct {
// User may already creates table with partition but table partition is not
// yet supported back then. When Enable is true, write/read need use tid
// rather than pid.
Enable bool `json:"enable"`
Definitions []*PartitionDefinition `json:"definitions"`
}

// TableInfo provides meta data describing a DB table.
type TableInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"name"`
Indices []*IndexInfo `json:"index_info"`
Partition *PartitionInfo `json:"partition"`
}

// GetPartitionInfo returns the partition information.
func (t *TableInfo) GetPartitionInfo() *PartitionInfo {
if t.Partition != nil && t.Partition.Enable {
return t.Partition
}
return nil
}

type DBTablesInfo struct {
DB DBInfo `json:"db"`
Tables []TableInfo `json:"tables"`
}

type DBTableInfo struct {
DB DBInfo
Table IndexedTableInfo
}

type IndexedTableInfo struct {
ID int64
Name CIStr
Indices map[int64]string
}

type TableDetail struct {
Name string
DB string
ID int64
Indices map[int64]string
}
5 changes: 4 additions & 1 deletion component/subscriber/sub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package subscriber

import (
"context"
"net/http"
"sync"

"github.com/pingcap/ng-monitoring/component/topology"
"github.com/pingcap/ng-monitoring/config"
Expand All @@ -16,10 +18,11 @@ type SubscribeController interface {
UpdatePDVariable(pdvariable.PDVariable)
UpdateConfig(config.Config)
UpdateTopology([]topology.Component)
NewHTTPClient() *http.Client
}

type ScraperFactory interface {
NewScraper(ctx context.Context, component topology.Component) Scraper
NewScraper(ctx context.Context, component topology.Component, schemaInfo *sync.Map) Scraper
}

type Scraper interface {
Expand Down
Loading

0 comments on commit f915f04

Please sign in to comment.