Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: get tiflash http(s) port from tiflash status port instead of etcd #30901

Merged
merged 8 commits into from
Dec 28, 2021
Merged
107 changes: 62 additions & 45 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/tidb/ddl/label"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -69,7 +68,6 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -2616,54 +2614,73 @@ type tiflashInstanceInfo struct {
}

func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflashInstances set.StringSet) error {
store := sctx.GetStore()
if etcd, ok := store.(kv.EtcdBackend); ok {
var addrs []string
var err error
if addrs, err = etcd.EtcdAddrs(); err != nil {
storeInfo, err := infoschema.GetStoreServerInfo(sctx)
if err != nil {
return err
}

for _, info := range storeInfo {
if info.ServerType != kv.TiFlash.Name() {
continue
}
info.ResolveLoopBackAddr()
if len(tiflashInstances) > 0 && !tiflashInstances.Exist(info.Address) {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

append a warning?

Copy link
Contributor Author

@lidezhu lidezhu Dec 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiflashInstances is just the where clause in the select list.
For example, if the sql is select * from tiflash_tables, then tiflashInstances is empty and it means we need to select the info for all tiflash nodes in the cluster.
If the sql is select * from tiflash_tables where tiflash_instance='127.0.0.1:9000', then tiflashInstances contains the element 127.0.0.1:9000 and we need filter out other tiflash nodes.

}
hostAndStatusPort := strings.Split(info.StatusAddr, ":")
if len(hostAndStatusPort) != 2 {
return errors.Errorf("node status addr: %s format illegal", info.StatusAddr)
}
// fetch tiflash config
configURL := fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), info.StatusAddr)
resp, err := util.InternalHTTPClient().Get(configURL)
if err != nil {
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
sctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if resp.StatusCode != http.StatusOK {
return errors.Errorf("request %s failed: %s", configURL, resp.Status)
}
Comment on lines +2637 to +2643
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should append warning or return error first?

Copy link
Contributor Author

@lidezhu lidezhu Dec 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If tiflash doesn't return response, it means the tiflash node is down or is network partitioned, we can safely ignore the tiflash node and just append warning.
But if tiflash can return response, but the status code is not StatusOK, then something bad unknown must happen and an error is perfered to remind this case.

// parse http_port or https_port from the fetched config
var nestedConfig map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&nestedConfig); err != nil {
return err
}
if addrs != nil {
domainFromCtx := domain.GetDomain(sctx)
if domainFromCtx != nil {
cli := domainFromCtx.GetEtcdClient()
prefix := "/tiflash/cluster/http_port/"
kv := clientv3.NewKV(cli)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := kv.Get(ctx, prefix, clientv3.WithPrefix())
cancel()
if err != nil {
return errors.Trace(err)
}
for _, ev := range resp.Kvs {
id := string(ev.Key)[len(prefix):]
if len(tiflashInstances) > 0 && !tiflashInstances.Exist(id) {
continue
}
url := fmt.Sprintf("%s://%s", util.InternalHTTPSchema(), ev.Value)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return errors.Trace(err)
}
resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
resp.Body.Close()
e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
id: id,
url: url,
})
e.instanceCount += 1
}
e.initialized = true
return nil
if engineStoreConfig, ok := nestedConfig["engine-store"]; ok {
foundPort := false
var port interface{}
portProtocol := ""
if httpPort, ok := engineStoreConfig.(map[string]interface{})["http_port"]; ok {
foundPort = true
port = httpPort
portProtocol = "http"
} else if httpsPort, ok := engineStoreConfig.(map[string]interface{})["https_port"]; ok {
foundPort = true
port = httpsPort
portProtocol = "https"
}
if !foundPort {
return errors.Errorf("engine-store.http_port/https_port not found in server %s", info.Address)
}
switch portValue := port.(type) {
case float64:
e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
id: info.Address,
url: fmt.Sprintf("%s://%s:%d", portProtocol, hostAndStatusPort[0], int(portValue)),
})
e.instanceCount += 1
default:
return errors.Errorf("engine-store.http_port value(%p) unexpected in server %s", port, info.Address)
}
} else {
return errors.Errorf("engine-store config not found in server %s", info.Address)
}
if err = resp.Body.Close(); err != nil {
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return errors.Errorf("Etcd addrs not found")
}
return errors.Errorf("%T not an etcd backend", store)
e.initialized = true
return nil
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
Expand Down
4 changes: 2 additions & 2 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,9 @@ func (s *testInfoschemaTableSuite) TestSequences(c *C) {
func (s *testInfoschemaTableSuite) TestTiFlashSystemTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
err := tk.QueryToErr("select * from information_schema.TIFLASH_TABLES;")
c.Assert(err.Error(), Equals, "Etcd addrs not found")
c.Assert(err, Equals, nil)
err = tk.QueryToErr("select * from information_schema.TIFLASH_SEGMENTS;")
c.Assert(err.Error(), Equals, "Etcd addrs not found")
c.Assert(err, Equals, nil)
}

func (s *testInfoschemaTableSuite) TestTablesPKType(c *C) {
Expand Down