Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: support partitioned table for the /mvcc/* HTTP API (#14197) #16191

Merged
merged 6 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ timezone.*
}
```

*Hint: On a partitioned table, use the `table(partition)` pattern as the table name, `test(p1)` for example:*

```shell
$curl http://127.0.0.1:10080/mvcc/index/test(p1)/t1/idx/1\?a\=A
```

1. Scatter regions of the specified table, add a `scatter-range` scheduler for the PD and the range is same as the table range.

```shell
Expand Down
55 changes: 41 additions & 14 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -289,23 +290,38 @@ func (t *tikvHandlerTool) formValue2DatumRow(sc *stmtctx.StatementContext, value
}

func (t *tikvHandlerTool) getTableID(dbName, tableName string) (int64, error) {
tbInfo, err := t.getTable(dbName, tableName)
tbl, err := t.getTable(dbName, tableName)
if err != nil {
return 0, errors.Trace(err)
}
return tbInfo.ID, nil
return tbl.GetPhysicalID(), nil
}

func (t *tikvHandlerTool) getTable(dbName, tableName string) (*model.TableInfo, error) {
func (t *tikvHandlerTool) getTable(dbName, tableName string) (table.PhysicalTable, error) {
schema, err := t.schema()
if err != nil {
return nil, errors.Trace(err)
}
tableName, partitionName := extractTableAndPartitionName(tableName)
tableVal, err := schema.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))
if err != nil {
return nil, errors.Trace(err)
}
return tableVal.Meta(), nil
if pt, ok := tableVal.(table.PartitionedTable); ok {
if partitionName == "" {
return nil, errors.New("work on partitioned table, please specify the table name like this: table(partition)")
}
tblInfo := pt.Meta()
pid, err := tables.FindPartitionByName(tblInfo, partitionName)
if err != nil {
return nil, errors.Trace(err)
}
return pt.GetPartition(pid), nil
}
if partitionName != "" {
return nil, fmt.Errorf("%s is not a partitionted table", tableName)
}
return tableVal.(table.PhysicalTable), nil
}

func (t *tikvHandlerTool) schema() (infoschema.InfoSchema, error) {
Expand Down Expand Up @@ -1340,20 +1356,31 @@ func (h mvccTxnHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

func extractTableAndPartitionName(str string) (string, string) {
// extract table name and partition name from this "table(partition)":
// A sane person would not let the the table name or partition name contain '('.
start := strings.IndexByte(str, '(')
if start == -1 {
return str, ""
}
end := strings.IndexByte(str, ')')
if end == -1 {
return str, ""
}
return str[:start], str[start+1 : end]
}

// handleMvccGetByIdx gets MVCC info by an index key.
func (h mvccTxnHandler) handleMvccGetByIdx(params map[string]string, values url.Values) (interface{}, error) {
dbName := params[pDBName]
tableName := params[pTableName]
handleStr := params[pHandle]
schema, err := h.schema()
if err != nil {
return nil, errors.Trace(err)
}
// get table's schema.
t, err := schema.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))

t, err := h.getTable(dbName, tableName)
if err != nil {
return nil, errors.Trace(err)
}

var idxCols []*model.ColumnInfo
var idx table.Index
for _, v := range t.Indices() {
Expand Down Expand Up @@ -1381,15 +1408,15 @@ func (h mvccTxnHandler) handleMvccGetByKey(params map[string]string, decodeData
if err != nil {
return nil, errors.Trace(err)
}
resp, err := h.getMvccByHandle(tb.ID, handle)
resp, err := h.getMvccByHandle(tb.GetPhysicalID(), handle)
if err != nil {
return nil, err
}
if !decodeData {
return resp, nil
}
colMap := make(map[int64]*types.FieldType, 3)
for _, col := range tb.Columns {
for _, col := range tb.Meta().Columns {
colMap[col.ID] = &col.FieldType
}

Expand All @@ -1399,13 +1426,13 @@ func (h mvccTxnHandler) handleMvccGetByKey(params map[string]string, decodeData
datas := make(map[string][]map[string]string)
for _, w := range respValue.Info.Writes {
if len(w.ShortValue) > 0 {
datas[strconv.FormatUint(w.StartTs, 10)], err = h.decodeMvccData(w.ShortValue, colMap, tb)
datas[strconv.FormatUint(w.StartTs, 10)], err = h.decodeMvccData(w.ShortValue, colMap, tb.Meta())
}
}

for _, v := range respValue.Info.Values {
if len(v.Value) > 0 {
datas[strconv.FormatUint(v.StartTs, 10)], err = h.decodeMvccData(v.Value, colMap, tb)
datas[strconv.FormatUint(v.StartTs, 10)], err = h.decodeMvccData(v.Value, colMap, tb.Meta())
}
}

Expand Down
39 changes: 38 additions & 1 deletion server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,19 @@ func (ts *HTTPHandlerTestSuite) prepareData(c *C) {
dbt.mustExec("alter table tidb.test add index idx1 (a, b);")
dbt.mustExec("alter table tidb.test add unique index idx2 (a, b);")

dbt.mustExec(`create table tidb.pt (a int) partition by range (a)
dbt.mustExec(`create table tidb.pt (a int primary key, b varchar(20), key idx(a, b))
partition by range (a)
(partition p0 values less than (256),
partition p1 values less than (512),
partition p2 values less than (1024))`)

txn2, err := dbt.db.Begin()
c.Assert(err, IsNil)
txn2.Exec("insert into tidb.pt values (42, '123')")
txn2.Exec("insert into tidb.pt values (256, 'b')")
txn2.Exec("insert into tidb.pt values (666, 'def')")
err = txn2.Commit()
c.Assert(err, IsNil)
}

func decodeKeyMvcc(closer io.ReadCloser, c *C, valid bool) {
Expand Down Expand Up @@ -458,6 +467,29 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {
err = decoder.Decode(&data2)
c.Assert(err, IsNil)
c.Assert(data2, DeepEquals, data)

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/key/tidb/test/1?decode=true"))
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
var data3 map[string]interface{}
err = decoder.Decode(&data3)
c.Assert(err, IsNil)
c.Assert(data3["key"], NotNil)
c.Assert(data3["info"], NotNil)
c.Assert(data3["data"], NotNil)
c.Assert(data3["decode_error"], IsNil)

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/key/tidb/pt(p0)/42?decode=true"))
c.Assert(err, IsNil)
defer resp.Body.Close()
decoder = json.NewDecoder(resp.Body)
var data4 map[string]interface{}
err = decoder.Decode(&data4)
c.Assert(err, IsNil)
c.Assert(data4["key"], NotNil)
c.Assert(data4["info"], NotNil)
c.Assert(data4["data"], NotNil)
c.Assert(data4["decode_error"], IsNil)
}

func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) {
Expand Down Expand Up @@ -601,6 +633,11 @@ func (ts *HTTPHandlerTestSuite) TestGetIndexMVCC(c *C) {
var data2 kvrpcpb.MvccGetByKeyResponse
err = decoder.Decode(&data2)
c.Assert(err, NotNil)

resp, err = http.Get("http://127.0.0.1:10090/mvcc/index/tidb/pt(p2)/idx/666?a=666&b=def")
c.Assert(err, IsNil)
defer resp.Body.Close()
decodeKeyMvcc(resp.Body, c, true)
}

func (ts *HTTPHandlerTestSuite) TestGetSettings(c *C) {
Expand Down