Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vcursor refactor #5982

Merged
merged 13 commits into from
Mar 30, 2020
83 changes: 83 additions & 0 deletions go/test/utils/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"

"github.com/google/go-cmp/cmp"
)

// MustMatchFn is used to create a common diff function for a test file
// Usage in *_test.go file:
//
// Top declaration:
//
// var mustMatch = testutils.MustMatchFn(
// []interface{}{ // types with unexported fields
// type1{},
// type2{},
// ...
// typeN{},
// },
// []string{ // ignored fields
// ".id", // id numbers are unstable
// ".createAt", // created dates might not be interesting to compare
// },
// )
//
// In Test*() function:
//
// mustMatch(t, want, got, "something doesn't match")
func MustMatchFn(allowUnexportedTypes []interface{}, ignoredFields []string, extraOpts ...cmp.Option) func(t *testing.T, want, got interface{}, errMsg string) {
diffOpts := append([]cmp.Option{
cmp.AllowUnexported(allowUnexportedTypes...),
cmpIgnoreFields(ignoredFields...),
}, extraOpts...)
// Diffs want/got and fails with errMsg on any failure.
return func(t *testing.T, want, got interface{}, errMsg string) {
t.Helper()
diff := cmp.Diff(want, got, diffOpts...)
if diff != "" {
t.Fatalf("%s: (-want +got)\n%v", errMsg, diff)
}
}
}

// MustMatch is a convenience version of MustMatchFn with no overrides.
// Usage in Test*() function:
//
// testutils.MustMatch(t, want, got, "something doesn't match")
var MustMatch = MustMatchFn(nil, nil)

// Skips fields of pathNames for cmp.Diff.
// Similar to standard cmpopts.IgnoreFields, but allows unexported fields.
func cmpIgnoreFields(pathNames ...string) cmp.Option {
skipFields := make(map[string]bool, len(pathNames))
for _, name := range pathNames {
skipFields[name] = true
}

return cmp.FilterPath(func(path cmp.Path) bool {
for _, ps := range path {
if skipFields[ps.String()] {
return true
}
}
return false
}, cmp.Ignore())
}
10 changes: 10 additions & 0 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ func IsDML(sql string) bool {
return false
}

//IsDMLStatement returns true if the query is an INSERT, UPDATE or DELETE statement.
func IsDMLStatement(stmt Statement) bool {
switch stmt.(type) {
case *Insert, *Update, *Delete:
return true
}

return false
}

// SplitAndExpression breaks up the Expr into AND-separated conditions
// and appends them to filters. Outer parenthesis are removed. Precedence
// should be taken into account if expressions are recombined.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func TestAutocommitDirectRangeTarget(t *testing.T) {
Autocommit: true,
TransactionMode: vtgatepb.TransactionMode_MULTI,
}
sql := "DELETE FROM sharded_user_msgs LIMIT 1000"
sql := "delete from sharded_user_msgs limit 1000"

_, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb
return nil, vterrors.Wrap(err, "execDeleteEqual")
}
}
return execShard(vcursor, del.Query, bindVars, rs, true /* isDML */, true /* canAutocommit */)
return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */)
}

// deleteVindexEntries performs an delete if table owns vindex.
Expand Down Expand Up @@ -231,6 +231,6 @@ func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]
}
}
autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval()
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit)
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return res, vterrors.Aggregate(errs)
}
16 changes: 9 additions & 7 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func (t noopVCursor) SetContextTimeout(timeout time.Duration) context.CancelFunc
func (t noopVCursor) RecordWarning(warning *querypb.QueryWarning) {
}

func (t noopVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
func (t noopVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
panic("unimplemented")
}

func (t noopVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, autocommit bool) (*sqltypes.Result, []error) {
func (t noopVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) {
panic("unimplemented")
}

Expand All @@ -78,7 +78,7 @@ func (t noopVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedSha
panic("unimplemented")
}

func (t noopVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) {
func (t noopVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
panic("unimplemented")
}

Expand Down Expand Up @@ -121,7 +121,7 @@ func (f *loggingVCursor) RecordWarning(warning *querypb.QueryWarning) {
f.warnings = append(f.warnings, warning)
}

func (f *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
func (f *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
name := "Unknown"
switch co {
case vtgatepb.CommitOrder_NORMAL:
Expand All @@ -133,12 +133,12 @@ func (f *loggingVCursor) Execute(method string, query string, bindvars map[strin
case vtgatepb.CommitOrder_AUTOCOMMIT:
name = "ExecuteAutocommit"
}
f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), isDML))
f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), rollbackOnError))
return f.nextResult()
}

