Skip to content

Commit

Permalink
server: support partitioned table for the /mvcc/* HTTP API (#14197) (#…
Browse files Browse the repository at this point in the history
…16191)

* cherry pick #14197 to release-3.0
  • Loading branch information
sre-bot authored Apr 8, 2020
1 parent f75d6fe commit e9ecbc1
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 15 deletions.
6 changes: 6 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ timezone.*
}
```

*Hint: On a partitioned table, use the `table(partition)` pattern as the table name, `test(p1)` for example:*

```shell
$curl http://127.0.0.1:10080/mvcc/index/test(p1)/t1/idx/1\?a\=A
```

1. Scatter regions of the specified table, add a `scatter-range` scheduler for the PD and the range is same as the table range.

```shell
Expand Down
55 changes: 41 additions & 14 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -289,23 +290,38 @@ func (t *tikvHandlerTool) formValue2DatumRow(sc *stmtctx.StatementContext, value
}

func (t *tikvHandlerTool) getTableID(dbName, tableName string) (int64, error) {
tbInfo, err := t.getTable(dbName, tableName)
tbl, err := t.getTable(dbName, tableName)
if err != nil {
return 0, errors.Trace(err)
}
return tbInfo.ID, nil
return tbl.GetPhysicalID(), nil
}

func (t *tikvHandlerTool) getTable(dbName, tableName string) (*model.TableInfo, error) {
func (t *tikvHandlerTool) getTable(dbName, tableName string) (table.PhysicalTable, error) {
schema, err := t.schema()
if err != nil {
return nil, errors.Trace(err)
}
tableName, partitionName := extractTableAndPartitionName(tableName)
tableVal, err := schema.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))
if err != nil {
return nil, errors.Trace(err)
}
return tableVal.Meta(), nil
if pt, ok := tableVal.(table.PartitionedTable); ok {
if partitionName == "" {
return nil, errors.New("work on partitioned table, please specify the table name like this: table(partition)")
}
tblInfo := pt.Meta()
pid, err := tables.FindPartitionByName(tblInfo, partitionName)
if err != nil {
return nil, errors.Trace(err)
}
return pt.GetPartition(pid), nil
}
if partitionName != "" {
return nil, fmt.Errorf("%s is not a partitionted table", tableName)
}
return tableVal.(table.PhysicalTable), nil
}

func (t *tikvHandlerTool) schema() (infoschema.InfoSchema, error) {
Expand Down Expand Up @@ -1340,20 +1356,31 @@ func (h mvccTxnHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

func extractTableAndPartitionName(str string) (string, string) {
// extract table name and partition name from this "table(partition)":
// A sane person would not let the the table name or partition name contain '('.
start := strings.IndexByte(str, '(')
if start == -1 {
return str, ""
}
end := strings.IndexByte(str, ')')
if end == -1 {
return str, ""
}
return str[:start], str[start+1 : end]
}

// handleMvccGetByIdx gets MVCC info by an index key.
func (h mvccTxnHandler) handleMvccGetByIdx(params map[string]string, values url.Values) (interface{}, error) {
dbName := params[pDBName]
tableName := params[pTableName]
handleStr := params[pHandle]
schema, err := h.schema()
if err != nil {
return nil, errors.Trace(err)
}
// get table's schema.
t, err := schema.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))

t, err := h.getTable(dbName, tableName)
if err != nil {
return nil, errors.Trace(err)
}

var idxCols []*model.ColumnInfo
var idx table.Index
for _, v := range t.Indices() {
Expand Down Expand Up @@ -1381,15 +1408,15 @@ func (h mvccTxnHandler) handleMvccGetByKey(params map[string]string, decodeData
if err != nil {
return nil, errors.Trace(err)
}
resp, err := h.getMvccByHandle(tb.ID, handle)
resp, err := h.getMvccByHandle(tb.GetPhysicalID(), handle)
if err != nil {
return nil, err
}
if !decodeData {
return resp, nil
}
colMap := make(map[int64]*types.FieldType, 3)
for _, col := range tb.Columns {
for _, col := range tb.Meta().Columns {
colMap[col.ID] = &col.FieldType
}

Expand All @@ -1399,13 +1426,13 @@ func (h mvccTxnHandler) handleMvccGetByKey(params map[string]string, decodeData
datas := make(map[string][]map[string]string)
for _, w := range respValue.Info.Writes {
if len(w.ShortValue) > 0 {
datas[strconv.FormatUint(w.StartTs, 10)], err = h.decodeMvccData(w.ShortValue, colMap, tb)
datas[strconv.FormatUint(w.StartTs, 10)], err = h.decodeMvccData(w.ShortValue, colMap, tb.Meta())
}
}

for _, v := range respValue.Info.Values {
if len(v.Value) > 0 {
datas[strconv.FormatUint(v.StartTs, 10)], err = h.decodeMvccData(v.Value, colMap, tb)
datas[strconv.FormatUint(v.StartTs, 10)], err = h.decodeMvccData(v.Value, colMap, tb.Meta())
}
}

Expand Down
39 changes: 38 additions & 1 deletion server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,19 @@ func (ts *HTTPHandlerTestSuite) prepareData(c *C) {
dbt.mustExec("alter table tidb.test add index idx1 (a, b);")
dbt.mustExec("alter table tidb.test add unique index idx2 (a, b);")

dbt.mustExec(`create table tidb.pt (a int) partition by range (a)
dbt.mustExec(`create table tidb.pt (a int primary key, b varchar(20), key idx(a, b))
partition by range (a)
(partition p0 values less than (256),
partition p1 values less than (512),
partition p2 values less than (1024))`)

txn2, err := dbt.db.Begin()
c.Assert(err, IsNil)
txn2.Exec("insert into tidb.pt values (42, '123')")
txn2.Exec("insert into tidb.pt values (256, 'b')")
txn2.Exec("insert into tidb.pt values (666, 'def')")
err = txn2.Commit()
c.Assert(err, IsNil)
}

func decodeKeyMvcc(closer io.ReadCloser, c *C, valid bool) {
Expand Down Expand Up @@ -458,6 +467,29 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {
err = decoder.Decode(&data2)
c.Assert(err, IsNil)
c.Assert(data2, DeepEquals, data)

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/key/tidb/test/1?decode=true"))
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
var data3 map[string]interface{}
err = decoder.Decode(&data3)
c.Assert(err, IsNil)
c.Assert(data3["key"], NotNil)
c.Assert(data3["info"], NotNil)
c.Assert(data3["data"], NotNil)
c.Assert(data3["decode_error"], IsNil)

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/key/tidb/pt(p0)/42?decode=true"))
c.Assert(err, IsNil)
defer resp.Body.Close()
decoder = json.NewDecoder(resp.Body)
var data4 map[string]interface{}
err = decoder.Decode(&data4)
c.Assert(err, IsNil)
c.Assert(data4["key"], NotNil)
c.Assert(data4["info"], NotNil)
c.Assert(data4["data"], NotNil)
c.Assert(data4["decode_error"], IsNil)
}

func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) {
Expand Down Expand Up @@ -601,6 +633,11 @@ func (ts *HTTPHandlerTestSuite) TestGetIndexMVCC(c *C) {
var data2 kvrpcpb.MvccGetByKeyResponse
err = decoder.Decode(&data2)
c.Assert(err, NotNil)

resp, err = http.Get("http://127.0.0.1:10090/mvcc/index/tidb/pt(p2)/idx/666?a=666&b=def")
c.Assert(err, IsNil)
defer resp.Body.Close()
decodeKeyMvcc(resp.Body, c, true)
}

func (ts *HTTPHandlerTestSuite) TestGetSettings(c *C) {
Expand Down

0 comments on commit e9ecbc1

Please sign in to comment.