From 24dd32c52aeaf9ffc6a149d49d03f8571263ea14 Mon Sep 17 00:00:00 2001 From: Shirly Date: Tue, 21 Mar 2017 16:51:11 +0800 Subject: [PATCH 1/9] check_tso:make sure commit_ts > start_ts during commit in transaction (#2901) --- store/tikv/2pc.go | 9 +++++++++ store/tikv/2pc_test.go | 17 +++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 47bd0784b3527..a9de2c7e0740a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -506,6 +506,15 @@ func (c *twoPhaseCommitter) execute() error { log.Warnf("2PC get commitTS failed: %v, tid: %d", err, c.startTS) return errors.Trace(err) } + + // check commitTS + if commitTS <= c.startTS { + err = errors.Errorf("Invalid transaction tso with start_ts=%v while commit_ts=%v", + c.startTS, + commitTS) + log.Error(err) + return errors.Trace(err) + } c.commitTS = commitTS if err := c.checkSchemaValid(); err != nil { return errors.Trace(err) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index ab7d6fb278fc0..4b1ab66214d09 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -14,6 +14,7 @@ package tikv import ( + "math" "math/rand" "strings" "time" @@ -286,3 +287,19 @@ func (c *slowClient) SendCopReq(ctx goctx.Context, addr string, req *coprocessor } return c.Client.SendCopReq(ctx, addr, req, timeout) } + +func (s *testCommitterSuite) TestIllegalTso(c *C) { + txn := s.begin(c) + data := map[string]string{ + "name": "aa", + "age": "12", + } + for k, v := range data { + err := txn.Set([]byte(k), []byte(v)) + c.Assert(err, IsNil) + } + // make start ts bigger. + txn.startTS = uint64(math.MaxUint64) + err := txn.Commit() + c.Assert(err, NotNil) +} From a3777f4cac27b04007b84f001acf5a26f8896ce9 Mon Sep 17 00:00:00 2001 From: Du Chuan Date: Tue, 21 Mar 2017 18:06:11 +0800 Subject: [PATCH 2/9] builtin: add uuid built-in function (#2875) --- expression/builtin_miscellaneous.go | 11 ++++++----- expression/builtin_miscellaneous_test.go | 25 ++++++++++++++++++++++++ plan/typeinferer.go | 2 +- plan/typeinferer_test.go | 1 + 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/expression/builtin_miscellaneous.go b/expression/builtin_miscellaneous.go index e124510d02316..e900dd796cf02 100644 --- a/expression/builtin_miscellaneous.go +++ b/expression/builtin_miscellaneous.go @@ -13,12 +13,12 @@ package expression import ( - "net" - "time" - "github.com/juju/errors" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/util/types" + "github.com/twinj/uuid" + "net" + "time" ) var ( @@ -450,8 +450,9 @@ type builtinUUIDSig struct { } // See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid -func (b *builtinUUIDSig) eval(row []types.Datum) (d types.Datum, err error) { - return d, errFunctionNotExists.GenByArgs("UUID") +func (b *builtinUUIDSig) eval(_ []types.Datum) (d types.Datum, err error) { + d.SetString(uuid.NewV1().String()) + return } type uuidShortFunctionClass struct { diff --git a/expression/builtin_miscellaneous_test.go b/expression/builtin_miscellaneous_test.go index 7a72ceff2ee4b..c40465c5794b1 100644 --- a/expression/builtin_miscellaneous_test.go +++ b/expression/builtin_miscellaneous_test.go @@ -18,8 +18,33 @@ import ( "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/testutil" "github.com/pingcap/tidb/util/types" + "strings" ) +func (s *testEvaluatorSuite) TestUUID(c *C) { + defer testleak.AfterTest(c)() + fc := funcs[ast.UUID] + f, err := fc.getFunction(datumsToConstants(types.MakeDatums()), s.ctx) + r, err := f.eval(nil) + c.Assert(err, IsNil) + parts := strings.Split(r.GetString(), "-") + c.Assert(len(parts), Equals, 5) + for i, p := range parts { + switch i { + case 0: + c.Assert(len(p), Equals, 8) + case 1: + fallthrough + case 2: + fallthrough + case 3: + c.Assert(len(p), Equals, 4) + case 4: + c.Assert(len(p), Equals, 12) + } + } +} + func (s *testEvaluatorSuite) TestAnyValue(c *C) { defer testleak.AfterTest(c)() diff --git a/plan/typeinferer.go b/plan/typeinferer.go index 8e50cc0d62084..8592754140913 100644 --- a/plan/typeinferer.go +++ b/plan/typeinferer.go @@ -377,7 +377,7 @@ func (v *typeInferrer) handleFuncCallExpr(x *ast.FuncCallExpr) { "concat", "concat_ws", "left", "lcase", "lower", "repeat", "replace", "ucase", "upper", "convert", "substring", "elt", "substring_index", "trim", "ltrim", "rtrim", "reverse", "hex", "unhex", - "date_format", "rpad", "lpad", "char_func", "conv", "make_set", "oct": + "date_format", "rpad", "lpad", "char_func", "conv", "make_set", "oct", "uuid": tp = types.NewFieldType(mysql.TypeVarString) chs = v.defaultCharset case "strcmp", "isnull", "bit_length", "char_length", "character_length", "crc32", "timestampdiff", "sign", diff --git a/plan/typeinferer_test.go b/plan/typeinferer_test.go index 2b70a2a52a1c4..aa7c3e677620d 100644 --- a/plan/typeinferer_test.go +++ b/plan/typeinferer_test.go @@ -245,6 +245,7 @@ func (ts *testTypeInferrerSuite) TestInferType(c *C) { {`md5(123)`, mysql.TypeVarString, "utf8"}, {`sha1(123)`, mysql.TypeVarString, "utf8"}, {`sha(123)`, mysql.TypeVarString, "utf8"}, + {`uuid()`, mysql.TypeVarString, "utf8"}, {`coalesce(null, 0)`, mysql.TypeLonglong, charset.CharsetBin}, {`coalesce(null, 0.1)`, mysql.TypeNewDecimal, charset.CharsetBin}, {`coalesce(1, "1" + 1)`, mysql.TypeDouble, charset.CharsetBin}, From 13fa294a73504102a60a7e354a2109437cf47af4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 21 Mar 2017 18:16:00 +0800 Subject: [PATCH 3/9] *: provide a command line flag --skip-grant-table (#2897) --- privilege/privileges/privileges.go | 10 +++++++++- session_test.go | 24 ++++++++++++++++++++++++ tidb-server/main.go | 12 +++++++++++- 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/privilege/privileges/privileges.go b/privilege/privileges/privileges.go index 6a2107c317103..0d060924e0f62 100644 --- a/privilege/privileges/privileges.go +++ b/privilege/privileges/privileges.go @@ -34,6 +34,9 @@ import ( // Enable enables the new privilege check feature. var Enable = false +// SkipWithGrant causes the server to start without using the privilege system at all. +var SkipWithGrant = false + // privilege error codes. const ( codeInvalidPrivilegeType terror.ErrCode = 1 @@ -182,7 +185,7 @@ type UserPrivileges struct { // RequestVerification implements the Checker interface. func (p *UserPrivileges) RequestVerification(db, table, column string, priv mysql.PrivilegeType) bool { - if !Enable { + if !Enable || SkipWithGrant { return true } @@ -212,6 +215,11 @@ const PWDHashLen = 40 // ConnectionVerification implements the Checker interface. func (p *UserPrivileges) ConnectionVerification(user, host string, auth, salt []byte) bool { + if SkipWithGrant { + p.User = user + "@" + host + return true + } + mysqlPriv := p.Handle.Get() record := mysqlPriv.connectionVerification(user, host) if record == nil { diff --git a/session_test.go b/session_test.go index 7b48b5339b33b..23507ba267cfc 100644 --- a/session_test.go +++ b/session_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/table/tables" @@ -2079,6 +2080,29 @@ func (s *testSessionSuite) TestSessionAuth(c *C) { mustExecSQL(c, se, dropDBSQL) } +func (s *testSessionSuite) TestSkipWithGrant(c *C) { + defer testleak.AfterTest(c)() + dbName := "test_skip_with_grant" + se := newSession(c, s.store, dbName) + dropDBSQL := fmt.Sprintf("drop database %s;", dbName) + + save1 := privileges.Enable + save2 := privileges.SkipWithGrant + + privileges.Enable = true + privileges.SkipWithGrant = false + c.Assert(se.Auth("user_not_exist", []byte("yyy"), []byte("zzz")), IsFalse) + + privileges.SkipWithGrant = true + c.Assert(se.Auth(`xxx@%`, []byte("yyy"), []byte("zzz")), IsTrue) + c.Assert(se.Auth(`root@%`, []byte(""), []byte("")), IsTrue) + mustExecSQL(c, se, "create table t (id int)") + + privileges.Enable = save1 + privileges.SkipWithGrant = save2 + mustExecSQL(c, se, dropDBSQL) +} + func (s *testSessionSuite) TestErrorRollback(c *C) { defer testleak.AfterTest(c)() dbName := "test_error_rollback" diff --git a/tidb-server/main.go b/tidb-server/main.go index 8c24916d8b40c..0e216a0c40f93 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -54,7 +54,7 @@ var ( lease = flag.String("lease", "1s", "schema lease duration, very dangerous to change only if you know what you do") socket = flag.String("socket", "", "The socket file to use for connection.") enablePS = flag.Bool("perfschema", false, "If enable performance schema.") - enablePrivilege = flag.Bool("privilege", false, "If enable privilege check feature.") + enablePrivilege = flag.Bool("privilege", false, "If enable privilege check feature. This flag will be removed in the future.") reportStatus = flag.Bool("report-status", true, "If enable status report HTTP service.") logFile = flag.String("log-file", "", "log file path") joinCon = flag.Int("join-concurrency", 5, "the number of goroutines that participate joining.") @@ -64,6 +64,7 @@ var ( binlogSocket = flag.String("binlog-socket", "", "socket file to write binlog") runDDL = flag.Bool("run-ddl", true, "run ddl worker on this tidb-server") retryLimit = flag.Int("retry-limit", 10, "the maximum number of retries when commit a transaction") + skipGrantTable = flag.Bool("skip-grant-table", false, "This option causes the server to start without using the privilege system at all.") timeJumpBackCounter = prometheus.NewCounter( prometheus.CounterOpts{ @@ -85,6 +86,10 @@ func main() { printer.PrintRawTiDBInfo() os.Exit(0) } + if *skipGrantTable && !hasRootPrivilege() { + log.Error("TiDB run with skip-grant-table need root privilege.") + os.Exit(-1) + } leaseDuration := parseLease() tidb.SetSchemaLease(leaseDuration) @@ -125,6 +130,7 @@ func main() { perfschema.EnablePerfSchema() } privileges.Enable = *enablePrivilege + privileges.SkipWithGrant = *skipGrantTable if *binlogSocket != "" { createBinlogClient() } @@ -229,3 +235,7 @@ func parseLease() time.Duration { } return dur } + +func hasRootPrivilege() bool { + return os.Geteuid() == 0 +} From d215257ea9b682c6522498fd4e0a3e9ac6a54970 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Tue, 21 Mar 2017 19:52:03 +0800 Subject: [PATCH 4/9] ddl: set collate if not set. Should set column collate to default one if charset is set but collation is not set. --- ddl/column_test.go | 3 ++- ddl/ddl_api.go | 14 ++++++++++++++ util/charset/charset.go | 4 ++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/ddl/column_test.go b/ddl/column_test.go index 34d5191d7ca66..7eefe7371ebd9 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -912,6 +912,7 @@ func (s *testColumnSuite) TestModifyColumn(c *C) { {"text", "blob", false}, {"varchar(10)", "varchar(8)", false}, {"varchar(10)", "varchar(11)", true}, + {"varchar(10) character set utf8 collate utf8_bin", "varchar(10) character set utf8", true}, } for _, ca := range cases { ftA := s.colDefStrToFieldType(c, ca.origin) @@ -925,7 +926,7 @@ func (s *testColumnSuite) colDefStrToFieldType(c *C, str string) *types.FieldTyp stmt, err := parser.New().ParseOneStmt(sqlA, "", "") c.Assert(err, IsNil) colDef := stmt.(*ast.AlterTableStmt).Specs[0].NewColumn - col, _, err := columnDefToCol(nil, 0, colDef) + col, _, err := buildColumnAndConstraint(nil, 0, colDef) c.Assert(err, IsNil) return &col.FieldType } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index f48b5dc6b5235..6137797a0016a 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -112,6 +112,18 @@ func getDefaultCharsetAndCollate() (string, string) { return "utf8", "utf8_bin" } +func getDefaultCollateForCharset(str string) string { + switch str { + case charset.CharsetBin: + return charset.CollationBin + case charset.CharsetUTF8MB4: + return charset.CollationUTF8MB4 + case charset.CharsetUTF8: + return charset.CollationUTF8 + } + return "" +} + func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constraint) { switch v.Tp { case ast.ConstraintPrimaryKey: @@ -186,6 +198,8 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType) { tp.Charset = charset.CharsetBin tp.Collate = charset.CharsetBin } + } else if len(tp.Collate) == 0 { + tp.Collate = getDefaultCollateForCharset(tp.Charset) } // If flen is not assigned, assigned it by type. if tp.Flen == types.UnspecifiedLength { diff --git a/util/charset/charset.go b/util/charset/charset.go index ffc3815ff9adc..486bff98206b2 100644 --- a/util/charset/charset.go +++ b/util/charset/charset.go @@ -152,11 +152,11 @@ const ( // CharsetUTF8 is the default charset for string types. CharsetUTF8 = "utf8" // CollationUTF8 is the default collation for CharsetUTF8. - CollationUTF8 = "utf8_general_ci" + CollationUTF8 = "utf8_bin" // CharsetUTF8MB4 represents 4 bytes utf8, which works the same way as utf8 in Go. CharsetUTF8MB4 = "utf8mb4" // CollationUTF8MB4 is the default collation for CharsetUTF8MB4. - CollationUTF8MB4 = "utf8mb4_general_ci" + CollationUTF8MB4 = "utf8mb4_bin" ) var collations = []*Collation{ From 0783456c782ec16fe54ff814a4b8323e6cffc4c7 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 22 Mar 2017 00:00:00 +0800 Subject: [PATCH 5/9] *: do not use bytespool to allocate coprocessor response. (#2891) --- distsql/distsql.go | 54 ++++++-------------------------- distsql/distsql_test.go | 11 +++---- kv/kv.go | 4 +-- store/localstore/local_client.go | 28 ++--------------- store/localstore/xapi_test.go | 8 ++--- store/tikv/coprocessor.go | 8 ++--- util/bytespool/bytespool.go | 43 ------------------------- util/bytespool/bytespool_test.go | 16 ---------- 8 files changed, 21 insertions(+), 151 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index ff4dceae1cdc3..5aacf04f2e69c 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -14,8 +14,6 @@ package distsql import ( - "io" - "io/ioutil" "time" "github.com/juju/errors" @@ -24,7 +22,6 @@ import ( "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/terror" - "github.com/pingcap/tidb/util/bytespool" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/types" "github.com/pingcap/tipb/go-tipb" @@ -101,23 +98,21 @@ func (r *selectResult) fetch(ctx goctx.Context) { queryHistgram.WithLabelValues(label).Observe(duration.Seconds()) }() for { - reader, err := r.resp.Next() + resultSubset, err := r.resp.Next() if err != nil { r.results <- resultWithErr{err: errors.Trace(err)} return } - if reader == nil { + if resultSubset == nil { return } pr := &partialResult{ index: r.index, fields: r.fields, - reader: reader, aggregate: r.aggregate, ignoreData: r.ignoreData, - done: make(chan error, 1), } - go pr.fetch() + pr.unmarshal(resultSubset) select { case r.results <- resultWithErr{result: pr}: @@ -157,44 +152,25 @@ type partialResult struct { index bool aggregate bool fields []*types.FieldType - reader io.ReadCloser resp *tipb.SelectResponse chunkIdx int cursor int dataOffset int64 ignoreData bool - - done chan error - fetched bool } -func (pr *partialResult) fetch() { - defer close(pr.done) +func (pr *partialResult) unmarshal(resultSubset []byte) error { pr.resp = new(tipb.SelectResponse) - var b []byte - var err error - if rc, ok := pr.reader.(*bytespool.ReadCloser); ok { - b = rc.SharedBytes() - } else { - b, err = ioutil.ReadAll(pr.reader) - if err != nil { - pr.done <- errors.Trace(err) - return - } - } - - err = pr.resp.Unmarshal(b) + err := pr.resp.Unmarshal(resultSubset) if err != nil { - pr.done <- errors.Trace(err) - return + return errors.Trace(err) } if pr.resp.Error != nil { - pr.done <- errInvalidResp.Gen("[%d %s]", pr.resp.Error.GetCode(), pr.resp.Error.GetMsg()) - return + return errInvalidResp.Gen("[%d %s]", pr.resp.Error.GetCode(), pr.resp.Error.GetMsg()) } - pr.done <- nil + return nil } var dummyData = make([]types.Datum, 0) @@ -202,13 +178,6 @@ var dummyData = make([]types.Datum, 0) // Next returns the next row of the sub result. // If no more row to return, data would be nil. func (pr *partialResult) Next() (handle int64, data []types.Datum, err error) { - if !pr.fetched { - err = <-pr.done - pr.fetched = true - if err != nil { - return 0, nil, err - } - } if len(pr.resp.Chunks) > 0 { // For new resp rows structure. chunk := pr.getChunk() @@ -279,7 +248,7 @@ func (pr *partialResult) getChunk() *tipb.Chunk { // Close closes the sub result. func (pr *partialResult) Close() error { - return pr.reader.Close() + return nil } // Select do a select request, returns SelectResult. @@ -356,11 +325,6 @@ func composeRequest(req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrenc return kvReq, nil } -// SupportExpression checks if the expression is supported by the client. -func SupportExpression(client kv.Client, expr *tipb.Expr) bool { - return false -} - // XAPI error codes. const ( codeInvalidResp = 1 diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 84d5317546b10..459bea1849e67 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -14,10 +14,7 @@ package distsql import ( - "bytes" "errors" - "io" - "io/ioutil" "runtime" "testing" "time" @@ -93,23 +90,23 @@ type mockResponse struct { count int } -func (resp *mockResponse) Next() (io.ReadCloser, error) { +func (resp *mockResponse) Next() ([]byte, error) { resp.count++ if resp.count == 100 { return nil, errors.New("error happend") } - return mockReaderCloser(), nil + return mockSubresult(), nil } func (resp *mockResponse) Close() error { return nil } -func mockReaderCloser() io.ReadCloser { +func mockSubresult() []byte { resp := new(tipb.SelectResponse) b, err := resp.Marshal() if err != nil { panic(err) } - return ioutil.NopCloser(bytes.NewBuffer(b)) + return b } diff --git a/kv/kv.go b/kv/kv.go index 7ec9ed0f7523d..471a76c3586f9 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -14,8 +14,6 @@ package kv import ( - "io" - goctx "golang.org/x/net/context" ) @@ -155,7 +153,7 @@ type Response interface { // Next returns a resultSubset from a single storage unit. // When full result set is returned, nil is returned. // TODO: Find a better interface for resultSubset that can avoid allocation and reuse bytes. - Next() (resultSubset io.ReadCloser, err error) + Next() (resultSubset []byte, err error) // Close response. Close() error } diff --git a/store/localstore/local_client.go b/store/localstore/local_client.go index f25c7d2c06947..c8e30977ba83f 100644 --- a/store/localstore/local_client.go +++ b/store/localstore/local_client.go @@ -1,8 +1,6 @@ package localstore import ( - "io" - "github.com/juju/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tipb/go-tipb" @@ -94,28 +92,6 @@ func (c *dbClient) updateRegionInfo() { c.regionInfo = c.store.pd.GetRegionInfo() } -type localResponseReader struct { - s []byte - i int64 -} - -func (r *localResponseReader) Read(b []byte) (n int, err error) { - if len(b) == 0 { - return 0, nil - } - if r.i >= int64(len(r.s)) { - return 0, io.EOF - } - n = copy(b, r.s[r.i:]) - r.i += int64(n) - return -} - -func (r *localResponseReader) Close() error { - r.i = int64(len(r.s)) - return nil -} - type response struct { client *dbClient reqSent int @@ -134,7 +110,7 @@ type task struct { region *localRegion } -func (it *response) Next() (resp io.ReadCloser, err error) { +func (it *response) Next() (resp []byte, err error) { if it.finished { return nil, nil } @@ -160,7 +136,7 @@ func (it *response) Next() (resp io.ReadCloser, err error) { if it.reqSent == len(it.tasks) && it.respGot == it.reqSent { it.Close() } - return &localResponseReader{s: regionResp.data}, nil + return regionResp.data, nil } func (it *response) createRetryTasks(resp *regionResponse) []*task { diff --git a/store/localstore/xapi_test.go b/store/localstore/xapi_test.go index cf3fbfdf0d3ec..2a18a4c9034df 100644 --- a/store/localstore/xapi_test.go +++ b/store/localstore/xapi_test.go @@ -15,7 +15,6 @@ package localstore import ( "fmt" - "io/ioutil" "math" "sort" "testing" @@ -68,9 +67,8 @@ func (s *testXAPISuite) TestSelect(c *C) { req, err := prepareSelectRequest(tbInfo, txn.StartTS()) c.Check(err, IsNil) resp := client.Send(mockCtx, req) - subResp, err := resp.Next() + data, err := resp.Next() c.Check(err, IsNil) - data, err := ioutil.ReadAll(subResp) c.Check(err, IsNil) selResp := new(tipb.SelectResponse) proto.Unmarshal(data, selResp) @@ -97,9 +95,7 @@ func (s *testXAPISuite) TestSelect(c *C) { req, err = prepareIndexRequest(tbInfo, txn.StartTS()) c.Check(err, IsNil) resp = client.Send(mockCtx, req) - subResp, err = resp.Next() - c.Check(err, IsNil) - data, err = ioutil.ReadAll(subResp) + data, err = resp.Next() c.Check(err, IsNil) idxResp := new(tipb.SelectResponse) proto.Unmarshal(data, idxResp) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 2cd854958c10c..a02d5cee7b274 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -16,7 +16,6 @@ package tikv import ( "bytes" "fmt" - "io" "sync" "time" @@ -24,7 +23,6 @@ import ( "github.com/ngaut/log" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/util/bytespool" "github.com/pingcap/tipb/go-tipb" goctx "golang.org/x/net/context" ) @@ -382,7 +380,7 @@ func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask) (finished boo } // Return next coprocessor result. -func (it *copIterator) Next() (io.ReadCloser, error) { +func (it *copIterator) Next() ([]byte, error) { coprocessorCounter.WithLabelValues("next").Inc() var ( @@ -416,7 +414,7 @@ func (it *copIterator) Next() (io.ReadCloser, error) { if resp.err != nil { return nil, errors.Trace(resp.err) } - return bytespool.NewReadCloser(bytespool.DefaultPool, nil, resp.Data), nil + return resp.Data, nil } // Handle single copTask. @@ -499,7 +497,7 @@ func (it *copIterator) Close() error { // copErrorResponse returns error when calling Next() type copErrorResponse struct{ error } -func (it copErrorResponse) Next() (io.ReadCloser, error) { +func (it copErrorResponse) Next() ([]byte, error) { return nil, it.error } diff --git a/util/bytespool/bytespool.go b/util/bytespool/bytespool.go index aa95121fcf0da..6ea46c2832810 100644 --- a/util/bytespool/bytespool.go +++ b/util/bytespool/bytespool.go @@ -14,7 +14,6 @@ package bytespool import ( - "bytes" "sync" ) @@ -89,45 +88,3 @@ func bucketIdx(size int) (i int) { } return } - -// ReadCloser frees the origin bytes when Close is called. -type ReadCloser struct { - pool *BytesPool - origin []byte - buffer *bytes.Buffer - closed bool -} - -// Close implements the io.ReadCloser interface. -// It frees the origin bytes allocated from the pool -func (r *ReadCloser) Close() error { - if r.closed { - return nil - } - r.pool.Free(r.origin) - r.closed = true - return nil -} - -// Read implements the io.ReadCloser interface. -func (r *ReadCloser) Read(b []byte) (n int, err error) { - return r.buffer.Read(b) -} - -// SharedBytes returns shared bytes of the underlying buffer. -// The returned bytes may not be used after Close is called. -// And it may not be used at the same time with Read. -func (r *ReadCloser) SharedBytes() []byte { - return r.buffer.Bytes() -} - -// NewReadCloser creates a ReadCloser. -// origin should be allocated from the pool, data can be any slice of the origin. -func NewReadCloser(pool *BytesPool, origin, data []byte) *ReadCloser { - r := &ReadCloser{ - pool: pool, - origin: origin, - buffer: bytes.NewBuffer(data), - } - return r -} diff --git a/util/bytespool/bytespool_test.go b/util/bytespool/bytespool_test.go index cef73bb07734d..4b638843d4ea4 100644 --- a/util/bytespool/bytespool_test.go +++ b/util/bytespool/bytespool_test.go @@ -14,8 +14,6 @@ package bytespool import ( - "io/ioutil" - "math/rand" "testing" . "github.com/pingcap/check" @@ -52,17 +50,3 @@ func (s *testBytesPoolSuite) TestBytesPool(c *C) { c.Assert(bp.Free(make([]byte, 100)), Equals, -1) c.Assert(bp.Free(make([]byte, kilo+1)), Equals, -1) } - -func (s *testBytesPoolSuite) TestReadCloser(c *C) { - bp := NewBytesPool() - origin, data := bp.Alloc(1000) - rand.Read(data) - r := NewReadCloser(bp, origin, data) - data2, _ := ioutil.ReadAll(r) - c.Assert(data2, BytesEquals, data) - origin, data = bp.Alloc(1000) - rand.Read(data) - r = NewReadCloser(bp, origin, data) - data2 = r.SharedBytes() - c.Assert(data2, BytesEquals, data) -} From ee322f74474f203b2a33d0b2d83ccae110e3625c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E6=96=87=5F=E6=83=98=E8=BF=B7?= Date: Wed, 22 Mar 2017 11:52:50 +0800 Subject: [PATCH 6/9] fix DATE_FORMAT func error (#2908) --- expression/builtin_time.go | 3 +++ expression/builtin_time_test.go | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 2e79a6a6ec685..ab850f36888bb 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -324,6 +324,9 @@ func builtinDateFormat(args []types.Datum, ctx context.Context) (types.Datum, er return d, errors.Trace(err) } + if date.IsNull() { + return d, nil + } t := date.GetMysqlTime() str, err := t.DateFormat(args[1].GetString()) if err != nil { diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 5dd270516270e..79b563afcf29c 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -249,6 +249,16 @@ func (s *testEvaluatorSuite) TestDate(c *C) { func (s *testEvaluatorSuite) TestDateFormat(c *C) { defer testleak.AfterTest(c)() + // Test case for https://github.com/pingcap/tidb/issues/2908 + // SELECT DATE_FORMAT(null,'%Y-%M-%D') + args := []types.Datum{types.NewDatum(nil), types.NewStringDatum("%Y-%M-%D")} + fc := funcs[ast.DateFormat] + f, err := fc.getFunction(datumsToConstants(args), s.ctx) + c.Assert(err, IsNil) + v, err := f.eval(nil) + c.Assert(err, IsNil) + c.Assert(v.IsNull(), Equals, true) + tblDate := []struct { Input []string Expect interface{} From d00f14aaa902b96227db89b7998079fc403d8aed Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 22 Mar 2017 12:43:05 +0800 Subject: [PATCH 7/9] store: Update pools's close function (#2906) --- store/tikv/conn_pool.go | 19 +++++++++++++++--- store/tikv/conn_pool_test.go | 39 ++++++++++++++++++++++++++++++++++++ store/tikv/kv.go | 7 ++++--- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/store/tikv/conn_pool.go b/store/tikv/conn_pool.go index 09c3f2abc53ec..e61266af552e1 100644 --- a/store/tikv/conn_pool.go +++ b/store/tikv/conn_pool.go @@ -78,6 +78,7 @@ func (p *Pool) Close() { type Pools struct { m struct { sync.Mutex + isClosed bool capability int mpools map[string]*Pool } @@ -97,6 +98,10 @@ func NewPools(capability int, f createConnFunc) *Pools { // GetConn takes a connection out of the pool by addr. func (p *Pools) GetConn(addr string) (*Conn, error) { p.m.Lock() + if p.m.isClosed { + p.m.Unlock() + return nil, errors.Errorf("pools is closed") + } pool, ok := p.m.mpools[addr] if !ok { pool = NewPool(addr, p.m.capability, p.f) @@ -125,12 +130,20 @@ func (p *Pools) PutConn(c *Conn) { // Close closes the pool. func (p *Pools) Close() { - p.m.Lock() - defer p.m.Unlock() + var pools []*Pool + p.m.Lock() for _, pool := range p.m.mpools { - pool.Close() + pools = append(pools, pool) + } + p.m.isClosed = true + p.m.Unlock() + + for _, p := range pools { + p.Close() } + p.m.Lock() p.m.mpools = map[string]*Pool{} + p.m.Unlock() } diff --git a/store/tikv/conn_pool_test.go b/store/tikv/conn_pool_test.go index 439372831449e..086a4049776e3 100644 --- a/store/tikv/conn_pool_test.go +++ b/store/tikv/conn_pool_test.go @@ -93,3 +93,42 @@ func (s *testPoolSuite) TestPool(c *C) { _, err = p.GetConn() c.Assert(err, NotNil) } + +func (s *testPoolSuite) TestPoolsClose(c *C) { + count := 0 + addr := "127.0.0.1:6379" + f := func(addr string) (*Conn, error) { + count++ + return &Conn{ + addr: addr, + closed: false, + nc: &testDummyConn{}}, nil + } + + capability := 4 + pools := NewPools(capability, f) + + conns := make([]*Conn, 0, capability) + for i := 0; i < capability; i++ { + conn, err := pools.GetConn(addr) + c.Assert(err, IsNil) + conns = append(conns, conn) + } + + c.Assert(count, Equals, capability) + + for i := 0; i < len(conns)-1; i++ { + pools.PutConn(conns[i]) + } + + ch := make(chan struct{}) + var checkErr error + go func() { + <-ch + _, checkErr = pools.GetConn(addr) + pools.PutConn(conns[capability-1]) + }() + close(ch) + pools.Close() + c.Assert(checkErr, NotNil) +} diff --git a/store/tikv/kv.go b/store/tikv/kv.go index fe750a0473b0a..99eb4fcb72969 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -170,13 +170,14 @@ func (s *tikvStore) Close() error { defer mc.Unlock() delete(mc.cache, s.uuid) - if err := s.client.Close(); err != nil { - return errors.Trace(err) - } s.oracle.Close() if s.gcWorker != nil { s.gcWorker.Close() } + // Make sure all connections are put back into the pools. + if err := s.client.Close(); err != nil { + return errors.Trace(err) + } return nil } From ef7923c2de0470af7be8eab2f63f9ce96be86dde Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 22 Mar 2017 13:56:16 +0800 Subject: [PATCH 8/9] stats: if the stats in kv store is invalid, we will replace it with peusdo one. (#2910) --- domain/domain.go | 5 +++++ plan/statscache/statscache.go | 13 ++++++++++--- plan/statscache/statscache_test.go | 16 ++++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index a63a7d2a7feec..15bbdd34b3543 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -407,6 +407,11 @@ func (do *Domain) PrivilegeHandle() *privileges.Handle { return do.privHandle } +// StatsHandle returns the statistic handle. +func (do *Domain) StatsHandle() *statscache.Handle { + return do.statsHandle +} + func (do *Domain) loadTableStats() error { ver, err := do.store.CurrentVersion() if err != nil { diff --git a/plan/statscache/statscache.go b/plan/statscache/statscache.go index af903d3978d1e..6b452afda759f 100644 --- a/plan/statscache/statscache.go +++ b/plan/statscache/statscache.go @@ -38,6 +38,12 @@ type Handle struct { lastVersion uint64 } +// Clear the statsTblCache, only for test. +func (h *Handle) Clear() { + statsTblCache = statsCache{cache: map[int64]*statsInfo{}} + h.lastVersion = 0 +} + // NewHandle creates a Handle for update stats. func NewHandle(ctx context.Context) *Handle { return &Handle{ctx: ctx} @@ -66,9 +72,10 @@ func (h *Handle) Update(m *meta.Meta, is infoschema.InfoSchema) error { if tpb != nil { tbl, err = statistics.TableFromPB(tableInfo, tpb) // Error is not nil may mean that there are some ddl changes on this table, so the origin - // statistics can not be used any more, we give it a nil one. + // statistics can not be used any more, we give it a pseudo one. if err != nil { - log.Errorf("Error occured when convert pb table for table id %d", tableID) + log.Errorf("Error occured when convert pb table for table id %d, may be the table struct has changed", tableID) + tbl = statistics.PseudoTable(tableInfo) } SetStatisticsTableCache(tableID, tbl, version) } @@ -89,7 +96,7 @@ func GetStatisticsTableCache(tblInfo *model.TableInfo) *statistics.Table { statsTblCache.m.RLock() defer statsTblCache.m.RUnlock() stats, ok := statsTblCache.cache[tblInfo.ID] - if !ok || stats == nil { + if !ok || stats == nil || stats.tbl == nil { return statistics.PseudoTable(tblInfo) } tbl := stats.tbl diff --git a/plan/statscache/statscache_test.go b/plan/statscache/statscache_test.go index ff9872bd0929f..1d3c32a6a6805 100644 --- a/plan/statscache/statscache_test.go +++ b/plan/statscache/statscache_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/plan/statscache" "github.com/pingcap/tidb/util/testkit" @@ -61,6 +62,21 @@ func (s *testStatsCacheSuite) TestStatsCache(c *C) { testKit.MustExec("analyze table t") statsTbl = statscache.GetStatisticsTableCache(tableInfo) c.Assert(statsTbl.Pseudo, IsFalse) + testKit.MustExec("alter table t drop column c2") + is = do.InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tableInfo = tbl.Meta() + + ver, err := store.CurrentVersion() + c.Assert(err, IsNil) + snapshot, err := store.GetSnapshot(ver) + c.Assert(err, IsNil) + m := meta.NewSnapshotMeta(snapshot) + do.StatsHandle().Clear() + do.StatsHandle().Update(m, is) + statsTbl = statscache.GetStatisticsTableCache(tableInfo) + c.Assert(statsTbl.Pseudo, IsTrue) } func newStoreWithBootstrap() (kv.Storage, *domain.Domain, error) { From 2c3e731f81d4d8c06c78c998e73d47f6efe5048a Mon Sep 17 00:00:00 2001 From: Rick Yu Date: Wed, 22 Mar 2017 14:21:25 +0800 Subject: [PATCH 9/9] builtin: add instr built-in function (#2857) --- expression/builtin_string.go | 42 ++++++++++++++++++++++++++++- expression/builtin_string_test.go | 44 +++++++++++++++++++++++++++++++ plan/typeinferer.go | 4 +-- plan/typeinferer_test.go | 1 + 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/expression/builtin_string.go b/expression/builtin_string.go index 1daf34280ea0d..7482bf13a573d 100644 --- a/expression/builtin_string.go +++ b/expression/builtin_string.go @@ -1695,7 +1695,47 @@ type builtinInstrSig struct { // See https://dev.mysql.com/doc/refman/5.6/en/string-functions.html#function_instr func (b *builtinInstrSig) eval(row []types.Datum) (d types.Datum, err error) { - return d, errFunctionNotExists.GenByArgs("instr") + args, err := b.evalArgs(row) + if err != nil { + return d, errors.Trace(err) + } + // INSTR(str, substr) + if args[0].IsNull() || args[1].IsNull() { + return d, nil + } + + var str, substr string + if str, err = args[0].ToString(); err != nil { + return d, errors.Trace(err) + } + if substr, err = args[1].ToString(); err != nil { + return d, errors.Trace(err) + } + + // INSTR performs case **insensitive** search by default, while at least one argument is binary string + // we do case sensitive search. + var caseSensitive bool + if args[0].Kind() == types.KindBytes || args[1].Kind() == types.KindBytes { + caseSensitive = true + } + + var pos, idx int + if caseSensitive { + idx = strings.Index(str, substr) + } else { + idx = strings.Index(strings.ToLower(str), strings.ToLower(substr)) + } + if idx == -1 { + pos = 0 + } else { + if caseSensitive { + pos = idx + 1 + } else { + pos = utf8.RuneCountInString(str[:idx]) + 1 + } + } + d.SetInt64(int64(pos)) + return d, nil } type loadFileFunctionClass struct { diff --git a/expression/builtin_string_test.go b/expression/builtin_string_test.go index 030c3d82ce0ee..98006f143dd6c 100644 --- a/expression/builtin_string_test.go +++ b/expression/builtin_string_test.go @@ -984,6 +984,50 @@ func (s *testEvaluatorSuite) TestLpad(c *C) { } } +func (s *testEvaluatorSuite) TestInstr(c *C) { + defer testleak.AfterTest(c)() + tbl := []struct { + Args []interface{} + Want interface{} + }{ + {[]interface{}{"foobarbar", "bar"}, 4}, + {[]interface{}{"xbar", "foobar"}, 0}, + + {[]interface{}{123456234, 234}, 2}, + {[]interface{}{123456, 567}, 0}, + {[]interface{}{1e10, 1e2}, 1}, + {[]interface{}{1.234, ".234"}, 2}, + {[]interface{}{1.234, ""}, 1}, + {[]interface{}{"", 123}, 0}, + {[]interface{}{"", ""}, 1}, + + {[]interface{}{"中文美好", "美好"}, 3}, + {[]interface{}{"中文美好", "世界"}, 0}, + {[]interface{}{"中文abc", "a"}, 3}, + + {[]interface{}{"live LONG and prosper", "long"}, 6}, + + {[]interface{}{"not BINARY string", "binary"}, 5}, + {[]interface{}{[]byte("BINARY string"), []byte("binary")}, 0}, + {[]interface{}{[]byte("BINARY string"), []byte("BINARY")}, 1}, + {[]interface{}{[]byte("中文abc"), []byte("abc")}, 7}, + + {[]interface{}{"foobar", nil}, nil}, + {[]interface{}{nil, "foobar"}, nil}, + {[]interface{}{nil, nil}, nil}, + } + + Dtbl := tblToDtbl(tbl) + instr := funcs[ast.Instr] + for i, t := range Dtbl { + f, err := instr.getFunction(datumsToConstants(t["Args"]), s.ctx) + c.Assert(err, IsNil) + got, err := f.eval(nil) + c.Assert(err, IsNil) + c.Assert(got, DeepEquals, t["Want"][0], Commentf("[%d]: args: %v", i, t["Args"])) + } +} + func (s *testEvaluatorSuite) TestMakeSet(c *C) { defer testleak.AfterTest(c)() diff --git a/plan/typeinferer.go b/plan/typeinferer.go index 8592754140913..968bd4a07ab3d 100644 --- a/plan/typeinferer.go +++ b/plan/typeinferer.go @@ -380,8 +380,8 @@ func (v *typeInferrer) handleFuncCallExpr(x *ast.FuncCallExpr) { "date_format", "rpad", "lpad", "char_func", "conv", "make_set", "oct", "uuid": tp = types.NewFieldType(mysql.TypeVarString) chs = v.defaultCharset - case "strcmp", "isnull", "bit_length", "char_length", "character_length", "crc32", "timestampdiff", "sign", - "is_ipv6", "ord": + case "strcmp", "isnull", "bit_length", "char_length", "character_length", "crc32", "timestampdiff", + "sign", "is_ipv6", "ord", "instr": tp = types.NewFieldType(mysql.TypeLonglong) case "connection_id": tp = types.NewFieldType(mysql.TypeLonglong) diff --git a/plan/typeinferer_test.go b/plan/typeinferer_test.go index aa7c3e677620d..1960694f95d4f 100644 --- a/plan/typeinferer_test.go +++ b/plan/typeinferer_test.go @@ -227,6 +227,7 @@ func (ts *testTypeInferrerSuite) TestInferType(c *C) { {"char_length('TiDB')", mysql.TypeLonglong, charset.CharsetBin}, {"character_length('TiDB')", mysql.TypeLonglong, charset.CharsetBin}, {"crc32('TiDB')", mysql.TypeLonglong, charset.CharsetBin}, + {"instr('foobarbar', 'bar')", mysql.TypeLonglong, charset.CharsetBin}, {"timestampdiff(MINUTE,'2003-02-01','2003-05-01 12:05:55')", mysql.TypeLonglong, charset.CharsetBin}, {"sign(0)", mysql.TypeLonglong, charset.CharsetBin}, {"sign(null)", mysql.TypeLonglong, charset.CharsetBin},