Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add helper function to set and query TiFlash's sync status #30473

Merged
merged 13 commits into from
Dec 17, 2021
37 changes: 37 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -731,3 +732,39 @@ func init() {
RunInGoTest = true
}
}

// GetDropOrTruncateTableInfoFromJobsByStore implements GetDropOrTruncateTableInfoFromJobs
func GetDropOrTruncateTableInfoFromJobsByStore(jobs []*model.Job, gcSafePoint uint64, getTable func(uint64, int64, int64) (*model.TableInfo, error), fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
for _, job := range jobs {
// Check GC safe point for getting snapshot infoSchema.
err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
if err != nil {
return false, err
}
if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
continue
}

tbl, err := getTable(job.StartTS, job.SchemaID, job.TableID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
return false, err
}
if tbl == nil {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
finish, err := fn(job, tbl)
if err != nil || finish {
return finish, err
}
}
return false, nil
}
39 changes: 6 additions & 33 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,42 +672,15 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta,
// GetDropOrTruncateTableInfoFromJobs gets the dropped/truncated table information from DDL jobs,
// it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information.
func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
for _, job := range jobs {
// Check GC safe point for getting snapshot infoSchema.
err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) {
snapMeta, err := dom.GetSnapshotMeta(StartTS)
if err != nil {
return false, err
}
if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
continue
}

snapMeta, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return false, err
}
tbl, err := snapMeta.GetTable(job.SchemaID, job.TableID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
return false, err
}
if tbl == nil {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
finish, err := fn(job, tbl)
if err != nil || finish {
return finish, err
return nil, err
}
tbl, err := snapMeta.GetTable(SchemaID, TableID)
return tbl, err
}
return false, nil
return ddl.GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, fn)
}

func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.Job, *model.TableInfo, error) {
Expand Down
237 changes: 235 additions & 2 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package helper

import (
"bufio"
"bytes"
"context"
"encoding/hex"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -896,11 +897,243 @@ func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error {

defer func() {
if err = resp.Body.Close(); err != nil {
log.Error("err", zap.Error(err))
logutil.BgLogger().Error("err", zap.Error(err))
}
}()

dec := json.NewDecoder(resp.Body)

return dec.Decode(stats)
}

// DeletePlacementRule is to delete placement rule for certain group.
func (h *Helper) DeletePlacementRule(group string, ruleID string) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return errors.Trace(err)
}

deleteURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule/%v/%v",
util.InternalHTTPSchema(),
pdAddrs[0],
group,
ruleID,
)

req, err := http.NewRequest("DELETE", deleteURL, nil)
if err != nil {
return err
}

resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return errors.New("DeletePlacementRule returns error")
}
defer func() {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()
return nil
}

// SetPlacementRule is a helper function to set placement rule.
func (h *Helper) SetPlacementRule(rule placement.Rule) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return errors.Trace(err)
}
m, _ := json.Marshal(rule)

postURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule",
util.InternalHTTPSchema(),
pdAddrs[0],
)
buf := bytes.NewBuffer(m)
resp, err := util.InternalHTTPClient().Post(postURL, "application/json", buf)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return errors.New("SetPlacementRule returns error")
}
defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()
return nil
}

// GetGroupRules to get all placement rule in a certain group.
func (h *Helper) GetGroupRules(group string) ([]placement.Rule, error) {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return nil, errors.Trace(err)
}

getURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rules/group/%s",
util.InternalHTTPSchema(),
pdAddrs[0],
group,
)

resp, err := util.InternalHTTPClient().Get(getURL)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, errors.New("GetGroupRules returns error")
}

defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()

buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
return nil, errors.Trace(err)
}

var rules []placement.Rule
err = json.Unmarshal(buf.Bytes(), &rules)
if err != nil {
return nil, errors.Trace(err)
}

return rules, nil
}

