Skip to content

Commit

Permalink
Merge pull request #5982 from planetscale/vcursor-refactor
Browse files Browse the repository at this point in the history
Vcursor refactor
  • Loading branch information
sougou authored Mar 30, 2020
2 parents fceed44 + c6e02cb commit a294ce8
Show file tree
Hide file tree
Showing 26 changed files with 857 additions and 349 deletions.
83 changes: 83 additions & 0 deletions go/test/utils/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"

"github.com/google/go-cmp/cmp"
)

// MustMatchFn is used to create a common diff function for a test file
// Usage in *_test.go file:
//
// Top declaration:
//
// var mustMatch = testutils.MustMatchFn(
// []interface{}{ // types with unexported fields
// type1{},
// type2{},
// ...
// typeN{},
// },
// []string{ // ignored fields
// ".id", // id numbers are unstable
// ".createAt", // created dates might not be interesting to compare
// },
// )
//
// In Test*() function:
//
// mustMatch(t, want, got, "something doesn't match")
func MustMatchFn(allowUnexportedTypes []interface{}, ignoredFields []string, extraOpts ...cmp.Option) func(t *testing.T, want, got interface{}, errMsg string) {
diffOpts := append([]cmp.Option{
cmp.AllowUnexported(allowUnexportedTypes...),
cmpIgnoreFields(ignoredFields...),
}, extraOpts...)
// Diffs want/got and fails with errMsg on any failure.
return func(t *testing.T, want, got interface{}, errMsg string) {
t.Helper()
diff := cmp.Diff(want, got, diffOpts...)
if diff != "" {
t.Fatalf("%s: (-want +got)\n%v", errMsg, diff)
}
}
}

// MustMatch is a convenience version of MustMatchFn with no overrides.
// Usage in Test*() function:
//
// testutils.MustMatch(t, want, got, "something doesn't match")
var MustMatch = MustMatchFn(nil, nil)

// Skips fields of pathNames for cmp.Diff.
// Similar to standard cmpopts.IgnoreFields, but allows unexported fields.
func cmpIgnoreFields(pathNames ...string) cmp.Option {
skipFields := make(map[string]bool, len(pathNames))
for _, name := range pathNames {
skipFields[name] = true
}

return cmp.FilterPath(func(path cmp.Path) bool {
for _, ps := range path {
if skipFields[ps.String()] {
return true
}
}
return false
}, cmp.Ignore())
}
10 changes: 10 additions & 0 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ func IsDML(sql string) bool {
return false
}

//IsDMLStatement returns true if the query is an INSERT, UPDATE or DELETE statement.
func IsDMLStatement(stmt Statement) bool {
switch stmt.(type) {
case *Insert, *Update, *Delete:
return true
}

return false
}

// SplitAndExpression breaks up the Expr into AND-separated conditions
// and appends them to filters. Outer parenthesis are removed. Precedence
// should be taken into account if expressions are recombined.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func TestAutocommitDirectRangeTarget(t *testing.T) {
Autocommit: true,
TransactionMode: vtgatepb.TransactionMode_MULTI,
}
sql := "DELETE FROM sharded_user_msgs LIMIT 1000"
sql := "delete from sharded_user_msgs limit 1000"

_, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb
return nil, vterrors.Wrap(err, "execDeleteEqual")
}
}
return execShard(vcursor, del.Query, bindVars, rs, true /* isDML */, true /* canAutocommit */)
return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */)
}

// deleteVindexEntries performs an delete if table owns vindex.
Expand Down Expand Up @@ -231,6 +231,6 @@ func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]
}
}
autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval()
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit)
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return res, vterrors.Aggregate(errs)
}
16 changes: 9 additions & 7 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func (t noopVCursor) SetContextTimeout(timeout time.Duration) context.CancelFunc
func (t noopVCursor) RecordWarning(warning *querypb.QueryWarning) {
}

func (t noopVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
func (t noopVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
panic("unimplemented")
}

func (t noopVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, autocommit bool) (*sqltypes.Result, []error) {
func (t noopVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) {
panic("unimplemented")
}

Expand All @@ -78,7 +78,7 @@ func (t noopVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedSha
panic("unimplemented")
}

func (t noopVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) {
func (t noopVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
panic("unimplemented")
}

Expand Down Expand Up @@ -121,7 +121,7 @@ func (f *loggingVCursor) RecordWarning(warning *querypb.QueryWarning) {
f.warnings = append(f.warnings, warning)
}

func (f *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
func (f *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
name := "Unknown"
switch co {
case vtgatepb.CommitOrder_NORMAL:
Expand All @@ -133,12 +133,12 @@ func (f *loggingVCursor) Execute(method string, query string, bindvars map[strin
case vtgatepb.CommitOrder_AUTOCOMMIT:
name = "ExecuteAutocommit"
}
f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), isDML))
f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), rollbackOnError))
return f.nextResult()
}

