Skip to content

Commit

Permalink
Merge pull request #8727 from vmg/vmg/primitives
Browse files Browse the repository at this point in the history
engine: allow retrying partial primitives
  • Loading branch information
deepthi authored Aug 30, 2021
2 parents d348b8a + c7da674 commit 7d0607c
Show file tree
Hide file tree
Showing 58 changed files with 357 additions and 319 deletions.
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func formatTwoOptionsNicely(a, b string) string {
var errWrongNumberOfColumnsInSelect = vterrors.NewErrorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.WrongNumberOfColumnsInSelect, "The used SELECT statements have a different number of columns")

// Execute performs a non-streaming exec.
func (c *Concatenate) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
func (c *Concatenate) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
res, err := c.execSources(vcursor, bindVars, wantfields)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *Concatenate) execSources(vcursor VCursor, bindVars map[string]*querypb.
for i, source := range c.Sources {
currIndex, currSource := i, source
g.Go(func() error {
result, err := currSource.Execute(vcursor, bindVars, wantfields)
result, err := vcursor.ExecutePrimitive(currSource, bindVars, wantfields)
if err != nil {
return err
}
Expand All @@ -140,7 +140,7 @@ func (c *Concatenate) execSources(vcursor VCursor, bindVars map[string]*querypb.
}

// StreamExecute performs a streaming exec.
func (c *Concatenate) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (c *Concatenate) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var seenFields []*querypb.Field
var fieldset sync.WaitGroup
var cbMu sync.Mutex
Expand All @@ -154,7 +154,7 @@ func (c *Concatenate) StreamExecute(vcursor VCursor, bindVars map[string]*queryp
currIndex, currSource := i, source

g.Go(func() error {
err := currSource.StreamExecute(vcursor, bindVars, wantfields, func(resultChunk *sqltypes.Result) error {
err := vcursor.StreamExecutePrimitive(currSource, bindVars, wantfields, func(resultChunk *sqltypes.Result) error {
// if we have fields to compare, make sure all the fields are all the same
if currIndex == 0 && !fieldsSent {
defer fieldset.Done()
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/concatenate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConcatenate_NoErrors(t *testing.T) {
}

t.Run(tc.testName+"-Execute", func(t *testing.T) {
qr, err := concatenate.Execute(&noopVCursor{ctx: context.Background()}, nil, true)
qr, err := concatenate.TryExecute(&noopVCursor{ctx: context.Background()}, nil, true)
if tc.expectedError == "" {
require.NoError(t, err)
require.Equal(t, tc.expectedResult, qr)
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestConcatenate_WithErrors(t *testing.T) {
},
}
ctx := context.Background()
_, err := concatenate.Execute(&noopVCursor{ctx: ctx}, nil, true)
_, err := concatenate.TryExecute(&noopVCursor{ctx: ctx}, nil, true)
require.EqualError(t, err, strFailed)

_, err = wrapStreamExecute(concatenate, &noopVCursor{ctx: ctx}, nil, true)
Expand All @@ -142,7 +142,7 @@ func TestConcatenate_WithErrors(t *testing.T) {
&fakePrimitive{results: []*sqltypes.Result{fake, fake}},
},
}
_, err = concatenate.Execute(&noopVCursor{ctx: ctx}, nil, true)
_, err = concatenate.TryExecute(&noopVCursor{ctx: ctx}, nil, true)
require.EqualError(t, err, strFailed)
_, err = wrapStreamExecute(concatenate, &noopVCursor{ctx: ctx}, nil, true)
require.EqualError(t, err, strFailed)
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *DBDDL) GetTableName() string {
}

// Execute implements the Primitive interface
func (c *DBDDL) Execute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
func (c *DBDDL) TryExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
name := vcursor.GetDBDDLPluginName()
plugin, ok := databaseCreatorPlugins[name]
if !ok {
Expand Down Expand Up @@ -181,8 +181,8 @@ func (c *DBDDL) dropDatabase(vcursor VCursor, plugin DBDDLPlugin) (*sqltypes.Res
}

// StreamExecute implements the Primitive interface
func (c *DBDDL) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := c.Execute(vcursor, bindVars, wantfields)
func (c *DBDDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := c.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/dbddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestDBDDLCreateExecute(t *testing.T) {

vc := &loggingVCursor{dbDDLPlugin: pluginName}

_, err := primitive.Execute(vc, nil, false)
_, err := primitive.TryExecute(vc, nil, false)
require.NoError(t, err)
require.True(t, plugin.createCalled)
require.False(t, plugin.dropCalled)
Expand All @@ -61,7 +61,7 @@ func TestDBDDLDropExecute(t *testing.T) {

vc := &loggingVCursor{dbDDLPlugin: pluginName, ksAvailable: false}

_, err := primitive.Execute(vc, nil, false)
_, err := primitive.TryExecute(vc, nil, false)
require.NoError(t, err)
require.False(t, plugin.createCalled)
require.True(t, plugin.dropCalled)
Expand All @@ -74,11 +74,11 @@ func TestDBDDLTimeout(t *testing.T) {

primitive := &DBDDL{name: "ks", create: true, queryTimeout: 100}
vc := &loggingVCursor{dbDDLPlugin: pluginName, shardErr: fmt.Errorf("db not available")}
_, err := primitive.Execute(vc, nil, false)
_, err := primitive.TryExecute(vc, nil, false)
assert.EqualError(t, err, "could not validate create database: destination not resolved")

primitive = &DBDDL{name: "ks", queryTimeout: 100}
vc = &loggingVCursor{dbDDLPlugin: pluginName, ksAvailable: true}
_, err = primitive.Execute(vc, nil, false)
_, err = primitive.TryExecute(vc, nil, false)
assert.EqualError(t, err, "could not validate drop database: keyspace still available in vschema")
}
12 changes: 6 additions & 6 deletions go/vt/vtgate/engine/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func (ddl *DDL) isOnlineSchemaDDL() bool {
}

// Execute implements the Primitive interface
func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
func (ddl *DDL) TryExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
if ddl.CreateTempTable {
vcursor.Session().HasCreatedTempTable()
vcursor.Session().NeedsReservedConn()
return ddl.NormalDDL.Execute(vcursor, bindVars, wantfields)
return vcursor.ExecutePrimitive(ddl.NormalDDL, bindVars, wantfields)
}

ddlStrategySetting, err := schema.ParseDDLStrategy(vcursor.Session().GetDDLStrategy())
Expand All @@ -104,18 +104,18 @@ func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable
if !ddl.OnlineDDLEnabled {
return nil, schema.ErrOnlineDDLDisabled
}
return ddl.OnlineDDL.Execute(vcursor, bindVars, wantfields)
return vcursor.ExecutePrimitive(ddl.OnlineDDL, bindVars, wantfields)
default: // non online-ddl
if !ddl.DirectDDLEnabled {
return nil, schema.ErrDirectDDLDisabled
}
return ddl.NormalDDL.Execute(vcursor, bindVars, wantfields)
return vcursor.ExecutePrimitive(ddl.NormalDDL, bindVars, wantfields)
}
}

// StreamExecute implements the Primitive interface
func (ddl *DDL) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := ddl.Execute(vcursor, bindVars, wantfields)
func (ddl *DDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := ddl.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (del *Delete) GetTableName() string {
}

// Execute performs a non-streaming exec.
func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
func (del *Delete) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond)
defer cancel()
Expand All @@ -93,7 +93,7 @@ func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar
}

// StreamExecute performs a streaming exec.
func (del *Delete) StreamExecute(VCursor, map[string]*querypb.BindVariable, bool, func(*sqltypes.Result) error) error {
func (del *Delete) TryStreamExecute(VCursor, map[string]*querypb.BindVariable, bool, func(*sqltypes.Result) error) error {
return fmt.Errorf("query %q cannot be used for streaming", del.Query)
}

Expand Down
32 changes: 16 additions & 16 deletions go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDeleteUnsharded(t *testing.T) {
}

vc := newDMLTestVCursor("0")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
Expand All @@ -50,11 +50,11 @@ func TestDeleteUnsharded(t *testing.T) {

// Failure cases
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "shard_error")

vc = &loggingVCursor{}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "cannot send query to multiple shards for un-sharded database: []")
}

Expand All @@ -74,7 +74,7 @@ func TestDeleteEqual(t *testing.T) {
}

vc := newDMLTestVCursor("-20", "20-")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand All @@ -83,7 +83,7 @@ func TestDeleteEqual(t *testing.T) {

// Failure case
del.Values = []sqltypes.PlanValue{{Key: "aa"}}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "missing bind var aa")
}

Expand All @@ -107,7 +107,7 @@ func TestDeleteEqualNoRoute(t *testing.T) {
}

vc := newDMLTestVCursor("0")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
// This lookup query will return no rows. So, the DML will not be sent anywhere.
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestDeleteEqualNoScatter(t *testing.T) {
}

vc := newDMLTestVCursor("0")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)")
}

Expand Down Expand Up @@ -166,7 +166,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
vc := newDMLTestVCursor("-20", "20-")
vc.results = results

_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand All @@ -182,7 +182,7 @@ func TestDeleteOwnedVindex(t *testing.T) {

// No rows changing
vc = newDMLTestVCursor("-20", "20-")
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand All @@ -205,7 +205,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
vc = newDMLTestVCursor("-20", "20-")
vc.results = results

_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestDeleteSharded(t *testing.T) {
}

vc := newDMLTestVCursor("-20", "20-")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand All @@ -244,13 +244,13 @@ func TestDeleteSharded(t *testing.T) {

// Failure case
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "shard_error")
}

func TestDeleteNoStream(t *testing.T) {
del := &Delete{}
err := del.StreamExecute(nil, nil, false, nil)
err := del.TryStreamExecute(nil, nil, false, nil)
require.EqualError(t, err, `query "" cannot be used for streaming`)
}

Expand Down Expand Up @@ -278,7 +278,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
vc := newDMLTestVCursor("-20", "20-")
vc.results = results

_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand All @@ -295,7 +295,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
// No rows changing
vc = newDMLTestVCursor("-20", "20-")

_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand All @@ -318,7 +318,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
vc = newDMLTestVCursor("-20", "20-")
vc.results = results

_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func newProbeTable() *probeTable {
}

// Execute implements the Primitive interface
func (d *Distinct) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
input, err := d.Source.Execute(vcursor, bindVars, wantfields)
func (d *Distinct) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
input, err := vcursor.ExecutePrimitive(d.Source, bindVars, wantfields)
if err != nil {
return nil, err
}
Expand All @@ -116,10 +116,10 @@ func (d *Distinct) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar
}

// StreamExecute implements the Primitive interface
func (d *Distinct) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (d *Distinct) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
pt := newProbeTable()

err := d.Source.StreamExecute(vcursor, bindVars, wantfields, func(input *sqltypes.Result) error {
err := vcursor.StreamExecutePrimitive(d.Source, bindVars, wantfields, func(input *sqltypes.Result) error {
result := &sqltypes.Result{
Fields: input.Fields,
InsertID: input.InsertID,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestDistinct(t *testing.T) {
t.Run(tc.testName+"-Execute", func(t *testing.T) {
distinct := &Distinct{Source: &fakePrimitive{results: []*sqltypes.Result{tc.inputs}}}

qr, err := distinct.Execute(&noopVCursor{ctx: context.Background()}, nil, true)
qr, err := distinct.TryExecute(&noopVCursor{ctx: context.Background()}, nil, true)
if tc.expectedError == "" {
require.NoError(t, err)
got := fmt.Sprintf("%v", qr.Rows)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (f *fakePrimitive) GetTableName() string {
return "fakeTable"
}

func (f *fakePrimitive) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
func (f *fakePrimitive) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields))
if f.results == nil {
return nil, f.sendErr
Expand All @@ -77,7 +77,7 @@ func (f *fakePrimitive) Execute(vcursor VCursor, bindVars map[string]*querypb.Bi
return r, nil
}

func (f *fakePrimitive) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (f *fakePrimitive) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields))
if f.results == nil {
return f.sendErr
Expand Down Expand Up @@ -112,7 +112,7 @@ func (f *fakePrimitive) StreamExecute(vcursor VCursor, bindVars map[string]*quer

func (f *fakePrimitive) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("GetFields %v", printBindVars(bindVars)))
return f.Execute(vcursor, bindVars, true /* wantfields */)
return f.TryExecute(vcursor, bindVars, true /* wantfields */)
}

func (f *fakePrimitive) ExpectLog(t *testing.T, want []string) {
Expand All @@ -128,7 +128,7 @@ func (f *fakePrimitive) NeedsTransaction() bool {

func wrapStreamExecute(prim Primitive, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
var result *sqltypes.Result
err := prim.StreamExecute(vcursor, bindVars, wantfields, func(r *sqltypes.Result) error {
err := prim.TryStreamExecute(vcursor, bindVars, wantfields, func(r *sqltypes.Result) error {
if result == nil {
result = r
} else {
Expand Down
Loading

0 comments on commit 7d0607c

Please sign in to comment.