// PostAccelerateSchedule sends `regions/accelerate-schedule` request.
func (h *Helper) PostAccelerateSchedule(tableID int64) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return errors.Trace(err)
}
startKey := tablecodec.GenTableRecordPrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)

postURL := fmt.Sprintf("%s://%s/pd/api/v1/regions/accelerate-schedule",
util.InternalHTTPSchema(),
pdAddrs[0])

if err != nil {
return err
}
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
v, err := json.Marshal(input)
if err != nil {
return err
}
resp, err := util.InternalHTTPClient().Post(postURL, "application/json", bytes.NewBuffer(v))
defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()
if err != nil {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return nil
}

// GetPDRegionRecordStats is a helper function calling `/stats/region`.
func (h *Helper) GetPDRegionRecordStats(tableID int64, stats *PDRegionStats) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return errors.Trace(err)
}

startKey := tablecodec.GenTableRecordPrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)

statURL := fmt.Sprintf("%s://%s/pd/api/v1/stats/region?start_key=%s&end_key=%s",
util.InternalHTTPSchema(),
pdAddrs[0],
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))

resp, err := util.InternalHTTPClient().Get(statURL)
if err != nil {
return err
}

defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()

dec := json.NewDecoder(resp.Body)

return dec.Decode(stats)
}

// GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey.
func GetTiFlashTableIDFromEndKey(endKey string) int64 {
endKey, _ = url.QueryUnescape(endKey)
_, decodedEndKey, _ := codec.DecodeBytes([]byte(endKey), []byte{})
tableID := tablecodec.DecodeTableID(decodedEndKey)
tableID -= 1
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
return tableID
}

// ComputeTiFlashStatus is helper function for CollectTiFlashStatus.
func ComputeTiFlashStatus(reader *bufio.Reader, regionReplica *map[int64]int) error {
ns, _, _ := reader.ReadLine()
n, err := strconv.ParseInt(string(ns), 10, 64)
if err != nil {
return errors.Trace(err)
}
for i := int64(0); i < n; i++ {
rs, _, _ := reader.ReadLine()
// For (`table`, `store`), has region `r`
r, err := strconv.ParseInt(strings.Trim(string(rs), "\r\n \t"), 10, 32)
if err != nil {
return errors.Trace(err)
}
if i, ok := (*regionReplica)[r]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i is double declared, better change name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will replace i in later PR

(*regionReplica)[r] = i + 1
} else {
(*regionReplica)[r] = 1
}
}
return nil
}

// CollectTiFlashStatus query sync status of one table from TiFlash store.
func CollectTiFlashStatus(statusAddress string, tableID int64, regionReplica *map[int64]int) error {
statURL := fmt.Sprintf("%s://%s/tiflash/sync-status/%d",
util.InternalHTTPSchema(),
statusAddress,
tableID,
)
resp, err := util.InternalHTTPClient().Get(statURL)
if err != nil {
return nil
}

defer func() {
resp.Body.Close()
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
}()

reader := bufio.NewReader(resp.Body)
if err = ComputeTiFlashStatus(reader, regionReplica); err != nil {
return errors.Trace(err)
}
return nil
}
15 changes: 15 additions & 0 deletions store/helper/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package helper_test

import (
"bufio"
"crypto/tls"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -438,3 +440,16 @@ func mockStoreStatResponse(w http.ResponseWriter, _ *http.Request) {
log.Panic("write http response failed", zap.Error(err))
}
}

func TestComputeTiFlashStatus(t *testing.T) {
regionReplica := make(map[int64]int)
resp1 := "0\n\n"
resp2 := "1\n1009\n"
br1 := bufio.NewReader(strings.NewReader(resp1))
br2 := bufio.NewReader(strings.NewReader(resp2))
err := helper.ComputeTiFlashStatus(br1, &regionReplica)
require.NoError(t, err)
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
err = helper.ComputeTiFlashStatus(br2, &regionReplica)
require.NoError(t, err)
require.Equal(t, len(regionReplica), 1)
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
}