Skip to content

Commit

Permalink
Handle DML/non-DML separately for the Send primitive
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Mar 30, 2020
1 parent d93107b commit c4658bf
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 56 deletions.
10 changes: 10 additions & 0 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ func IsDML(sql string) bool {
return false
}

//IsDMLStatement returns true if the query is an INSERT, UPDATE or DELETE statement.
func IsDMLStatement(stmt Statement) bool {
switch stmt.(type) {
case *Insert, *Update, *Delete:
return true
}

return false
}

// SplitAndExpression breaks up the Expr into AND-separated conditions
// and appends them to filters. Outer parenthesis are removed. Precedence
// should be taken into account if expressions are recombined.
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@ type Send struct {
// Query specifies the query to be executed.
Query string

// OpCode specifies the route type
OpCode string
// NoAutoCommit specifies if we need to check autocommit behaviour
NoAutoCommit bool

noInputs
}

// RouteType implements Primitive interface
func (s *Send) RouteType() string {
return s.OpCode
if s.NoAutoCommit {
return "SendNoAutoCommit"
}

return "Send"
}

// GetKeyspaceName implements Primitive interface
Expand Down Expand Up @@ -65,7 +69,11 @@ func (s *Send) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable,
}
}

canAutocommit := len(rss) == 1 && vcursor.AutocommitApproval()
canAutocommit := false
if !s.NoAutoCommit {
canAutocommit = len(rss) == 1 && vcursor.AutocommitApproval()
}

result, errs := vcursor.ExecuteMultiShard(rss, queries, true, canAutocommit)
err = vterrors.Aggregate(errs)
if err != nil {
Expand Down
136 changes: 92 additions & 44 deletions go/vt/vtgate/engine/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,54 +11,102 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

func TestSendUnsharded(t *testing.T) {
send := &Send{
Keyspace: &vindexes.Keyspace{
Name: "ks",
Sharded: false,
},
Query: "dummy_query",
TargetDestination: key.DestinationAllShards{},
func TestSendTable(t *testing.T) {
type testCase struct {
testName string
sharded bool
shards []string
destination key.Destination
expectedQueryLog []string
noAutoCommit bool
}

vc := &loggingVCursor{shards: []string{"0"}}
_, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.0: dummy_query {} true true`,
})

// Failure cases
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "Execute", err, "sendExecute: shard_error")

vc = &loggingVCursor{}
_, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "Execute", err, "Keyspace does not have exactly one shard: []")
}

func TestSendSharded(t *testing.T) {
send := &Send{
Keyspace: &vindexes.Keyspace{
Name: "ks",
Sharded: true,
singleShard := []string{"0"}
twoShards := []string{"-20", "20-"}
tests := []testCase{
{
testName: "unsharded with no autocommit",
sharded: false,
shards: singleShard,
destination: key.DestinationAllShards{},
expectedQueryLog: []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.0: dummy_query {} true true`,
},
noAutoCommit: true,
},
{
testName: "sharded with no autocommit",
sharded: true,
shards: twoShards,
destination: key.DestinationShard("20-"),
expectedQueryLog: []string{
`ResolveDestinations ks [] Destinations:DestinationShard(20-)`,
`ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`,
},
noAutoCommit: true,
},
{
testName: "unsharded",
sharded: false,
shards: singleShard,
destination: key.DestinationAllShards{},
expectedQueryLog: []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.0: dummy_query {} true true`,
},
noAutoCommit: false,
},
{
testName: "sharded with single shard destination",
sharded: true,
shards: twoShards,
destination: key.DestinationShard("20-"),
expectedQueryLog: []string{
`ResolveDestinations ks [] Destinations:DestinationShard(20-)`,
`ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`,
},
noAutoCommit: false,
},
{
testName: "sharded with multi shard destination",
sharded: true,
shards: twoShards,
destination: key.DestinationAllShards{},
expectedQueryLog: []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.-20: dummy_query {} ks.20-: dummy_query {} true false`,
},
noAutoCommit: false,
},
Query: "dummy_query",
TargetDestination: key.DestinationShard("20-"),
}

vc := &loggingVCursor{shards: []string{"-20", "20-"}}
_, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationShard(20-)`,
`ExecuteMultiShard ks.DestinationShard(20-): dummy_query {} true true`,
})
for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
send := &Send{
Keyspace: &vindexes.Keyspace{
Name: "ks",
Sharded: tc.sharded,
},
Query: "dummy_query",
TargetDestination: tc.destination,
NoAutoCommit: false,
}
vc := &loggingVCursor{shards: tc.shards}
_, err := send.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, tc.expectedQueryLog)

// Failure cases
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false)
expectError(t, "Execute", err, "sendExecute: shard_error")
// Failure cases
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "sendExecute: shard_error")

if !tc.sharded {
vc = &loggingVCursor{}
_, err = send.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "Keyspace does not have exactly one shard: []")
}
})
}
}
5 changes: 1 addition & 4 deletions go/vt/vtgate/planbuilder/bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func buildPlanForBypass(stmt sqlparser.Statement, vschema ContextVSchema) (engin
Keyspace: keyspace,
TargetDestination: vschema.Destination(),
Query: sqlparser.String(stmt),
OpCode: ByPassOpCode,
NoAutoCommit: !sqlparser.IsDMLStatement(stmt),
}, nil
}

//ByPassOpCode is the opcode
const ByPassOpCode = "ByPass"
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/testdata/bypass_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"TargetDestination": "-80",
"Query": "select count(*), col from unsharded",
"OpCode": "ByPass"
"NoAutoCommit": true
}
}

Expand All @@ -24,7 +24,7 @@
},
"TargetDestination": "-80",
"Query": "update user set val = 1 where id = 18446744073709551616 and id = 1",
"OpCode": "ByPass"
"NoAutoCommit": false
}
}

Expand All @@ -39,7 +39,7 @@
},
"TargetDestination": "-80",
"Query": "delete from USER where ID = 42",
"OpCode": "ByPass"
"NoAutoCommit": false
}
}

Expand All @@ -54,6 +54,6 @@
},
"TargetDestination": "-80",
"Query": "insert into USER(ID, NAME) values (42, 'ms X')",
"OpCode": "ByPass"
"NoAutoCommit": false
}
}

0 comments on commit c4658bf

Please sign in to comment.