Skip to content

Commit

Permalink
store TiDB server info to PD and add http api handle (#7082)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Aug 15, 2018
1 parent 4684eec commit 9fc67b9
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 16 deletions.
7 changes: 7 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ type DDL interface {
SchemaSyncer() SchemaSyncer
// OwnerManager gets the owner manager.
OwnerManager() owner.Manager
// GetID gets the ddl ID.
GetID() string
// GetTableMaxRowID gets the max row ID of a normal table or a partition.
GetTableMaxRowID(startTS uint64, tbl table.Table) (int64, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
Expand Down Expand Up @@ -433,6 +435,11 @@ func (d *ddl) OwnerManager() owner.Manager {
return d.ownerManager
}

// GetID implements DDL.GetID interface.
func (d *ddl) GetID() string {
return d.uuid
}

func checkJobMaxInterval(job *model.Job) time.Duration {
// The job of adding index takes more time to process.
// So it uses the longer time.
Expand Down
36 changes: 23 additions & 13 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer {
}
}

func (s *schemaVersionSyncer) putKV(ctx context.Context, retryCnt int, key, val string,
// PutKVToEtcd puts key value to etcd.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
// opts is configures of etcd Operations.
func PutKVToEtcd(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string,
opts ...clientv3.OpOption) error {
var err error
for i := 0; i < retryCnt; i++ {
Expand All @@ -112,12 +116,12 @@ func (s *schemaVersionSyncer) putKV(ctx context.Context, retryCnt int, key, val
}

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
_, err = s.etcdCli.Put(childCtx, key, val, opts...)
_, err = etcdCli.Put(childCtx, key, val, opts...)
cancel()
if err == nil {
return nil
}
log.Warnf("[syncer] put schema version %s failed %v no.%d", val, err, i)
log.Warnf("[etcd-cli] put key: %s value: %s failed %v no.%d", key, val, err, i)
time.Sleep(keyOpRetryInterval)
}
return errors.Trace(err)
Expand Down Expand Up @@ -148,7 +152,7 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
s.mu.Unlock()

err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
err = PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
return errors.Trace(err)
}
Expand Down Expand Up @@ -176,7 +180,7 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
defer cancel()
err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
err = PutKVToEtcd(childCtx, s.etcdCli, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))

return errors.Trace(err)
Expand Down Expand Up @@ -214,7 +218,7 @@ func (s *schemaVersionSyncer) WatchGlobalSchemaVer(ctx context.Context) {
func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int64) error {
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
err := PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))

metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
Expand All @@ -227,8 +231,7 @@ func (s *schemaVersionSyncer) OwnerUpdateGlobalVersion(ctx context.Context, vers
ver := strconv.FormatInt(version, 10)
// TODO: If the version is larger than the original global version, we need set the version.
// Otherwise, we'd better set the original global version.
err := s.putKV(ctx, putKeyRetryUnlimited, DDLGlobalSchemaVersion, ver)

err := PutKVToEtcd(ctx, s.etcdCli, putKeyRetryUnlimited, DDLGlobalSchemaVersion, ver)
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerUpdateGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}
Expand All @@ -241,15 +244,22 @@ func (s *schemaVersionSyncer) RemoveSelfVersionPath() error {
metrics.DeploySyncerHistogram.WithLabelValues(metrics.SyncerClear, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

err = DeleteKeyFromEtcd(s.selfSchemaVerPath, s.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
return errors.Trace(err)
}

// DeleteKeyFromEtcd deletes key value from etcd.
func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeout time.Duration) error {
var err error
ctx := context.Background()
for i := 0; i < keyOpDefaultRetryCnt; i++ {
childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
_, err = s.etcdCli.Delete(childCtx, s.selfSchemaVerPath)
for i := 0; i < retryCnt; i++ {
childCtx, cancel := context.WithTimeout(ctx, timeout)
_, err = etcdCli.Delete(childCtx, key)
cancel()
if err == nil {
return nil
}
log.Warnf("[syncer] remove schema version path %s failed %v no.%d", s.selfSchemaVerPath, err, i)
log.Warnf("[etcd-cli] delete key %s failed %v no.%d", key, err, i)
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -283,7 +293,7 @@ func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx context.Context) (int64,
if err != nil {
continue
}
if err == nil && len(resp.Kvs) > 0 {
if len(resp.Kvs) > 0 {
var ver int
ver, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err == nil {
Expand Down
12 changes: 12 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ timezone.*
curl http://{TiDBIP}:10080/settings
```

1. Get TiDB server information.

```shell
curl http://{TiDBIP}:10080/info
```

1. Get TiDB cluster all servers information.

```shell
curl http://{TiDBIP}:10080/info/all
```

1. Enable/Disable TiDB server general log

```shell
Expand Down
16 changes: 16 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Domain struct {
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *pools.ResourcePool
Expand Down Expand Up @@ -251,6 +252,11 @@ func (do *Domain) DDL() ddl.DDL {
return do.ddl
}

// InfoSyncer gets infoSyncer from domain.
func (do *Domain) InfoSyncer() *InfoSyncer {
return do.info
}

// Store gets KV store from domain.
func (do *Domain) Store() kv.Storage {
return do.store
Expand Down Expand Up @@ -359,6 +365,8 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
break
}
do.SchemaValidator.Restart()
case <-do.info.Done():
do.info.Restart(context.Background())
case <-do.exit:
return
}
Expand Down Expand Up @@ -392,6 +400,9 @@ func (do *Domain) Close() {
if do.ddl != nil {
terror.Log(errors.Trace(do.ddl.Stop()))
}
if do.info != nil {
do.info.RemoveServerInfo()
}
close(do.exit)
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
Expand Down Expand Up @@ -500,6 +511,11 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
if err != nil {
return errors.Trace(err)
}
do.info = NewInfoSyncer(do.ddl.GetID(), do.etcdClient)
err = do.info.Init(ctx)
if err != nil {
return errors.Trace(err)
}
err = do.Reload()
if err != nil {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit 9fc67b9

Please sign in to comment.