func (f *loggingVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error) {
f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), isDML, canAutocommit))
func (f *loggingVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) {
f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), rollbackOnError, canAutocommit))
res, err := f.nextResult()
if err != nil {
return nil, []error{err}
Expand Down Expand Up @@ -202,6 +202,8 @@ func (f *loggingVCursor) ResolveDestinations(keyspace string, ids []*querypb.Val
shards = f.shards[:1]
case key.DestinationNone:
// Nothing to do here.
case key.DestinationShard:
shards = []string{destination.String()}
default:
return nil, nil, fmt.Errorf("unsupported destination: %v", destination)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (ins *Insert) execInsertSharded(vcursor VCursor, bindVars map[string]*query
}

autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit)
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
if errs != nil {
return nil, vterrors.Wrap(vterrors.Aggregate(errs), "execInsertSharded")
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ type VCursor interface {
RecordWarning(warning *querypb.QueryWarning)

// V3 functions.
Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error)
Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error)
AutocommitApproval() bool

// Shard-level functions.
ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error)
ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error)
ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard) (*sqltypes.Result, error)
StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error

// Keyspace ID level functions.
ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error)
ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error)

// Resolver methods, from key.Destination to srvtopo.ResolvedShard.
// Will replace all of the Topo functions.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa
}

queries := getQueries(route.Query, bvs)
result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* isDML */, false /* autocommit */)
result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* rollbackOnError */, false /* autocommit */)

if errs != nil {
if route.ScatterErrorsAsWarnings {
Expand Down Expand Up @@ -362,7 +362,7 @@ func (route *Route) GetFields(vcursor VCursor, bindVars map[string]*querypb.Bind
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name)
}
qr, err := execShard(vcursor, route.FieldQuery, bindVars, rss[0], false /* isDML */, false /* canAutocommit */)
qr, err := execShard(vcursor, route.FieldQuery, bindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -518,14 +518,14 @@ func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey
}
}

func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, isDML, canAutocommit bool) (*sqltypes.Result, error) {
func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, rollbackOnError, canAutocommit bool) (*sqltypes.Result, error) {
autocommit := canAutocommit && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard([]*srvtopo.ResolvedShard{rs}, []*querypb.BoundQuery{
{
Sql: query,
BindVariables: bindVars,
},
}, isDML, autocommit)
}, rollbackOnError, autocommit)
return result, vterrors.Aggregate(errs)
}

Expand Down
94 changes: 94 additions & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package engine

import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*Send)(nil)

// Send is an operator to send query to the specific keyspace, tabletType and destination
type Send struct {
// Keyspace specifies the keyspace to send the query to.
Keyspace *vindexes.Keyspace

// TargetDestination specifies an explicit target destination to send the query to.
// This bypases the core of the v3 engine.
TargetDestination key.Destination

// Query specifies the query to be executed.
Query string

// NoAutoCommit specifies if we need to check autocommit behaviour
NoAutoCommit bool

noInputs
}

// RouteType implements Primitive interface
func (s *Send) RouteType() string {
if s.NoAutoCommit {
return "SendNoAutoCommit"
}

return "Send"
}

// GetKeyspaceName implements Primitive interface
func (s *Send) GetKeyspaceName() string {
return s.Keyspace.Name
}

// GetTableName implements Primitive interface
func (s *Send) GetTableName() string {
return ""
}

// Execute implements Primitive interface
func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, _ bool) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
if err != nil {
return nil, vterrors.Wrap(err, "sendExecute")
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: s.Query,
BindVariables: bindVars,
}
}

canAutocommit := false
if !s.NoAutoCommit {
canAutocommit = len(rss) == 1 && vcursor.AutocommitApproval()
}

rollbackOnError := !s.NoAutoCommit // for non-dml queries, there's no need to do a rollback
result, errs := vcursor.ExecuteMultiShard(rss, queries, rollbackOnError, canAutocommit)
err = vterrors.Aggregate(errs)
if err != nil {
return nil, err
}
return result, nil
}

// StreamExecute implements Primitive interface
func (s *Send) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: systay - this should work
}

// GetFields implements Primitive interface
func (s *Send) GetFields(vcursor VCursor, bindVars map[string]*query.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: systay - @sugu, is this correct?
}
Loading

0 comments on commit a294ce8

Please sign in to comment.