From 220a9315336988c2c10c4f8909c927be30ae711f Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 26 Mar 2020 14:03:34 +0530 Subject: [PATCH 01/13] added parsedestination method Signed-off-by: Harshit Gangal --- go/vt/vtgate/executor.go | 6 +- go/vt/vtgate/executor_test.go | 23 +++--- go/vt/vtgate/vcursor_impl.go | 39 ++++++++- go/vt/vtgate/vcursor_impl_test.go | 133 ++++++++++++++++++++++++++++++ 4 files changed, 186 insertions(+), 15 deletions(-) create mode 100644 go/vt/vtgate/vcursor_impl_test.go diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 49f4efea8e8..a6d9f1d3faa 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -312,7 +312,7 @@ func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql // V3 mode. query, comments := sqlparser.SplitMarginComments(sql) - vcursor := newVCursorImpl(ctx, safeSession, destKeyspace, destTabletType, comments, e, logStats, e.VSchema(), e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.VSchema(), e.resolver.resolver) plan, err := e.getPlan( vcursor, query, @@ -1217,7 +1217,7 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession bindVars = make(map[string]*querypb.BindVariable) } query, comments := sqlparser.SplitMarginComments(sql) - vcursor := newVCursorImpl(ctx, safeSession, target.Keyspace, target.TabletType, comments, e, logStats, e.VSchema(), e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.VSchema(), e.resolver.resolver) // check if this is a stream statement for messaging // TODO: support keyRange syntax @@ -1667,7 +1667,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) ([]*querypb.Field, error) { // V3 mode. query, comments := sqlparser.SplitMarginComments(sql) - vcursor := newVCursorImpl(ctx, safeSession, destKeyspace, destTabletType, comments, e, logStats, e.VSchema(), e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.VSchema(), e.resolver.resolver) plan, err := e.getPlan( vcursor, query, diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index e9b8a67ac75..c97854e27da 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1959,8 +1959,8 @@ func TestVSchemaStats(t *testing.T) { func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() - emptyvc := newVCursorImpl(context.Background(), nil, "", 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) - unshardedvc := newVCursorImpl(context.Background(), nil, KsTestUnsharded, 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) logStats1 := NewLogStats(context.Background(), "Test", "", nil) query1 := "select * from music_user_map where id = 1" @@ -2005,9 +2005,12 @@ func TestGetPlanUnnormalized(t *testing.T) { KsTestUnsharded + "@unknown:" + query1, "@unknown:" + query1, } - if keys := r.plans.Keys(); !reflect.DeepEqual(keys, want) { - t.Errorf("Plan keys: %s, want %s", keys, want) + if diff := cmp.Diff(want, r.plans.Keys()); diff != "" { + t.Errorf("\n-want,+got:\n%s", diff) } + //if keys := r.plans.Keys(); !reflect.DeepEqual(keys, want) { + // t.Errorf("Plan keys: %s, want %s", keys, want) + //} if logStats4.SQL != wantSQL { t.Errorf("logstats sql want \"%s\" got \"%s\"", wantSQL, logStats4.SQL) } @@ -2015,7 +2018,7 @@ func TestGetPlanUnnormalized(t *testing.T) { func TestGetPlanCacheUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() - emptyvc := newVCursorImpl(context.Background(), nil, "", 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(context.Background(), "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -2040,7 +2043,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnv() - unshardedvc := newVCursorImpl(context.Background(), nil, KsTestUnsharded, 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(context.Background(), "Test", "", nil) @@ -2061,7 +2064,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() r.normalize = true - emptyvc := newVCursorImpl(context.Background(), nil, "", 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(context.Background(), "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -2086,7 +2089,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnv() r.normalize = true - unshardedvc := newVCursorImpl(context.Background(), nil, KsTestUnsharded, 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(context.Background(), "Test", "", nil) @@ -2107,8 +2110,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { func TestGetPlanNormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() r.normalize = true - emptyvc := newVCursorImpl(context.Background(), nil, "", 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) - unshardedvc := newVCursorImpl(context.Background(), nil, KsTestUnsharded, 0, makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index bba6596c4b3..7bf6c8dbf30 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -34,6 +34,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + topoprotopb "vitess.io/vitess/go/vt/topo/topoproto" ) var _ engine.VCursor = (*vcursorImpl)(nil) @@ -56,6 +57,7 @@ type vcursorImpl struct { safeSession *SafeSession keyspace string tabletType topodatapb.TabletType + destination key.Destination marginComments sqlparser.MarginComments executor iExecute resolver *srvtopo.Resolver @@ -71,18 +73,23 @@ type vcursorImpl struct { // the query and supply it here. Trailing comments are typically sent by the application for various reasons, // including as identifying markers. So, they have to be added back to all queries that are executed // on behalf of the original query. -func newVCursorImpl(ctx context.Context, safeSession *SafeSession, keyspace string, tabletType topodatapb.TabletType, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vschema *vindexes.VSchema, resolver *srvtopo.Resolver) *vcursorImpl { +func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vschema *vindexes.VSchema, resolver *srvtopo.Resolver) (*vcursorImpl, error) { + keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) + if err != nil { + return nil, err + } return &vcursorImpl{ ctx: ctx, safeSession: safeSession, keyspace: keyspace, tabletType: tabletType, + destination: destination, marginComments: marginComments, executor: executor, logStats: logStats, vschema: vschema, resolver: resolver, - } + }, nil } // Context returns the current Context. @@ -253,3 +260,31 @@ func commentedShardQueries(shardQueries []*querypb.BoundQuery, marginComments sq } return newQueries } + +// TargetDestination implements the ContextVSchema interface +func (vc *vcursorImpl) TargetDestination(qualifier string) (key.Destination, *vindexes.Keyspace, topodatapb.TabletType, error) { + keyspaceName := vc.keyspace + if vc.destination == nil && qualifier != "" { + keyspaceName = qualifier + } + if keyspaceName == "" { + return nil, nil, 0, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "keyspace not specified") + } + keyspace := vc.vschema.Keyspaces[keyspaceName] + if keyspace == nil { + return nil, nil, 0, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no keyspace with name [%s] found", keyspaceName) + } + return vc.destination, keyspace.Keyspace, vc.tabletType, nil +} + +// ParseDestinationTarget parses destination target string and sets default keyspace if possible. +func parseDestinationTarget(targetString string, vschema *vindexes.VSchema) (string, topodatapb.TabletType, key.Destination, error) { + destKeyspace, destTabletType, dest, err := topoprotopb.ParseDestination(targetString, defaultTabletType) + // Set default keyspace + if destKeyspace == "" && len(vschema.Keyspaces) == 1 { + for k := range vschema.Keyspaces { + destKeyspace = k + } + } + return destKeyspace, destTabletType, dest, err +} diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go new file mode 100644 index 00000000000..61b70505f72 --- /dev/null +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -0,0 +1,133 @@ +package vtgate + +import ( + "context" + "testing" + + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/vtgate/vindexes" + + "github.com/stretchr/testify/require" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/sqlparser" +) + +func TestDestinationKeyspace(t *testing.T) { + + ks1 := &vindexes.Keyspace{ + Name: "ks1", + Sharded: false, + } + ks1Schema := &vindexes.KeyspaceSchema{ + Keyspace: ks1, + Tables: nil, + Vindexes: nil, + Error: nil, + } + ks2 := &vindexes.Keyspace{ + Name: "ks2", + Sharded: false, + } + ks2Schema := &vindexes.KeyspaceSchema{ + Keyspace: ks2, + Tables: nil, + Vindexes: nil, + Error: nil, + } + vschemaWith2KS := &vindexes.VSchema{ + Keyspaces: map[string]*vindexes.KeyspaceSchema{ + ks1.Name: ks1Schema, + ks2.Name: ks2Schema, + }} + + vschemaWith1KS := &vindexes.VSchema{ + Keyspaces: map[string]*vindexes.KeyspaceSchema{ + ks1.Name: ks1Schema, + }} + + type testCase struct { + vschema *vindexes.VSchema + targetString, qualifier string + expectedError string + expectedKeyspace string + expectedDest key.Destination + expectedTabletType topodatapb.TabletType + } + + tests := []testCase{{ + vschema: vschemaWith1KS, + targetString: "", + qualifier: "", + expectedKeyspace: ks1.Name, + expectedDest: nil, + expectedTabletType: topodatapb.TabletType_MASTER, + }, { + vschema: vschemaWith1KS, + targetString: "ks1", + qualifier: "", + expectedKeyspace: ks1.Name, + expectedDest: nil, + expectedTabletType: topodatapb.TabletType_MASTER, + }, { + vschema: vschemaWith1KS, + targetString: "ks1:-80", + qualifier: "", + expectedKeyspace: ks1.Name, + expectedDest: key.DestinationShard("-80"), + expectedTabletType: topodatapb.TabletType_MASTER, + }, { + vschema: vschemaWith1KS, + targetString: "ks1@replica", + qualifier: "", + expectedKeyspace: ks1.Name, + expectedDest: nil, + expectedTabletType: topodatapb.TabletType_REPLICA, + }, { + vschema: vschemaWith1KS, + targetString: "ks1:-80@replica", + qualifier: "", + expectedKeyspace: ks1.Name, + expectedDest: key.DestinationShard("-80"), + expectedTabletType: topodatapb.TabletType_REPLICA, + }, { + vschema: vschemaWith1KS, + targetString: "", + qualifier: "ks1", + expectedKeyspace: ks1.Name, + expectedDest: nil, + expectedTabletType: topodatapb.TabletType_MASTER, + }, { + vschema: vschemaWith1KS, + targetString: "ks2", + qualifier: "", + expectedError: "no keyspace with name [ks2] found", + }, { + vschema: vschemaWith1KS, + targetString: "ks2:-80", + qualifier: "", + expectedError: "no keyspace with name [ks2] found", + }, { + vschema: vschemaWith1KS, + targetString: "", + qualifier: "ks2", + expectedError: "no keyspace with name [ks2] found", + }, { + vschema: vschemaWith2KS, + targetString: "", + expectedError: "keyspace not specified", + }} + + for _, tc := range tests { + impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, tc.vschema, nil) + dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) + if tc.expectedError == "" { + require.NoError(t, err) + require.Equal(t, tc.expectedDest, dest) + require.Equal(t, tc.expectedKeyspace, keyspace.Name) + require.Equal(t, tc.expectedTabletType, tabletType) + } else { + require.EqualError(t, err, tc.expectedError) + } + } +} From 8c343a07f625c3b339b844cdb1353cb7f14d4a41 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 26 Mar 2020 11:54:52 +0100 Subject: [PATCH 02/13] Added Destination() to the ContextVSchema interface Signed-off-by: Andres Taylor --- go/vt/vtgate/planbuilder/builder.go | 1 + go/vt/vtgate/planbuilder/plan_test.go | 6 ++++++ go/vt/vtgate/vcursor_impl.go | 8 ++++++++ 3 files changed, 15 insertions(+) diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index bd016333523..5409838bb55 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -122,6 +122,7 @@ type ContextVSchema interface { FindTablesOrVindex(tablename sqlparser.TableName) ([]*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error) DefaultKeyspace() (*vindexes.Keyspace, error) TargetString() string + Destination() key.Destination } //------------------------------------------------------------------------- diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index c4ac1d20067..8828e609d3e 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -186,10 +186,16 @@ func loadSchema(t *testing.T, filename string) *vindexes.VSchema { return vschema } +var _ ContextVSchema = (*vschemaWrapper)(nil) + type vschemaWrapper struct { v *vindexes.VSchema } +func (vw *vschemaWrapper) Destination() key.Destination { + panic("implement me") +} + func (vw *vschemaWrapper) FindTable(tab sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) { destKeyspace, destTabletType, destTarget, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_MASTER) if err != nil { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 7bf6c8dbf30..f8863037b97 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -20,6 +20,8 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/vt/vtgate/planbuilder" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -38,6 +40,7 @@ import ( ) var _ engine.VCursor = (*vcursorImpl)(nil) +var _ planbuilder.ContextVSchema = (*vcursorImpl)(nil) var _ iExecute = (*Executor)(nil) // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes @@ -247,6 +250,11 @@ func (vc *vcursorImpl) ResolveDestinations(keyspace string, ids []*querypb.Value return vc.resolver.ResolveDestinations(vc.ctx, keyspace, vc.tabletType, ids, destinations) } +// Destination implements the ContextVSchema interface +func (vc *vcursorImpl) Destination() key.Destination { + return vc.destination +} + func commentedShardQueries(shardQueries []*querypb.BoundQuery, marginComments sqlparser.MarginComments) []*querypb.BoundQuery { if marginComments.Leading == "" && marginComments.Trailing == "" { return shardQueries From fe9975aaaad725286fecc076d7a348e6208bda4c Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 26 Mar 2020 19:31:04 +0530 Subject: [PATCH 03/13] added bypass planbuilder Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/send.go | 63 +++++++++++++++++++++++++ go/vt/vtgate/executor.go | 2 +- go/vt/vtgate/planbuilder/builder.go | 68 +++++++++++++++------------ go/vt/vtgate/planbuilder/bypass.go | 35 ++++++++++++++ go/vt/vtgate/planbuilder/plan_test.go | 39 ++++++++++++++- go/vt/vtgate/vcursor_impl.go | 8 ++++ 6 files changed, 181 insertions(+), 34 deletions(-) create mode 100644 go/vt/vtgate/engine/send.go create mode 100644 go/vt/vtgate/planbuilder/bypass.go diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go new file mode 100644 index 00000000000..e5f3f10b000 --- /dev/null +++ b/go/vt/vtgate/engine/send.go @@ -0,0 +1,63 @@ +package engine + +import ( + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +var _ Primitive = (*Send)(nil) + +// Send is an operator to send query to the specific keyspace, tabletType and destination +type Send struct { + // Keyspace specifies the keyspace to send the query to. + Keyspace *vindexes.Keyspace + + // TargetDestination specifies an explicit target destination to send the query to. + // This bypases the core of the v3 engine. + TargetDestination key.Destination + + // TargetTabletType specifies an explicit target destination tablet type + // this is only used in conjunction with TargetDestination + TargetTabletType topodatapb.TabletType + + // Query specifies the query to be executed. + Query string +} + +// RouteType implements Primitive interface +func (s Send) RouteType() string { + panic("implement me") +} + +// GetKeyspaceName implements Primitive interface +func (s Send) GetKeyspaceName() string { + panic("implement me") +} + +// GetTableName implements Primitive interface +func (s Send) GetTableName() string { + panic("implement me") +} + +// Execute implements Primitive interface +func (s Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (*sqltypes.Result, error) { + panic("implement me") +} + +// StreamExecute implements Primitive interface +func (s Send) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error { + panic("implement me") +} + +// GetFields implements Primitive interface +func (s Send) GetFields(vcursor VCursor, bindVars map[string]*query.BindVariable) (*sqltypes.Result, error) { + panic("implement me") +} + +// Inputs implements Primitive interface +func (s Send) Inputs() []Primitive { + panic("implement me") +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index a6d9f1d3faa..1d7e2138fb7 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -173,7 +173,7 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st if err != nil { return nil, err } - + // ks@replica , :-80-> ks:-80, -80@replica, ks:-80@replica if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType) } diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index 5409838bb55..ed4df32b86d 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -123,6 +123,7 @@ type ContextVSchema interface { DefaultKeyspace() (*vindexes.Keyspace, error) TargetString() string Destination() key.Destination + TabletType() topodatapb.TabletType } //------------------------------------------------------------------------- @@ -274,37 +275,42 @@ func Build(query string, vschema ContextVSchema) (*engine.Plan, error) { func BuildFromStmt(query string, stmt sqlparser.Statement, vschema ContextVSchema, bindVarNeeds sqlparser.BindVarNeeds) (*engine.Plan, error) { var err error var instruction engine.Primitive - switch stmt := stmt.(type) { - case *sqlparser.Select: - instruction, err = buildSelectPlan(stmt, vschema) - case *sqlparser.Insert: - instruction, err = buildInsertPlan(stmt, vschema) - case *sqlparser.Update: - instruction, err = buildUpdatePlan(stmt, vschema) - case *sqlparser.Delete: - instruction, err = buildDeletePlan(stmt, vschema) - case *sqlparser.Union: - instruction, err = buildUnionPlan(stmt, vschema) - case *sqlparser.Set: - return nil, errors.New("unsupported construct: set") - case *sqlparser.Show: - return nil, errors.New("unsupported construct: show") - case *sqlparser.DDL: - return nil, errors.New("unsupported construct: ddl") - case *sqlparser.DBDDL: - return nil, errors.New("unsupported construct: ddl on database") - case *sqlparser.OtherRead: - return nil, errors.New("unsupported construct: other read") - case *sqlparser.OtherAdmin: - return nil, errors.New("unsupported construct: other admin") - case *sqlparser.Begin: - return nil, errors.New("unsupported construct: begin") - case *sqlparser.Commit: - return nil, errors.New("unsupported construct: commit") - case *sqlparser.Rollback: - return nil, errors.New("unsupported construct: rollback") - default: - return nil, fmt.Errorf("BUG: unexpected statement type: %T", stmt) + + if vschema.Destination() != nil { + instruction, err = buildPlanForBypass(stmt, vschema) + } else { + switch stmt := stmt.(type) { + case *sqlparser.Select: + instruction, err = buildSelectPlan(stmt, vschema) + case *sqlparser.Insert: + instruction, err = buildInsertPlan(stmt, vschema) + case *sqlparser.Update: + instruction, err = buildUpdatePlan(stmt, vschema) + case *sqlparser.Delete: + instruction, err = buildDeletePlan(stmt, vschema) + case *sqlparser.Union: + instruction, err = buildUnionPlan(stmt, vschema) + case *sqlparser.Set: + return nil, errors.New("unsupported construct: set") + case *sqlparser.Show: + return nil, errors.New("unsupported construct: show") + case *sqlparser.DDL: + return nil, errors.New("unsupported construct: ddl") + case *sqlparser.DBDDL: + return nil, errors.New("unsupported construct: ddl on database") + case *sqlparser.OtherRead: + return nil, errors.New("unsupported construct: other read") + case *sqlparser.OtherAdmin: + return nil, errors.New("unsupported construct: other admin") + case *sqlparser.Begin: + return nil, errors.New("unsupported construct: begin") + case *sqlparser.Commit: + return nil, errors.New("unsupported construct: commit") + case *sqlparser.Rollback: + return nil, errors.New("unsupported construct: rollback") + default: + return nil, fmt.Errorf("BUG: unexpected statement type: %T", stmt) + } } if err != nil { return nil, err diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go new file mode 100644 index 00000000000..945f9b0db7c --- /dev/null +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -0,0 +1,35 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planbuilder + +import ( + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/engine" +) + +func buildPlanForBypass(stmt sqlparser.Statement, vschema ContextVSchema) (engine.Primitive, error) { + keyspace, err := vschema.DefaultKeyspace() + if err != nil { + return nil, err + } + return &engine.Send{ + Keyspace: keyspace, + TargetDestination: vschema.Destination(), + TargetTabletType: vschema.TabletType(), + Query: sqlparser.String(stmt), + }, nil +} diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index 8828e609d3e..90c96bbf4ca 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -169,6 +169,34 @@ func TestOne(t *testing.T) { testFile(t, "onecase.txt", "", vschema) } +func TestBypassPlanning(t *testing.T) { + query := "select * from ks.t1" + stmt, err := sqlparser.Parse(query) + require.NoError(t, err) + vschema := &vschemaWrapper{ + v: loadSchema(t, "schema_test.json"), + keyspace: &vindexes.Keyspace{ + Name: "main", + Sharded: false, + }, + tabletType: topodatapb.TabletType_MASTER, + dest: key.DestinationShard("-80"), + } + plan, err := BuildFromStmt(query, stmt, vschema, sqlparser.BindVarNeeds{}) + require.NoError(t, err) + expected := &engine.Send{Query: query, + Keyspace: &vindexes.Keyspace{ + Name: "main", + Sharded: false, + }, + TargetTabletType: topodatapb.TabletType_MASTER, + TargetDestination: key.DestinationShard("-80"), + } + if diff := cmp.Diff(expected, plan.Instructions); diff != "" { + t.Errorf("-want,+actual\n%s", diff) + } +} + func loadSchema(t *testing.T, filename string) *vindexes.VSchema { formal, err := vindexes.LoadFormal(locateFile(filename)) if err != nil { @@ -189,11 +217,18 @@ func loadSchema(t *testing.T, filename string) *vindexes.VSchema { var _ ContextVSchema = (*vschemaWrapper)(nil) type vschemaWrapper struct { - v *vindexes.VSchema + v *vindexes.VSchema + keyspace *vindexes.Keyspace + tabletType topodatapb.TabletType + dest key.Destination +} + +func (vw *vschemaWrapper) TabletType() topodatapb.TabletType { + return vw.tabletType } func (vw *vschemaWrapper) Destination() key.Destination { - panic("implement me") + return vw.dest } func (vw *vschemaWrapper) FindTable(tab sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index f8863037b97..745630e60ee 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -78,6 +78,9 @@ type vcursorImpl struct { // on behalf of the original query. func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vschema *vindexes.VSchema, resolver *srvtopo.Resolver) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) + // ks + // select b from ts.a + // select b from ps.a if err != nil { return nil, err } @@ -255,6 +258,11 @@ func (vc *vcursorImpl) Destination() key.Destination { return vc.destination } +// TabletType implements the ContextVSchema interface +func (vc *vcursorImpl) TabletType() topodatapb.TabletType { + return vc.tabletType +} + func commentedShardQueries(shardQueries []*querypb.BoundQuery, marginComments sqlparser.MarginComments) []*querypb.BoundQuery { if marginComments.Leading == "" && marginComments.Trailing == "" { return shardQueries From 3add0bdc8c943529a9626393510ad8b9d96d4221 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 27 Mar 2020 12:09:54 +0530 Subject: [PATCH 04/13] added test for send in engine Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/fake_vcursor_test.go | 2 + go/vt/vtgate/engine/send.go | 33 +++++++++--- go/vt/vtgate/engine/send_test.go | 66 ++++++++++++++++++++++++ go/vt/vtgate/planbuilder/bypass.go | 11 +++- go/vt/vtgate/planbuilder/plan_test.go | 1 - 5 files changed, 105 insertions(+), 8 deletions(-) create mode 100644 go/vt/vtgate/engine/send_test.go diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 7ff09281eb8..5e392c0d534 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -202,6 +202,8 @@ func (f *loggingVCursor) ResolveDestinations(keyspace string, ids []*querypb.Val shards = f.shards[:1] case key.DestinationNone: // Nothing to do here. + case key.DestinationShard: + shards = []string{destination.String()} default: return nil, nil, fmt.Errorf("unsupported destination: %v", destination) } diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index e5f3f10b000..134f937b2ef 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -4,8 +4,11 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" + + querypb "vitess.io/vitess/go/vt/proto/query" ) var _ Primitive = (*Send)(nil) @@ -19,10 +22,6 @@ type Send struct { // This bypases the core of the v3 engine. TargetDestination key.Destination - // TargetTabletType specifies an explicit target destination tablet type - // this is only used in conjunction with TargetDestination - TargetTabletType topodatapb.TabletType - // Query specifies the query to be executed. Query string } @@ -44,7 +43,29 @@ func (s Send) GetTableName() string { // Execute implements Primitive interface func (s Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (*sqltypes.Result, error) { - panic("implement me") + rss, _, err := vcursor.ResolveDestinations(s.Keyspace.Name, nil, []key.Destination{s.TargetDestination}) + if err != nil { + return nil, vterrors.Wrap(err, "sendExecute") + } + + if !s.Keyspace.Sharded && len(rss) != 1 { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss) + } + + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: s.Query, + BindVariables: bindVars, + } + } + + result, errs := vcursor.ExecuteMultiShard(rss, queries, false, true) + err = vterrors.Aggregate(errs) + if err != nil { + return nil, err + } + return result, nil } // StreamExecute implements Primitive interface diff --git a/go/vt/vtgate/engine/send_test.go b/go/vt/vtgate/engine/send_test.go new file mode 100644 index 00000000000..1e9f824e6aa --- /dev/null +++ b/go/vt/vtgate/engine/send_test.go @@ -0,0 +1,66 @@ +package engine + +import ( + "errors" + "testing" + + "vitess.io/vitess/go/vt/key" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +func TestSendUnsharded(t *testing.T) { + send := &Send{ + Keyspace: &vindexes.Keyspace{ + Name: "ks", + Sharded: false, + }, + Query: "dummy_query", + TargetDestination: key.DestinationAllShards{}, + } + + vc := &loggingVCursor{shards: []string{"0"}} + _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) + if err != nil { + t.Fatal(err) + } + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: dummy_query {} false true`, + }) + + // Failure cases + vc = &loggingVCursor{shardErr: errors.New("shard_error")} + _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) + expectError(t, "Execute", err, "sendExecute: shard_error") + + vc = &loggingVCursor{} + _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) + expectError(t, "Execute", err, "Keyspace does not have exactly one shard: []") +} + +func TestSendSharded(t *testing.T) { + send := &Send{ + Keyspace: &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + Query: "dummy_query", + TargetDestination: key.DestinationShard("20-"), + } + + vc := &loggingVCursor{shards: []string{"-20", "20-"}} + _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) + if err != nil { + t.Fatal(err) + } + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, + `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} false true`, + }) + + // Failure cases + vc = &loggingVCursor{shardErr: errors.New("shard_error")} + _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) + expectError(t, "Execute", err, "sendExecute: shard_error") +} diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go index 945f9b0db7c..142c04ffc34 100644 --- a/go/vt/vtgate/planbuilder/bypass.go +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -17,11 +17,21 @@ limitations under the License. package planbuilder import ( + "vitess.io/vitess/go/vt/key" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" ) func buildPlanForBypass(stmt sqlparser.Statement, vschema ContextVSchema) (engine.Primitive, error) { + switch vschema.Destination().(type) { + case key.DestinationExactKeyRange: + if _, ok := stmt.(*sqlparser.Insert); ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "range queries not supported for inserts: %s", vschema.TargetString()) + } + } + keyspace, err := vschema.DefaultKeyspace() if err != nil { return nil, err @@ -29,7 +39,6 @@ func buildPlanForBypass(stmt sqlparser.Statement, vschema ContextVSchema) (engin return &engine.Send{ Keyspace: keyspace, TargetDestination: vschema.Destination(), - TargetTabletType: vschema.TabletType(), Query: sqlparser.String(stmt), }, nil } diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index 90c96bbf4ca..aab3853de72 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -189,7 +189,6 @@ func TestBypassPlanning(t *testing.T) { Name: "main", Sharded: false, }, - TargetTabletType: topodatapb.TabletType_MASTER, TargetDestination: key.DestinationShard("-80"), } if diff := cmp.Diff(expected, plan.Instructions); diff != "" { From 59173b4236e80a7601614a38db365880cd16fac2 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 27 Mar 2020 08:43:08 +0100 Subject: [PATCH 05/13] Added route type and testing Signed-off-by: Andres Taylor --- go/test/utils/diff.go | 83 +++++++++++++++++++++++++++ go/vt/vtgate/engine/send.go | 32 +++++------ go/vt/vtgate/engine/send_test.go | 10 ++-- go/vt/vtgate/planbuilder/bypass.go | 4 ++ go/vt/vtgate/planbuilder/plan_test.go | 15 ++++- 5 files changed, 119 insertions(+), 25 deletions(-) create mode 100644 go/test/utils/diff.go diff --git a/go/test/utils/diff.go b/go/test/utils/diff.go new file mode 100644 index 00000000000..05a1f9eb66d --- /dev/null +++ b/go/test/utils/diff.go @@ -0,0 +1,83 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +// MustMatchFn is used to create a common diff function for a test file +// Usage in *_test.go file: +// +// Top declaration: +// +// var mustMatch = testutils.MustMatchFn( +// []interface{}{ // types with unexported fields +// type1{}, +// type2{}, +// ... +// typeN{}, +// }, +// []string{ // ignored fields +// ".id", // id numbers are unstable +// ".createAt", // created dates might not be interesting to compare +// }, +// ) +// +// In Test*() function: +// +// mustMatch(t, want, got, "something doesn't match") +func MustMatchFn(allowUnexportedTypes []interface{}, ignoredFields []string, extraOpts ...cmp.Option) func(t *testing.T, want, got interface{}, errMsg string) { + diffOpts := append([]cmp.Option{ + cmp.AllowUnexported(allowUnexportedTypes...), + cmpIgnoreFields(ignoredFields...), + }, extraOpts...) + // Diffs want/got and fails with errMsg on any failure. + return func(t *testing.T, want, got interface{}, errMsg string) { + t.Helper() + diff := cmp.Diff(want, got, diffOpts...) + if diff != "" { + t.Fatalf("%s: (-want +got)\n%v", errMsg, diff) + } + } +} + +// MustMatch is a convenience version of MustMatchFn with no overrides. +// Usage in Test*() function: +// +// testutils.MustMatch(t, want, got, "something doesn't match") +var MustMatch = MustMatchFn(nil, nil) + +// Skips fields of pathNames for cmp.Diff. +// Similar to standard cmpopts.IgnoreFields, but allows unexported fields. +func cmpIgnoreFields(pathNames ...string) cmp.Option { + skipFields := make(map[string]bool, len(pathNames)) + for _, name := range pathNames { + skipFields[name] = true + } + + return cmp.FilterPath(func(path cmp.Path) bool { + for _, ps := range path { + if skipFields[ps.String()] { + return true + } + } + return false + }, cmp.Ignore()) +} diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 134f937b2ef..76fefdf7beb 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -24,25 +24,30 @@ type Send struct { // Query specifies the query to be executed. Query string + + // OpCode specifies the route type + OpCode string + + noInputs } // RouteType implements Primitive interface -func (s Send) RouteType() string { - panic("implement me") +func (s *Send) RouteType() string { + return s.OpCode } // GetKeyspaceName implements Primitive interface -func (s Send) GetKeyspaceName() string { - panic("implement me") +func (s *Send) GetKeyspaceName() string { + return s.Keyspace.Name } // GetTableName implements Primitive interface -func (s Send) GetTableName() string { - panic("implement me") +func (s *Send) GetTableName() string { + return "" } // Execute implements Primitive interface -func (s Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (*sqltypes.Result, error) { +func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, _ bool) (*sqltypes.Result, error) { rss, _, err := vcursor.ResolveDestinations(s.Keyspace.Name, nil, []key.Destination{s.TargetDestination}) if err != nil { return nil, vterrors.Wrap(err, "sendExecute") @@ -69,16 +74,11 @@ func (s Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, } // StreamExecute implements Primitive interface -func (s Send) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error { - panic("implement me") +func (s *Send) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: systay - this should work } // GetFields implements Primitive interface -func (s Send) GetFields(vcursor VCursor, bindVars map[string]*query.BindVariable) (*sqltypes.Result, error) { - panic("implement me") -} - -// Inputs implements Primitive interface -func (s Send) Inputs() []Primitive { - panic("implement me") +func (s *Send) GetFields(vcursor VCursor, bindVars map[string]*query.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: systay - @sugu, is this correct? } diff --git a/go/vt/vtgate/engine/send_test.go b/go/vt/vtgate/engine/send_test.go index 1e9f824e6aa..c8883661794 100644 --- a/go/vt/vtgate/engine/send_test.go +++ b/go/vt/vtgate/engine/send_test.go @@ -4,6 +4,8 @@ import ( "errors" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/key" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -21,9 +23,7 @@ func TestSendUnsharded(t *testing.T) { vc := &loggingVCursor{shards: []string{"0"}} _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) vc.ExpectLog(t, []string{ `ResolveDestinations ks [] Destinations:DestinationAllShards()`, `ExecuteMultiShard ks.0: dummy_query {} false true`, @@ -51,9 +51,7 @@ func TestSendSharded(t *testing.T) { vc := &loggingVCursor{shards: []string{"-20", "20-"}} _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) vc.ExpectLog(t, []string{ `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} false true`, diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go index 142c04ffc34..88583d2104f 100644 --- a/go/vt/vtgate/planbuilder/bypass.go +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -40,5 +40,9 @@ func buildPlanForBypass(stmt sqlparser.Statement, vschema ContextVSchema) (engin Keyspace: keyspace, TargetDestination: vschema.Destination(), Query: sqlparser.String(stmt), + OpCode: ByPassOpCode, }, nil } + +//ByPassOpCode is the opcode +const ByPassOpCode = "ByPass" diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index aab3853de72..93a9504bb95 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -26,6 +26,8 @@ import ( "strings" "testing" + "vitess.io/vitess/go/test/utils" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -169,6 +171,14 @@ func TestOne(t *testing.T) { testFile(t, "onecase.txt", "", vschema) } +var mustMatch = utils.MustMatchFn( + []interface{}{ // types with unexported fields + engine.Send{}, + }, + []string{ // ignored fields + }, +) + func TestBypassPlanning(t *testing.T) { query := "select * from ks.t1" stmt, err := sqlparser.Parse(query) @@ -190,10 +200,9 @@ func TestBypassPlanning(t *testing.T) { Sharded: false, }, TargetDestination: key.DestinationShard("-80"), + OpCode: ByPassOpCode, } - if diff := cmp.Diff(expected, plan.Instructions); diff != "" { - t.Errorf("-want,+actual\n%s", diff) - } + mustMatch(t, expected, plan.Instructions, "plan output not what we expected") } func loadSchema(t *testing.T, filename string) *vindexes.VSchema { From cc6c455a61b9e448afe8d8d57d38364da3fe9956 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 27 Mar 2020 17:02:40 +0530 Subject: [PATCH 06/13] Removed dest logic from handleExec and added destination in plan key Signed-off-by: Harshit Gangal --- go/vt/vtgate/autocommit_test.go | 10 +- go/vt/vtgate/executor.go | 48 +---- go/vt/vtgate/executor_dml_test.go | 255 +++++++++------------------ go/vt/vtgate/executor_select_test.go | 10 +- go/vt/vtgate/executor_test.go | 10 +- go/vt/vtgate/vcursor_impl.go | 8 + 6 files changed, 109 insertions(+), 232 deletions(-) diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go index 1188cab4802..a3c689f1652 100644 --- a/go/vt/vtgate/autocommit_test.go +++ b/go/vt/vtgate/autocommit_test.go @@ -424,17 +424,17 @@ func TestAutocommitDirectRangeTarget(t *testing.T) { Autocommit: true, TransactionMode: vtgatepb.TransactionMode_MULTI, } - sql := "DELETE FROM sharded_user_msgs LIMIT 1000" + sql := "delete from sharded_user_msgs limit 1000" _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}) require.NoError(t, err) - testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ Sql: sql, BindVariables: map[string]*querypb.BindVariable{}, - }}) - testAsTransactionCount(t, "sbc1", sbc1, 0) - testCommitCount(t, "sbc1", sbc1, 1) + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) } func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) { diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 1d7e2138fb7..3436297cce1 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -266,49 +266,6 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st } func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destKeyspace string, destTabletType topodatapb.TabletType, dest key.Destination, logStats *LogStats, stmtType sqlparser.StatementType) (*sqltypes.Result, error) { - if dest != nil { - if destKeyspace == "" { - return nil, errNoKeyspace - } - - switch dest.(type) { - case key.DestinationExactKeyRange: - stmtType := sqlparser.Preview(sql) - if stmtType == sqlparser.StmtInsert { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "range queries not supported for inserts: %s", safeSession.TargetString) - } - - } - - execStart := time.Now() - if e.normalize { - query, comments := sqlparser.SplitMarginComments(sql) - stmt, err := sqlparser.Parse(query) - if err != nil { - return nil, err - } - rewriteResult, err := sqlparser.PrepareAST(stmt, bindVars, "vtg") - if err != nil { - return nil, err - } - normalized := sqlparser.String(rewriteResult.AST) - sql = comments.Leading + normalized + comments.Trailing - neededBindVariables, err := e.createNeededBindVariables(rewriteResult.BindVarNeeds, safeSession) - if err != nil { - return nil, err - } - for k, v := range neededBindVariables { - bindVars[k] = v - } - } - logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.SQL = sql - logStats.BindVariables = bindVars - result, err := e.resolver.Execute(ctx, sql, bindVars, destKeyspace, destTabletType, dest, safeSession, false /* notInTransaction */, safeSession.Options, logStats, true /* canAutocommit */) - logStats.ExecuteTime = time.Since(execStart) - e.updateQueryCounts("ShardDirect", "", "", int64(logStats.ShardQueries)) - return result, err - } // V3 mode. query, comments := sqlparser.SplitMarginComments(sql) @@ -1376,8 +1333,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser. if e.VSchema() == nil { return nil, errors.New("vschema not initialized") } - keyspace := vcursor.keyspace - planKey := keyspace + vindexes.TabletTypeSuffix[vcursor.tabletType] + ":" + sql + planKey := vcursor.planPrefixKey() + ":" + sql if plan, ok := e.plans.Get(planKey); ok { return plan.(*engine.Plan), nil } @@ -1409,7 +1365,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser. logStats.BindVariables = bindVars } - planKey = keyspace + vindexes.TabletTypeSuffix[vcursor.tabletType] + ":" + normalized + planKey = vcursor.planPrefixKey() + ":" + normalized if plan, ok := e.plans.Get(planKey); ok { return plan.(*engine.Plan), nil } diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index d8f2a3cc62b..6116d8d5d43 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -21,6 +21,9 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/test/utils" + "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -314,12 +317,8 @@ func TestUpdateNormalize(t *testing.T) { "vtg2": sqltypes.TestBindVariable(int64(1)), }, }} - if sbc1.Queries != nil { - t.Errorf("sbc1.Queries: %+v, want nil\n", sbc1.Queries) - } - if !reflect.DeepEqual(sbc2.Queries, wantQueries) { - t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc2.Queries, wantQueries) - } + assert.Empty(t, sbc1.BatchQueries) + utils.MustMatch(t, sbc2.BatchQueries[0], wantQueries, "didn't get expected queries") sbc2.Queries = nil masterSession.TargetString = "" } @@ -1524,186 +1523,102 @@ func TestInsertBadAutoInc(t *testing.T) { } func TestKeyDestRangeQuery(t *testing.T) { - executor, sbc1, sbc2, _ := createExecutorEnv() - // it works in a single shard key range - masterSession.TargetString = "TestExecutor[40-60]" - _, err := executorExec(executor, "DELETE FROM sharded_user_msgs LIMIT 1000", nil) - require.NoError(t, err) - sql := "DELETE FROM sharded_user_msgs LIMIT 1000" - wantQueries := []*querypb.BoundQuery{{ - Sql: sql, - BindVariables: map[string]*querypb.BindVariable{}, - }} - - if len(sbc1.Queries) != 0 { - t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{}) + type testCase struct { + inputQuery, targetString string + expectedSbc1Query string + expectedSbc2Query string + } + deleteInput := "DELETE FROM sharded_user_msgs LIMIT 1000" + deleteOutput := "delete from sharded_user_msgs limit 1000" + + selectInput := "SELECT * FROM sharded_user_msgs LIMIT 1" + selectOutput := "select * from sharded_user_msgs limit 1" + updateInput := "UPDATE sharded_user_msgs set message='test' LIMIT 1" + updateOutput := "update sharded_user_msgs set message = 'test' limit 1" + insertInput := "INSERT INTO sharded_user_msgs(message) VALUES('test')" + insertOutput := "insert into sharded_user_msgs(message) values ('test')" + tests := []testCase{ + { + inputQuery: deleteInput, + targetString: "TestExecutor[-60]", + expectedSbc1Query: deleteOutput, + expectedSbc2Query: deleteOutput, + }, + { + inputQuery: deleteInput, + targetString: "TestExecutor[40-60]", + expectedSbc2Query: deleteOutput, + }, + { + inputQuery: deleteInput, + targetString: "TestExecutor[-]", + expectedSbc1Query: deleteOutput, + expectedSbc2Query: deleteOutput, + }, + { + inputQuery: selectInput, + targetString: "TestExecutor[-]", + expectedSbc1Query: selectOutput, + expectedSbc2Query: selectOutput, + }, + { + inputQuery: updateInput, + targetString: "TestExecutor[-]", + expectedSbc1Query: updateOutput, + expectedSbc2Query: updateOutput, + }, + { + inputQuery: insertInput, + targetString: "TestExecutor:40-60", + expectedSbc2Query: insertOutput, + }, + { + inputQuery: insertInput, + targetString: "TestExecutor:-20", + expectedSbc1Query: insertOutput, + }, } - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - // it works with keyrange spanning two shards - masterSession.TargetString = "TestExecutor[-60]" - - _, err = executorExec(executor, sql, nil) - require.NoError(t, err) - testQueries(t, "sbc1", sbc1, wantQueries) - testQueries(t, "sbc1", sbc2, wantQueries) - sbc1.Queries = nil - sbc2.Queries = nil - - // it works with open ended key range - masterSession.TargetString = "TestExecutor[-]" + for _, tc := range tests { + t.Run(tc.targetString+" - "+tc.inputQuery, func(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() - _, err = executorExec(executor, sql, nil) - require.NoError(t, err) - - testQueries(t, "sbc1", sbc1, wantQueries) - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - // it works for select - sql = "SELECT * FROM sharded_user_msgs LIMIT 1" - wantQueries = []*querypb.BoundQuery{{ - Sql: sql, - BindVariables: map[string]*querypb.BindVariable{}, - }} - - _, err = executorExec(executor, sql, nil) - require.NoError(t, err) + masterSession.TargetString = tc.targetString + _, err := executorExec(executor, tc.inputQuery, nil) + require.NoError(t, err) - testQueries(t, "sbc1", sbc1, wantQueries) - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - // it works for updates - sql = "UPDATE sharded_user_msgs set message='test' LIMIT 1" - - wantQueries = []*querypb.BoundQuery{{ - Sql: sql, - BindVariables: map[string]*querypb.BindVariable{}, - }} - - _, err = executorExec(executor, sql, nil) - require.NoError(t, err) - - testQueries(t, "sbc1", sbc1, wantQueries) - testQueries(t, "sbc2", sbc2, wantQueries) + if tc.expectedSbc1Query == "" { + require.Empty(t, sbc1.BatchQueries, "sbc1") + } else { + assertBatchQueriesContain(t, tc.expectedSbc1Query, "sbc1", sbc1) + } - sbc1.Queries = nil - sbc2.Queries = nil + if tc.expectedSbc2Query == "" { + require.Empty(t, sbc2.BatchQueries) + } else { + assertBatchQueriesContain(t, tc.expectedSbc2Query, "sbc2", sbc2) + } + }) + } // it does not work for inserts - _, err = executorExec(executor, "INSERT INTO sharded_user_msgs(message) VALUES('test')", nil) + executor, _, _, _ := createExecutorEnv() + masterSession.TargetString = "TestExecutor[-]" + _, err := executorExec(executor, insertInput, nil) - want := "range queries not supported for inserts: TestExecutor[-]" - if err == nil || err.Error() != want { - t.Errorf("got: %v, want %s", err, want) - } + require.EqualError(t, err, "range queries not supported for inserts: TestExecutor[-]") - sbc1.Queries = nil - sbc2.Queries = nil masterSession.TargetString = "" } -func TestKeyShardDestQuery(t *testing.T) { - executor, sbc1, sbc2, _ := createExecutorEnv() - // it works in a single shard key range - masterSession.TargetString = "TestExecutor:40-60" - - _, err := executorExec(executor, "DELETE FROM sharded_user_msgs LIMIT 1000", nil) - require.NoError(t, err) - sql := "DELETE FROM sharded_user_msgs LIMIT 1000" - wantQueries := []*querypb.BoundQuery{{ - Sql: sql, - BindVariables: map[string]*querypb.BindVariable{}, - }} - - if len(sbc1.Queries) != 0 { - t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{}) - } - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - masterSession.TargetString = "TestExecutor:40-60" - - _, err = executorExec(executor, sql, nil) - require.NoError(t, err) - - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - // it works for select - sql = "SELECT * FROM sharded_user_msgs LIMIT 1" - wantQueries = []*querypb.BoundQuery{{ - Sql: sql, - BindVariables: map[string]*querypb.BindVariable{}, - }} - - _, err = executorExec(executor, sql, nil) - require.NoError(t, err) - - if len(sbc1.Queries) != 0 { - t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{}) - } - - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - // it works for updates - sql = "UPDATE sharded_user_msgs set message='test' LIMIT 1" - - wantQueries = []*querypb.BoundQuery{{ +func assertBatchQueriesContain(t *testing.T, sql, sbcName string, sbc *sandboxconn.SandboxConn) { + t.Helper() + expectedQuery := &querypb.BoundQuery{ Sql: sql, BindVariables: map[string]*querypb.BindVariable{}, - }} - - _, err = executorExec(executor, sql, nil) - - require.NoError(t, err) - - if len(sbc1.Queries) != 0 { - t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{}) } - - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - - // it works for inserts - - sql = "INSERT INTO sharded_user_msgs(message) VALUES('test')" - _, err = executorExec(executor, sql, nil) - - wantQueries = []*querypb.BoundQuery{{ - Sql: sql, - BindVariables: map[string]*querypb.BindVariable{}, - }} - require.NoError(t, err) - - if len(sbc1.Queries) != 0 { - t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{}) - } - - testQueries(t, "sbc2", sbc2, wantQueries) - - sbc1.Queries = nil - sbc2.Queries = nil - masterSession.TargetString = "" + testBatchQuery(t, sbcName, sbc, expectedQuery) } // Prepared statement tests diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 502680c4945..f70ce6da7de 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -22,6 +22,8 @@ import ( "strings" "testing" + "vitess.io/vitess/go/test/utils" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -632,12 +634,8 @@ func TestSelectNormalize(t *testing.T) { "vtg1": sqltypes.TestBindVariable(int64(1)), }, }} - if sbc1.Queries != nil { - t.Errorf("sbc1.Queries: %+v, want nil\n", sbc1.Queries) - } - if !reflect.DeepEqual(sbc2.Queries, wantQueries) { - t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc2.Queries, wantQueries) - } + require.Empty(t, sbc1.BatchQueries) + utils.MustMatch(t, sbc2.BatchQueries[0], wantQueries, "sbc2.Queries") sbc2.Queries = nil masterSession.TargetString = "" } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index c97854e27da..543cd155806 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -194,6 +194,7 @@ func TestExecutorTransactionsNoAutoCommit(t *testing.T) { } } +//TODO - what about these? func TestDirectTargetRewrites(t *testing.T) { executor, _, _, sbclookup := createExecutorEnv() executor.normalize = true @@ -205,13 +206,12 @@ func TestDirectTargetRewrites(t *testing.T) { } sql := "select database()" - if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil { - t.Error(err) - } - testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}) + require.NoError(t, err) + testBatchQuery(t, "sbclookup", sbclookup, &querypb.BoundQuery{ Sql: "select :__vtdbname as `database()` from dual", BindVariables: map[string]*querypb.BindVariable{"__vtdbname": sqltypes.StringBindVariable("TestUnsharded")}, - }}) + }) } func TestExecutorTransactionsAutoCommit(t *testing.T) { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 745630e60ee..11bee348d9d 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -17,6 +17,7 @@ limitations under the License. package vtgate import ( + "fmt" "sync/atomic" "time" @@ -304,3 +305,10 @@ func parseDestinationTarget(targetString string, vschema *vindexes.VSchema) (str } return destKeyspace, destTabletType, dest, err } + +func (vc *vcursorImpl) planPrefixKey() string { + if vc.destination != nil { + return fmt.Sprintf("%s%s%s", vc.keyspace, vindexes.TabletTypeSuffix[vc.tabletType], vc.destination.String()) + } + return fmt.Sprintf("%s%s", vc.keyspace, vindexes.TabletTypeSuffix[vc.tabletType]) +} From 289ef8108c8b8c79d58b5b1cffb3babaf86bc90f Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 27 Mar 2020 12:58:07 +0100 Subject: [PATCH 07/13] Moved to plan test using text file Signed-off-by: Andres Taylor --- go/vt/vtgate/planbuilder/plan_test.go | 68 +++++++------------ .../planbuilder/testdata/bypass_cases.txt | 59 ++++++++++++++++ 2 files changed, 85 insertions(+), 42 deletions(-) create mode 100644 go/vt/vtgate/planbuilder/testdata/bypass_cases.txt diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index 93a9504bb95..a73ed6c8c88 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -26,8 +26,6 @@ import ( "strings" "testing" - "vitess.io/vitess/go/test/utils" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -144,7 +142,10 @@ func init() { } func TestPlan(t *testing.T) { - vschema := loadSchema(t, "schema_test.json") + vschemaWrapper := &vschemaWrapper{ + v: loadSchema(t, "schema_test.json"), + } + testOutputTempDir, err := ioutil.TempDir("", "plan_test") require.NoError(t, err) // You will notice that some tests expect user.Id instead of user.id. @@ -153,35 +154,29 @@ func TestPlan(t *testing.T) { // the column is named as Id. This is to make sure that // column names are case-preserved, but treated as // case-insensitive even if they come from the vschema. - testFile(t, "aggr_cases.txt", testOutputTempDir, vschema) - testFile(t, "dml_cases.txt", testOutputTempDir, vschema) - testFile(t, "from_cases.txt", testOutputTempDir, vschema) - testFile(t, "filter_cases.txt", testOutputTempDir, vschema) - testFile(t, "postprocess_cases.txt", testOutputTempDir, vschema) - testFile(t, "select_cases.txt", testOutputTempDir, vschema) - testFile(t, "symtab_cases.txt", testOutputTempDir, vschema) - testFile(t, "unsupported_cases.txt", testOutputTempDir, vschema) - testFile(t, "vindex_func_cases.txt", testOutputTempDir, vschema) - testFile(t, "wireup_cases.txt", testOutputTempDir, vschema) - testFile(t, "memory_sort_cases.txt", testOutputTempDir, vschema) + testFile(t, "aggr_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "dml_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "from_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "filter_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "postprocess_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "select_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "symtab_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "unsupported_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "vindex_func_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "wireup_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "memory_sort_cases.txt", testOutputTempDir, vschemaWrapper) } func TestOne(t *testing.T) { - vschema := loadSchema(t, "schema_test.json") + vschema := &vschemaWrapper{ + v: loadSchema(t, "schema_test.json"), + } + testFile(t, "onecase.txt", "", vschema) } -var mustMatch = utils.MustMatchFn( - []interface{}{ // types with unexported fields - engine.Send{}, - }, - []string{ // ignored fields - }, -) - -func TestBypassPlanning(t *testing.T) { - query := "select * from ks.t1" - stmt, err := sqlparser.Parse(query) +func TestBypassPlanningFromFile(t *testing.T) { + testOutputTempDir, err := ioutil.TempDir("", "plan_test") require.NoError(t, err) vschema := &vschemaWrapper{ v: loadSchema(t, "schema_test.json"), @@ -192,17 +187,8 @@ func TestBypassPlanning(t *testing.T) { tabletType: topodatapb.TabletType_MASTER, dest: key.DestinationShard("-80"), } - plan, err := BuildFromStmt(query, stmt, vschema, sqlparser.BindVarNeeds{}) - require.NoError(t, err) - expected := &engine.Send{Query: query, - Keyspace: &vindexes.Keyspace{ - Name: "main", - Sharded: false, - }, - TargetDestination: key.DestinationShard("-80"), - OpCode: ByPassOpCode, - } - mustMatch(t, expected, plan.Instructions, "plan output not what we expected") + + testFile(t, "bypass_cases.txt", testOutputTempDir, vschema) } func loadSchema(t *testing.T, filename string) *vindexes.VSchema { @@ -278,21 +264,19 @@ type testPlan struct { Instructions engine.Primitive `json:",omitempty"` } -func testFile(t *testing.T, filename, tempDir string, vschema *vindexes.VSchema) { +func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper) { t.Run(filename, func(t *testing.T) { expected := &strings.Builder{} fail := false for tcase := range iterateExecFile(filename) { t.Run(tcase.comments, func(t *testing.T) { - plan, err := Build(tcase.input, &vschemaWrapper{ - v: vschema, - }) + plan, err := Build(tcase.input, vschema) out := getPlanOrErrorOutput(err, plan) if out != tcase.output { fail = true - t.Errorf("File: %s, Line: %v\n %s", filename, tcase.lineno, cmp.Diff(tcase.output, out)) + t.Errorf("File: %s, Line: %v\n %s\n%s", filename, tcase.lineno, cmp.Diff(tcase.output, out), out) } if err != nil { diff --git a/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt b/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt new file mode 100644 index 00000000000..30022a7cb59 --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt @@ -0,0 +1,59 @@ +# select bypass +"select count(*), col from unsharded" +{ + "Original": "select count(*), col from unsharded", + "Instructions": { + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "-80", + "Query": "select count(*), col from unsharded", + "OpCode": "ByPass" + } +} + +# update bypass +"update user set val = 1 where id = 18446744073709551616 and id = 1" +{ + "Original": "update user set val = 1 where id = 18446744073709551616 and id = 1", + "Instructions": { + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "-80", + "Query": "update user set val = 1 where id = 18446744073709551616 and id = 1", + "OpCode": "ByPass" + } +} + +# delete bypass +"DELETE FROM USER WHERE ID = 42" +{ + "Original": "DELETE FROM USER WHERE ID = 42", + "Instructions": { + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "-80", + "Query": "delete from USER where ID = 42", + "OpCode": "ByPass" + } +} + +# insert bypass +"INSERT INTO USER (ID, NAME) VALUES (42, 'ms X')" +{ + "Original": "INSERT INTO USER (ID, NAME) VALUES (42, 'ms X')", + "Instructions": { + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "-80", + "Query": "insert into USER(ID, NAME) values (42, 'ms X')", + "OpCode": "ByPass" + } +} From f1010d55acbd20a90821979b994ca040c0cb7b1c Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 27 Mar 2020 19:19:58 +0530 Subject: [PATCH 08/13] fix vtexplain test Signed-off-by: Harshit Gangal --- go/vt/vtexplain/testdata/multi-output/target-output.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/vtexplain/testdata/multi-output/target-output.txt b/go/vt/vtexplain/testdata/multi-output/target-output.txt index abf70f5b57d..a4a11506c3e 100644 --- a/go/vt/vtexplain/testdata/multi-output/target-output.txt +++ b/go/vt/vtexplain/testdata/multi-output/target-output.txt @@ -1,12 +1,16 @@ ---------------------------------------------------------------------- select * from user where email='null@void.com' +1 ks_sharded/40-80: begin 1 ks_sharded/40-80: select * from user where email = 'null@void.com' limit 10001 +1 ks_sharded/40-80: commit ---------------------------------------------------------------------- select * from user where id in (1,2,3,4,5,6,7,8) +1 ks_sharded/40-80: begin 1 ks_sharded/40-80: select * from user where id in (1, 2, 3, 4, 5, 6, 7, 8) limit 10001 +1 ks_sharded/40-80: commit ---------------------------------------------------------------------- insert into user (id, name) values (2, 'bob') From 8219745666085e3b74320c5b5fb15de7e3816934 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 30 Mar 2020 11:32:23 +0530 Subject: [PATCH 09/13] Renamed isDML to rollbackOnError Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/delete.go | 4 +-- go/vt/vtgate/engine/fake_vcursor_test.go | 14 +++++----- go/vt/vtgate/engine/insert.go | 2 +- go/vt/vtgate/engine/primitive.go | 6 ++--- go/vt/vtgate/engine/route.go | 8 +++--- go/vt/vtgate/engine/update.go | 4 +-- go/vt/vtgate/executor.go | 2 +- go/vt/vtgate/vcursor_impl.go | 26 +++++++++---------- go/vt/vtgate/vcursor_impl_test.go | 1 - go/vt/vtgate/vindexes/consistent_lookup.go | 8 +++--- .../vtgate/vindexes/consistent_lookup_test.go | 12 ++++----- go/vt/vtgate/vindexes/lookup_internal.go | 8 +++--- go/vt/vtgate/vindexes/lookup_test.go | 10 +++---- go/vt/vtgate/vindexes/vindex.go | 4 +-- 14 files changed, 54 insertions(+), 55 deletions(-) diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index c1bf66db083..409ce0d5d97 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -166,7 +166,7 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb return nil, vterrors.Wrap(err, "execDeleteEqual") } } - return execShard(vcursor, del.Query, bindVars, rs, true /* isDML */, true /* canAutocommit */) + return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */) } // deleteVindexEntries performs an delete if table owns vindex. @@ -231,6 +231,6 @@ func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string] } } autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval() - res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit) + res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) return res, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 5e392c0d534..c0d93370f4b 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -58,11 +58,11 @@ func (t noopVCursor) SetContextTimeout(timeout time.Duration) context.CancelFunc func (t noopVCursor) RecordWarning(warning *querypb.QueryWarning) { } -func (t noopVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { +func (t noopVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { panic("unimplemented") } -func (t noopVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, autocommit bool) (*sqltypes.Result, []error) { +func (t noopVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) { panic("unimplemented") } @@ -78,7 +78,7 @@ func (t noopVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedSha panic("unimplemented") } -func (t noopVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) { +func (t noopVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { panic("unimplemented") } @@ -121,7 +121,7 @@ func (f *loggingVCursor) RecordWarning(warning *querypb.QueryWarning) { f.warnings = append(f.warnings, warning) } -func (f *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { +func (f *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { name := "Unknown" switch co { case vtgatepb.CommitOrder_NORMAL: @@ -133,12 +133,12 @@ func (f *loggingVCursor) Execute(method string, query string, bindvars map[strin case vtgatepb.CommitOrder_AUTOCOMMIT: name = "ExecuteAutocommit" } - f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), isDML)) + f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), rollbackOnError)) return f.nextResult() } -func (f *loggingVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error) { - f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), isDML, canAutocommit)) +func (f *loggingVCursor) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) { + f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), rollbackOnError, canAutocommit)) res, err := f.nextResult() if err != nil { return nil, []error{err} diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index 625a4656231..e08d5398ea2 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -277,7 +277,7 @@ func (ins *Insert) execInsertSharded(vcursor VCursor, bindVars map[string]*query } autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit) + result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) if errs != nil { return nil, vterrors.Wrap(vterrors.Aggregate(errs), "execInsertSharded") } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 34a8d96a226..1ee2389b048 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -57,16 +57,16 @@ type VCursor interface { RecordWarning(warning *querypb.QueryWarning) // V3 functions. - Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) + Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) AutocommitApproval() bool // Shard-level functions. - ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error) + ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard) (*sqltypes.Result, error) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error // Keyspace ID level functions. - ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) + ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) // Resolver methods, from key.Destination to srvtopo.ResolvedShard. // Will replace all of the Topo functions. diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 8a0f2f326a4..d1d78a9bc13 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -267,7 +267,7 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa } queries := getQueries(route.Query, bvs) - result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* isDML */, false /* autocommit */) + result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* rollbackOnError */, false /* autocommit */) if errs != nil { if route.ScatterErrorsAsWarnings { @@ -362,7 +362,7 @@ func (route *Route) GetFields(vcursor VCursor, bindVars map[string]*querypb.Bind // This code is unreachable. It's just a sanity check. return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name) } - qr, err := execShard(vcursor, route.FieldQuery, bindVars, rss[0], false /* isDML */, false /* canAutocommit */) + qr, err := execShard(vcursor, route.FieldQuery, bindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */) if err != nil { return nil, err } @@ -518,14 +518,14 @@ func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey } } -func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, isDML, canAutocommit bool) (*sqltypes.Result, error) { +func execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, rollbackOnError, canAutocommit bool) (*sqltypes.Result, error) { autocommit := canAutocommit && vcursor.AutocommitApproval() result, errs := vcursor.ExecuteMultiShard([]*srvtopo.ResolvedShard{rs}, []*querypb.BoundQuery{ { Sql: query, BindVariables: bindVars, }, - }, isDML, autocommit) + }, rollbackOnError, autocommit) return result, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 565f86e1e14..684fe65d7d3 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -173,7 +173,7 @@ func (upd *Update) execUpdateEqual(vcursor VCursor, bindVars map[string]*querypb return nil, vterrors.Wrap(err, "execUpdateEqual") } } - return execShard(vcursor, upd.Query, bindVars, rs, true /* isDML */, true /* canAutocommit */) + return execShard(vcursor, upd.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */) } // updateVindexEntries performs an update when a vindex is being modified @@ -260,6 +260,6 @@ func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string] } autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* isDML */, autocommit) + result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) return result, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 3436297cce1..0debe8a97e5 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -312,7 +312,7 @@ func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql } // Check if there was partial DML execution. If so, rollback the transaction. - if err != nil && safeSession.InTransaction() && vcursor.hasPartialDML { + if err != nil && safeSession.InTransaction() && vcursor.rollbackOnPartialExec { _ = e.txConn.Rollback(ctx, safeSession) err = vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction rolled back due to partial DML execution: %v", err) } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 11bee348d9d..3235086c4db 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -66,11 +66,11 @@ type vcursorImpl struct { executor iExecute resolver *srvtopo.Resolver logStats *LogStats - // hasPartialDML is set to true if any DML was successfully + // rollbackOnPartialExec is set to true if any DML was successfully // executed. If there was a subsequent failure, the transaction // must be forced to rollback. - hasPartialDML bool - vschema *vindexes.VSchema + rollbackOnPartialExec bool + vschema *vindexes.VSchema } // newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with @@ -174,7 +174,7 @@ func (vc *vcursorImpl) TargetString() string { } // Execute is part of the engine.VCursor interface. -func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { +func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { session := vc.safeSession if co == vtgatepb.CommitOrder_AUTOCOMMIT { // For autocommit, we have to create an independent session. @@ -185,19 +185,19 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string] } qr, err := vc.executor.Execute(vc.ctx, method, session, vc.marginComments.Leading+query+vc.marginComments.Trailing, bindVars) - if err == nil && isDML { - vc.hasPartialDML = true + if err == nil && rollbackOnError { + vc.rollbackOnPartialExec = true } return qr, err } // ExecuteMultiShard is part of the engine.VCursor interface. -func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, autocommit bool) (*sqltypes.Result, []error) { +func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) { atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries))) qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.tabletType, vc.safeSession, false, autocommit) - if errs == nil && isDML { - vc.hasPartialDML = true + if errs == nil && rollbackOnError { + vc.rollbackOnPartialExec = true } return qr, errs } @@ -229,7 +229,7 @@ func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedS } // ExecuteKeyspaceID is part of the engine.VCursor interface. -func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) { +func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { atomic.AddUint32(&vc.logStats.ShardQueries, 1) rss, _, err := vc.ResolveDestinations(keyspace, nil, []key.Destination{key.DestinationKeyspaceID(ksid)}) if err != nil { @@ -239,11 +239,11 @@ func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query str Sql: query, BindVariables: bindVars, }} - qr, errs := vc.ExecuteMultiShard(rss, queries, isDML, autocommit) + qr, errs := vc.ExecuteMultiShard(rss, queries, rollbackOnError, autocommit) if len(errs) == 0 { - if isDML { - vc.hasPartialDML = true + if rollbackOnError { + vc.rollbackOnPartialExec = true } return qr, nil } diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index 61b70505f72..d6062dca443 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -14,7 +14,6 @@ import ( ) func TestDestinationKeyspace(t *testing.T) { - ks1 := &vindexes.Keyspace{ Name: "ks1", Sharded: false, diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index 1c64d218cc7..84072e75beb 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -266,19 +266,19 @@ func (lu *clCommon) handleDup(vcursor VCursor, values []sqltypes.Value, ksid []b bindVars[lu.lkp.To] = sqltypes.BytesBindVariable(ksid) // Lock the lookup row using pre priority. - qr, err := vcursor.Execute("VindexCreate", lu.lockLookupQuery, bindVars, false /* isDML */, vtgatepb.CommitOrder_PRE) + qr, err := vcursor.Execute("VindexCreate", lu.lockLookupQuery, bindVars, false /* rollbackOnError */, vtgatepb.CommitOrder_PRE) if err != nil { return err } switch len(qr.Rows) { case 0: - if _, err := vcursor.Execute("VindexCreate", lu.insertLookupQuery, bindVars, true /* isDML */, vtgatepb.CommitOrder_PRE); err != nil { + if _, err := vcursor.Execute("VindexCreate", lu.insertLookupQuery, bindVars, true /* rollbackOnError */, vtgatepb.CommitOrder_PRE); err != nil { return err } case 1: existingksid := qr.Rows[0][0].ToBytes() // Lock the target row using normal transaction priority. - qr, err = vcursor.ExecuteKeyspaceID(lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* isDML */, false /* autocommit */) + qr, err = vcursor.ExecuteKeyspaceID(lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */) if err != nil { return err } @@ -288,7 +288,7 @@ func (lu *clCommon) handleDup(vcursor VCursor, values []sqltypes.Value, ksid []b if bytes.Equal(existingksid, ksid) { return nil } - if _, err := vcursor.Execute("VindexCreate", lu.updateLookupQuery, bindVars, true /* isDML */, vtgatepb.CommitOrder_PRE); err != nil { + if _, err := vcursor.Execute("VindexCreate", lu.updateLookupQuery, bindVars, true /* rollbackOnError */, vtgatepb.CommitOrder_PRE); err != nil { return err } default: diff --git a/go/vt/vtgate/vindexes/consistent_lookup_test.go b/go/vt/vtgate/vindexes/consistent_lookup_test.go index dbb95dbc4d6..346063def6d 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup_test.go +++ b/go/vt/vtgate/vindexes/consistent_lookup_test.go @@ -522,7 +522,7 @@ func (vc *loggingVCursor) AddResult(qr *sqltypes.Result, err error) { vc.errors = append(vc.errors, err) } -func (vc *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { +func (vc *loggingVCursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { name := "Unknown" switch co { case vtgatepb.CommitOrder_NORMAL: @@ -534,14 +534,14 @@ func (vc *loggingVCursor) Execute(method string, query string, bindvars map[stri case vtgatepb.CommitOrder_AUTOCOMMIT: name = "ExecuteAutocommit" } - return vc.execute(name, query, bindvars, isDML) + return vc.execute(name, query, bindvars, rollbackOnError) } -func (vc *loggingVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) { - return vc.execute("ExecuteKeyspaceID", query, bindVars, isDML) +func (vc *loggingVCursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { + return vc.execute("ExecuteKeyspaceID", query, bindVars, rollbackOnError) } -func (vc *loggingVCursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { +func (vc *loggingVCursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) { if vc.index >= len(vc.results) { return nil, fmt.Errorf("ran out of results to return: %s", query) } @@ -550,7 +550,7 @@ func (vc *loggingVCursor) execute(method string, query string, bindvars map[stri bvl = append(bvl, bv{Name: k, Bv: string(v.Value)}) } sort.Slice(bvl, func(i, j int) bool { return bvl[i].Name < bvl[j].Name }) - vc.log = append(vc.log, fmt.Sprintf("%s %s %v %v", method, query, bvl, isDML)) + vc.log = append(vc.log, fmt.Sprintf("%s %s %v %v", method, query, bvl, rollbackOnError)) idx := vc.index vc.index++ if vc.errors[idx] != nil { diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index bfcc38a2c17..2ea8a77e455 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -76,7 +76,7 @@ func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sql if lkp.Autocommit { co = vtgatepb.CommitOrder_AUTOCOMMIT } - result, err = vcursor.Execute("VindexLookup", lkp.sel, bindVars, false /* isDML */, co) + result, err = vcursor.Execute("VindexLookup", lkp.sel, bindVars, false /* rollbackOnError */, co) if err != nil { return nil, fmt.Errorf("lookup.Map: %v", err) } @@ -101,7 +101,7 @@ func (lkp *lookupInternal) VerifyCustom(vcursor VCursor, ids, values []sqltypes. lkp.FromColumns[0]: sqltypes.ValueBindVariable(id), lkp.To: sqltypes.ValueBindVariable(values[i]), } - result, err := vcursor.Execute("VindexVerify", lkp.ver, bindVars, false /* isDML */, co) + result, err := vcursor.Execute("VindexVerify", lkp.ver, bindVars, false /* rollbackOnError */, co) if err != nil { return nil, fmt.Errorf("lookup.Verify: %v", err) } @@ -206,7 +206,7 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty fmt.Fprintf(buf, "%s=values(%s)", lkp.To, lkp.To) } - if _, err := vcursor.Execute("VindexCreate", buf.String(), bindVars, true /* isDML */, co); err != nil { + if _, err := vcursor.Execute("VindexCreate", buf.String(), bindVars, true /* rollbackOnError */, co); err != nil { return fmt.Errorf("lookup.Create: %v", err) } return nil @@ -247,7 +247,7 @@ func (lkp *lookupInternal) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va bindVars[lkp.FromColumns[colIdx]] = sqltypes.ValueBindVariable(columnValue) } bindVars[lkp.To] = sqltypes.ValueBindVariable(value) - _, err := vcursor.Execute("VindexDelete", lkp.del, bindVars, true /* isDML */, co) + _, err := vcursor.Execute("VindexDelete", lkp.del, bindVars, true /* rollbackOnError */, co) if err != nil { return fmt.Errorf("lookup.Delete: %v", err) } diff --git a/go/vt/vtgate/vindexes/lookup_test.go b/go/vt/vtgate/vindexes/lookup_test.go index 961a63725a9..827e0708dda 100644 --- a/go/vt/vtgate/vindexes/lookup_test.go +++ b/go/vt/vtgate/vindexes/lookup_test.go @@ -45,7 +45,7 @@ type vcursor struct { pre, post int } -func (vc *vcursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { +func (vc *vcursor) Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { switch co { case vtgatepb.CommitOrder_PRE: vc.pre++ @@ -54,14 +54,14 @@ func (vc *vcursor) Execute(method string, query string, bindvars map[string]*que case vtgatepb.CommitOrder_AUTOCOMMIT: vc.autocommits++ } - return vc.execute(method, query, bindvars, isDML) + return vc.execute(method, query, bindvars, rollbackOnError) } -func (vc *vcursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) { - return vc.execute("ExecuteKeyspaceID", query, bindVars, isDML) +func (vc *vcursor) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { + return vc.execute("ExecuteKeyspaceID", query, bindVars, rollbackOnError) } -func (vc *vcursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) { +func (vc *vcursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) { vc.queries = append(vc.queries, &querypb.BoundQuery{ Sql: query, BindVariables: bindvars, diff --git a/go/vt/vtgate/vindexes/vindex.go b/go/vt/vtgate/vindexes/vindex.go index ce7951aec65..0fea9233bb7 100644 --- a/go/vt/vtgate/vindexes/vindex.go +++ b/go/vt/vtgate/vindexes/vindex.go @@ -35,8 +35,8 @@ import ( // in the current context and session of a VTGate request. Vindexes // can use this interface to execute lookup queries. type VCursor interface { - Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) - ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, isDML, autocommit bool) (*sqltypes.Result, error) + Execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) + ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) } // Vindex defines the interface required to register a vindex. From d93107be6874cda599e977b9399f3ee3810ac942 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 30 Mar 2020 13:59:50 +0530 Subject: [PATCH 10/13] Added autocommit logic to send primitive Signed-off-by: Harshit Gangal --- .../testdata/multi-output/target-output.txt | 4 ---- go/vt/vtgate/autocommit_test.go | 8 ++++---- go/vt/vtgate/engine/send.go | 3 ++- go/vt/vtgate/engine/send_test.go | 4 ++-- go/vt/vtgate/executor_dml_test.go | 16 ++++++++-------- go/vt/vtgate/executor_select_test.go | 4 ++-- go/vt/vtgate/executor_test.go | 4 ++-- 7 files changed, 20 insertions(+), 23 deletions(-) diff --git a/go/vt/vtexplain/testdata/multi-output/target-output.txt b/go/vt/vtexplain/testdata/multi-output/target-output.txt index a4a11506c3e..abf70f5b57d 100644 --- a/go/vt/vtexplain/testdata/multi-output/target-output.txt +++ b/go/vt/vtexplain/testdata/multi-output/target-output.txt @@ -1,16 +1,12 @@ ---------------------------------------------------------------------- select * from user where email='null@void.com' -1 ks_sharded/40-80: begin 1 ks_sharded/40-80: select * from user where email = 'null@void.com' limit 10001 -1 ks_sharded/40-80: commit ---------------------------------------------------------------------- select * from user where id in (1,2,3,4,5,6,7,8) -1 ks_sharded/40-80: begin 1 ks_sharded/40-80: select * from user where id in (1, 2, 3, 4, 5, 6, 7, 8) limit 10001 -1 ks_sharded/40-80: commit ---------------------------------------------------------------------- insert into user (id, name) values (2, 'bob') diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go index a3c689f1652..e0ebb8c6705 100644 --- a/go/vt/vtgate/autocommit_test.go +++ b/go/vt/vtgate/autocommit_test.go @@ -429,12 +429,12 @@ func TestAutocommitDirectRangeTarget(t *testing.T) { _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}) require.NoError(t, err) - testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ Sql: sql, BindVariables: map[string]*querypb.BindVariable{}, - }) - testAsTransactionCount(t, "sbc1", sbc1, 1) - testCommitCount(t, "sbc1", sbc1, 0) + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) } func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) { diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 76fefdf7beb..e9097a2ecd0 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -65,7 +65,8 @@ func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, } } - result, errs := vcursor.ExecuteMultiShard(rss, queries, false, true) + canAutocommit := len(rss) == 1 && vcursor.AutocommitApproval() + result, errs := vcursor.ExecuteMultiShard(rss, queries, true, canAutocommit) err = vterrors.Aggregate(errs) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/send_test.go b/go/vt/vtgate/engine/send_test.go index c8883661794..c3e03ec755c 100644 --- a/go/vt/vtgate/engine/send_test.go +++ b/go/vt/vtgate/engine/send_test.go @@ -26,7 +26,7 @@ func TestSendUnsharded(t *testing.T) { require.NoError(t, err) vc.ExpectLog(t, []string{ `ResolveDestinations ks [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard ks.0: dummy_query {} false true`, + `ExecuteMultiShard ks.0: dummy_query {} true true`, }) // Failure cases @@ -54,7 +54,7 @@ func TestSendSharded(t *testing.T) { require.NoError(t, err) vc.ExpectLog(t, []string{ `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, - `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} false true`, + `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`, }) // Failure cases diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 6116d8d5d43..b0a47608df5 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -317,8 +317,8 @@ func TestUpdateNormalize(t *testing.T) { "vtg2": sqltypes.TestBindVariable(int64(1)), }, }} - assert.Empty(t, sbc1.BatchQueries) - utils.MustMatch(t, sbc2.BatchQueries[0], wantQueries, "didn't get expected queries") + assert.Empty(t, sbc1.Queries) + utils.MustMatch(t, sbc2.Queries, wantQueries, "didn't get expected queries") sbc2.Queries = nil masterSession.TargetString = "" } @@ -1591,13 +1591,13 @@ func TestKeyDestRangeQuery(t *testing.T) { if tc.expectedSbc1Query == "" { require.Empty(t, sbc1.BatchQueries, "sbc1") } else { - assertBatchQueriesContain(t, tc.expectedSbc1Query, "sbc1", sbc1) + assertQueriesContain(t, tc.expectedSbc1Query, "sbc1", sbc1) } if tc.expectedSbc2Query == "" { require.Empty(t, sbc2.BatchQueries) } else { - assertBatchQueriesContain(t, tc.expectedSbc2Query, "sbc2", sbc2) + assertQueriesContain(t, tc.expectedSbc2Query, "sbc2", sbc2) } }) } @@ -1612,13 +1612,13 @@ func TestKeyDestRangeQuery(t *testing.T) { masterSession.TargetString = "" } -func assertBatchQueriesContain(t *testing.T, sql, sbcName string, sbc *sandboxconn.SandboxConn) { +func assertQueriesContain(t *testing.T, sql, sbcName string, sbc *sandboxconn.SandboxConn) { t.Helper() - expectedQuery := &querypb.BoundQuery{ + expectedQuery := []*querypb.BoundQuery{{ Sql: sql, BindVariables: map[string]*querypb.BindVariable{}, - } - testBatchQuery(t, sbcName, sbc, expectedQuery) + }} + testQueries(t, sbcName, sbc, expectedQuery) } // Prepared statement tests diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index f70ce6da7de..6e53ec590d3 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -634,8 +634,8 @@ func TestSelectNormalize(t *testing.T) { "vtg1": sqltypes.TestBindVariable(int64(1)), }, }} - require.Empty(t, sbc1.BatchQueries) - utils.MustMatch(t, sbc2.BatchQueries[0], wantQueries, "sbc2.Queries") + require.Empty(t, sbc1.Queries) + utils.MustMatch(t, sbc2.Queries, wantQueries, "sbc2.Queries") sbc2.Queries = nil masterSession.TargetString = "" } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 543cd155806..6c1a690d143 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -208,10 +208,10 @@ func TestDirectTargetRewrites(t *testing.T) { _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}) require.NoError(t, err) - testBatchQuery(t, "sbclookup", sbclookup, &querypb.BoundQuery{ + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ Sql: "select :__vtdbname as `database()` from dual", BindVariables: map[string]*querypb.BindVariable{"__vtdbname": sqltypes.StringBindVariable("TestUnsharded")}, - }) + }}) } func TestExecutorTransactionsAutoCommit(t *testing.T) { From c4658bf2010e8fdfbc922a2fc536a994d52cab53 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 30 Mar 2020 14:35:17 +0200 Subject: [PATCH 11/13] Handle DML/non-DML separately for the Send primitive Signed-off-by: Andres Taylor --- go/vt/sqlparser/analyzer.go | 10 ++ go/vt/vtgate/engine/send.go | 16 ++- go/vt/vtgate/engine/send_test.go | 136 ++++++++++++------ go/vt/vtgate/planbuilder/bypass.go | 5 +- .../planbuilder/testdata/bypass_cases.txt | 8 +- 5 files changed, 119 insertions(+), 56 deletions(-) diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index 113a887fb3e..c221787fa4f 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -163,6 +163,16 @@ func IsDML(sql string) bool { return false } +//IsDMLStatement returns true if the query is an INSERT, UPDATE or DELETE statement. +func IsDMLStatement(stmt Statement) bool { + switch stmt.(type) { + case *Insert, *Update, *Delete: + return true + } + + return false +} + // SplitAndExpression breaks up the Expr into AND-separated conditions // and appends them to filters. Outer parenthesis are removed. Precedence // should be taken into account if expressions are recombined. diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index e9097a2ecd0..dd8314e0f42 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -25,15 +25,19 @@ type Send struct { // Query specifies the query to be executed. Query string - // OpCode specifies the route type - OpCode string + // NoAutoCommit specifies if we need to check autocommit behaviour + NoAutoCommit bool noInputs } // RouteType implements Primitive interface func (s *Send) RouteType() string { - return s.OpCode + if s.NoAutoCommit { + return "SendNoAutoCommit" + } + + return "Send" } // GetKeyspaceName implements Primitive interface @@ -65,7 +69,11 @@ func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, } } - canAutocommit := len(rss) == 1 && vcursor.AutocommitApproval() + canAutocommit := false + if !s.NoAutoCommit { + canAutocommit = len(rss) == 1 && vcursor.AutocommitApproval() + } + result, errs := vcursor.ExecuteMultiShard(rss, queries, true, canAutocommit) err = vterrors.Aggregate(errs) if err != nil { diff --git a/go/vt/vtgate/engine/send_test.go b/go/vt/vtgate/engine/send_test.go index c3e03ec755c..2096d22be1c 100644 --- a/go/vt/vtgate/engine/send_test.go +++ b/go/vt/vtgate/engine/send_test.go @@ -11,54 +11,102 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" ) -func TestSendUnsharded(t *testing.T) { - send := &Send{ - Keyspace: &vindexes.Keyspace{ - Name: "ks", - Sharded: false, - }, - Query: "dummy_query", - TargetDestination: key.DestinationAllShards{}, +func TestSendTable(t *testing.T) { + type testCase struct { + testName string + sharded bool + shards []string + destination key.Destination + expectedQueryLog []string + noAutoCommit bool } - vc := &loggingVCursor{shards: []string{"0"}} - _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) - require.NoError(t, err) - vc.ExpectLog(t, []string{ - `ResolveDestinations ks [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard ks.0: dummy_query {} true true`, - }) - - // Failure cases - vc = &loggingVCursor{shardErr: errors.New("shard_error")} - _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) - expectError(t, "Execute", err, "sendExecute: shard_error") - - vc = &loggingVCursor{} - _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) - expectError(t, "Execute", err, "Keyspace does not have exactly one shard: []") -} - -func TestSendSharded(t *testing.T) { - send := &Send{ - Keyspace: &vindexes.Keyspace{ - Name: "ks", - Sharded: true, + singleShard := []string{"0"} + twoShards := []string{"-20", "20-"} + tests := []testCase{ + { + testName: "unsharded with no autocommit", + sharded: false, + shards: singleShard, + destination: key.DestinationAllShards{}, + expectedQueryLog: []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: dummy_query {} true true`, + }, + noAutoCommit: true, + }, + { + testName: "sharded with no autocommit", + sharded: true, + shards: twoShards, + destination: key.DestinationShard("20-"), + expectedQueryLog: []string{ + `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, + `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`, + }, + noAutoCommit: true, + }, + { + testName: "unsharded", + sharded: false, + shards: singleShard, + destination: key.DestinationAllShards{}, + expectedQueryLog: []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: dummy_query {} true true`, + }, + noAutoCommit: false, + }, + { + testName: "sharded with single shard destination", + sharded: true, + shards: twoShards, + destination: key.DestinationShard("20-"), + expectedQueryLog: []string{ + `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, + `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`, + }, + noAutoCommit: false, + }, + { + testName: "sharded with multi shard destination", + sharded: true, + shards: twoShards, + destination: key.DestinationAllShards{}, + expectedQueryLog: []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.-20: dummy_query {} ks.20-: dummy_query {} true false`, + }, + noAutoCommit: false, }, - Query: "dummy_query", - TargetDestination: key.DestinationShard("20-"), } - vc := &loggingVCursor{shards: []string{"-20", "20-"}} - _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) - require.NoError(t, err) - vc.ExpectLog(t, []string{ - `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, - `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`, - }) + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + send := &Send{ + Keyspace: &vindexes.Keyspace{ + Name: "ks", + Sharded: tc.sharded, + }, + Query: "dummy_query", + TargetDestination: tc.destination, + NoAutoCommit: false, + } + vc := &loggingVCursor{shards: tc.shards} + _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, tc.expectedQueryLog) - // Failure cases - vc = &loggingVCursor{shardErr: errors.New("shard_error")} - _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) - expectError(t, "Execute", err, "sendExecute: shard_error") + // Failure cases + vc = &loggingVCursor{shardErr: errors.New("shard_error")} + _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.EqualError(t, err, "sendExecute: shard_error") + + if !tc.sharded { + vc = &loggingVCursor{} + _, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.EqualError(t, err, "Keyspace does not have exactly one shard: []") + } + }) + } } diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go index 88583d2104f..2ffb3f415ac 100644 --- a/go/vt/vtgate/planbuilder/bypass.go +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -40,9 +40,6 @@ func buildPlanForBypass(stmt sqlparser.Statement, vschema ContextVSchema) (engin Keyspace: keyspace, TargetDestination: vschema.Destination(), Query: sqlparser.String(stmt), - OpCode: ByPassOpCode, + NoAutoCommit: !sqlparser.IsDMLStatement(stmt), }, nil } - -//ByPassOpCode is the opcode -const ByPassOpCode = "ByPass" diff --git a/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt b/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt index 30022a7cb59..672bbc87f1d 100644 --- a/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/bypass_cases.txt @@ -9,7 +9,7 @@ }, "TargetDestination": "-80", "Query": "select count(*), col from unsharded", - "OpCode": "ByPass" + "NoAutoCommit": true } } @@ -24,7 +24,7 @@ }, "TargetDestination": "-80", "Query": "update user set val = 1 where id = 18446744073709551616 and id = 1", - "OpCode": "ByPass" + "NoAutoCommit": false } } @@ -39,7 +39,7 @@ }, "TargetDestination": "-80", "Query": "delete from USER where ID = 42", - "OpCode": "ByPass" + "NoAutoCommit": false } } @@ -54,6 +54,6 @@ }, "TargetDestination": "-80", "Query": "insert into USER(ID, NAME) values (42, 'ms X')", - "OpCode": "ByPass" + "NoAutoCommit": false } } From 5a9f4da8cee33780c8f4106450fe903bd25724b8 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 30 Mar 2020 15:05:13 +0200 Subject: [PATCH 12/13] Don't rollback on error when not needed Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/send.go | 3 ++- go/vt/vtgate/engine/send_test.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index dd8314e0f42..7bd8bc9c77d 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -74,7 +74,8 @@ func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, canAutocommit = len(rss) == 1 && vcursor.AutocommitApproval() } - result, errs := vcursor.ExecuteMultiShard(rss, queries, true, canAutocommit) + rollbackOnError := !s.NoAutoCommit // for non-dml queries, there's no need to do a rollback + result, errs := vcursor.ExecuteMultiShard(rss, queries, rollbackOnError, canAutocommit) err = vterrors.Aggregate(errs) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/send_test.go b/go/vt/vtgate/engine/send_test.go index 2096d22be1c..38d8ad510bb 100644 --- a/go/vt/vtgate/engine/send_test.go +++ b/go/vt/vtgate/engine/send_test.go @@ -31,7 +31,7 @@ func TestSendTable(t *testing.T) { destination: key.DestinationAllShards{}, expectedQueryLog: []string{ `ResolveDestinations ks [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard ks.0: dummy_query {} true true`, + `ExecuteMultiShard ks.0: dummy_query {} false false`, }, noAutoCommit: true, }, @@ -42,7 +42,7 @@ func TestSendTable(t *testing.T) { destination: key.DestinationShard("20-"), expectedQueryLog: []string{ `ResolveDestinations ks [] Destinations:DestinationShard(20-)`, - `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`, + `ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} false false`, }, noAutoCommit: true, }, @@ -90,7 +90,7 @@ func TestSendTable(t *testing.T) { }, Query: "dummy_query", TargetDestination: tc.destination, - NoAutoCommit: false, + NoAutoCommit: tc.noAutoCommit, } vc := &loggingVCursor{shards: tc.shards} _, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false) From c6e02cb572471952b79a2ab40692ab58f227576a Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 30 Mar 2020 19:14:24 +0530 Subject: [PATCH 13/13] removed destination from handleExec signature Signed-off-by: Harshit Gangal --- go/vt/vtgate/executor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 0debe8a97e5..9da0a08c418 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -207,7 +207,7 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st switch specStmt := stmt.(type) { case *sqlparser.Select, *sqlparser.Union: - return e.handleExec(ctx, safeSession, sql, bindVars, destKeyspace, destTabletType, dest, logStats, stmtType) + return e.handleExec(ctx, safeSession, sql, bindVars, logStats, stmtType) case *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete: safeSession := safeSession @@ -232,7 +232,7 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st // at the beginning, but never after. safeSession.SetAutocommittable(mustCommit) - qr, err := e.handleExec(ctx, safeSession, sql, bindVars, destKeyspace, destTabletType, dest, logStats, stmtType) + qr, err := e.handleExec(ctx, safeSession, sql, bindVars, logStats, stmtType) if err != nil { return nil, err } @@ -265,7 +265,7 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unrecognized statement: %s", sql) } -func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destKeyspace string, destTabletType topodatapb.TabletType, dest key.Destination, logStats *LogStats, stmtType sqlparser.StatementType) (*sqltypes.Result, error) { +func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats, stmtType sqlparser.StatementType) (*sqltypes.Result, error) { // V3 mode. query, comments := sqlparser.SplitMarginComments(sql)