Skip to content

Commit

Permalink
Merge branch 'slack-vitess-r14.0.5' into slack-vitess-r14.0.5-go1.19.…
Browse files Browse the repository at this point in the history
…10-upgrade
  • Loading branch information
maksimov authored Sep 13, 2023
2 parents e84e50a + 63e6952 commit bf1c7b6
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 63 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ Usage of vtgate:
Select tcp, tcp4, or tcp6 to control the socket type. (default tcp)
--no_scatter
when set to true, the planner will fail instead of producing a plan that includes scatter queries
--no_vstream_copy
when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this
--normalize_queries
Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true)
--onclose_timeout duration
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false)

return nil
}
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;
create table t1_copy_resume(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
Expand Down Expand Up @@ -133,6 +139,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
142 changes: 142 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sort"
"sync"
"testing"

Expand Down Expand Up @@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) {
}
}

func TestVStreamCopyResume(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}

// Any subsequent GTIDs will be part of the stream
mpos, err := mconn.PrimaryPosition()
require.NoError(t, err)

// lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9)
lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
TableName: "t1_copy_resume",
Lastpk: sqltypes.ResultToProto3(&lastPK),
}}

catchupQueries := []string{
"insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy
"update t1_copy_resume set id2 = 10 where id1 = 1",
"insert into t1(id1, id2) values(100,100)",
"delete from t1_copy_resume where id1 = 1",
"update t1_copy_resume set id2 = 90 where id1 = 9",
}
for _, query := range catchupQueries {
_, err = conn.ExecuteFetch(query, 1, false)
require.NoError(t, err)
}

var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1_copy_resume",
Filter: "select * from t1_copy_resume",
}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)

expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9)
expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach
rowCopyEvents, replCatchupEvents := 0, 0
expectedEvents := []string{
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
}
var evs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_ROW {
evs = append(evs, ev)
if ev.Timestamp == 0 {
rowCopyEvents++
} else {
replCatchupEvents++
}
printEvents(evs) // for debugging ci failures
}
}
if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents {
sort.Sort(VEventSorter(evs))
for i, ev := range evs {
require.Regexp(t, expectedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyResume was successful")
return
}
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
log.Errorf("Returned err %v", err)
t.Fatalf("remote error: %v\n", err)
}
}
}

func TestVStreamCurrent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) {
s += "===END===" + "\n"
log.Infof("%s", s)
}

// Sort the VEvents by the first row change's after value bytes primarily, with
// secondary ordering by timestamp (ASC). Note that row copy events do not have
// a timestamp and the value will be 0.
type VEventSorter []*binlogdatapb.VEvent

func (v VEventSorter) Len() int {
return len(v)
}
func (v VEventSorter) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
func (v VEventSorter) Less(i, j int) bool {
valsI := v[i].GetRowEvent().RowChanges[0].After
if valsI == nil {
valsI = v[i].GetRowEvent().RowChanges[0].Before
}
valsJ := v[j].GetRowEvent().RowChanges[0].After
if valsJ == nil {
valsJ = v[j].GetRowEvent().RowChanges[0].Before
}
valI := string(valsI.Values)
valJ := string(valsJ.Values)
if valI == valJ {
return v[i].Timestamp < v[j].Timestamp
}
return valI < valJ
}
31 changes: 18 additions & 13 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Executor struct {

// allowScatter will fail planning if set to false and a plan contains any scatter queries
allowScatter bool
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool
}

var executorOnce sync.Once
Expand All @@ -127,20 +130,22 @@ func NewExecutor(
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
noVstreamCopy bool,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
allowVstreamCopy: !noVstreamCopy,
pv: pv,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1318,7 +1323,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
return err
}

vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell)
vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy)
vs := &vstream{
vgtid: vgtid,
tabletType: topodatapb.TabletType_PRIMARY,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
// create a new session each time so that ShardSessions don't get re-used across tests
Expand All @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down Expand Up @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) {
}

func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
}

func TestSelectScatter(t *testing.T) {
Expand Down Expand Up @@ -2981,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
count++
}

executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
before := runtime.NumGoroutine()

query := "select id, col from user order by id limit 2"
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
Expand Down
19 changes: 15 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

Expand All @@ -47,6 +48,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
Expand Down Expand Up @@ -119,12 +123,13 @@ type journalEvent struct {
done chan struct{}
}

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
resolver: resolver,
toposerv: serv,
cell: cell,
allowVstreamCopy: allowVstreamCopy,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
Expand Down Expand Up @@ -540,6 +545,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
log.Infof("Starting to vstream from %s", tablet.Alias.String())
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
var vstreamCreatedOnce sync.Once

if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) {
// We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping)
return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported")
}

err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand Down
Loading

0 comments on commit bf1c7b6

Please sign in to comment.