func (f *loggingVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error) {
f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), isDML, canAutocommit))
func (f *loggingVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) {
f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), rollbackOnError, canAutocommit))
res, err := f.nextResult()
if err != nil {
return nil, []error{err}
Expand Down Expand Up @@ -202,6 +202,8 @@ func (f *loggingVCursor) ResolveDestinations(keyspace string, ids []*querypb.Val
shards = f.shards[:1]
case key.DestinationNone:
// Nothing to do here.
case key.DestinationShard:
shards = []string{destination.String()}
default:
return nil, nil, fmt.Errorf("unsupported destination: %v", destination)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (ins *Insert) execInsertSharded(vcursor VCursor, bindVars map[string]*query
}

autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit)
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
if errs != nil {
return nil, vterrors.Wrap(vterrors.Aggregate(errs), "execInsertSharded")
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ type VCursor interface {
RecordWarning(warning *querypb.QueryWarning)

// V3 functions.
Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error)
Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error)
AutocommitApproval() bool

// Shard-level functions.
ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error)
ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error)
ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard) (*sqltypes.Result, error)
StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error

// Keyspace ID level functions.
ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error)
ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error)

// Resolver methods, from key.Destination to srvtopo.ResolvedShard.
// Will replace all of the Topo functions.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa
}

queries := getQueries(route.Query, bvs)
result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* isDML */, false /* autocommit */)
result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* rollbackOnError */, false /* autocommit */)

if errs != nil {
if route.ScatterErrorsAsWarnings {
Expand Down Expand Up @@ -362,7 +362,7 @@ func (route *Route) GetFields(vcursor VCursor, bindVars map[string]*querypb.Bind
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name)
}
qr, err := execShard(vcursor, route.FieldQuery, bindVars, rss[0], false /* isDML */, false /* canAutocommit */)
qr, err := execShard(vcursor, route.FieldQuery, bindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -518,14 +518,14 @@ func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey
}
}

func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, isDML, canAutocommit bool) (*sqltypes.Result, error) {
func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, rollbackOnError, canAutocommit bool) (*sqltypes.Result, error) {
autocommit := canAutocommit && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard([]*srvtopo.ResolvedShard{rs}, []*querypb.BoundQuery{
{
Sql: query,
BindVariables: bindVars,
},
}, isDML, autocommit)
}, rollbackOnError, autocommit)
return result, vterrors.Aggregate(errs)
}

Expand Down
94 changes: 94 additions & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package engine

import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*Send)(nil)

// Send is an operator to send query to the specific keyspace, tabletType and destination
type Send struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend a custom JSON marshaller that emits an additional fake "Opcode": "Send", which will make the output plan more readable (look at engine/limit.go).

// Keyspace specifies the keyspace to send the query to.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see that Send is mostly a subset of Route, but it's good to keep it separate for now.

Keyspace *vindexes.Keyspace

// TargetDestination specifies an explicit target destination to send the query to.
// This bypases the core of the v3 engine.
TargetDestination key.Destination

// Query specifies the query to be executed.
Query string

// NoAutoCommit specifies if we need to check autocommit behaviour
NoAutoCommit bool

noInputs
}

// RouteType implements Primitive interface
func (s *Send) RouteType() string {
if s.NoAutoCommit {
return "SendNoAutoCommit"
}

return "Send"
}

// GetKeyspaceName implements Primitive interface
func (s *Send) GetKeyspaceName() string {
return s.Keyspace.Name
}

// GetTableName implements Primitive interface
func (s *Send) GetTableName() string {
return ""
}

// Execute implements Primitive interface
func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, _ bool) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
if err != nil {
return nil, vterrors.Wrap(err, "sendExecute")
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: s.Query,
BindVariables: bindVars,
}
}

canAutocommit := false
if !s.NoAutoCommit {
canAutocommit = len(rss) == 1 && vcursor.AutocommitApproval()
}

rollbackOnError := !s.NoAutoCommit // for non-dml queries, there's no need to do a rollback
result, errs := vcursor.ExecuteMultiShard(rss, queries, rollbackOnError, canAutocommit)
err = vterrors.Aggregate(errs)
if err != nil {
return nil, err
}
return result, nil
}

// StreamExecute implements Primitive interface
func (s *Send) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: systay - this should work
}

// GetFields implements Primitive interface
func (s *Send) GetFields(vcursor VCursor, bindVars map[string]*query.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: systay - @sugu, is this correct?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct for now. We have to watch out if/when we allow these primitives to become composable.

}
Loading