Skip to content

Commit

Permalink
Merge branch 'release-18.0' of https://github.com/vitessio/vitess int…
Browse files Browse the repository at this point in the history
…o release-18.0-github

Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Oct 28, 2024
2 parents 2aa50d2 + c58abd9 commit 20fd074
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 4 deletions.
16 changes: 16 additions & 0 deletions go/test/endtoend/vtgate/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,22 @@ func TestConsistentLookupUpdate(t *testing.T) {
require.Empty(t, qr.Rows)
}

func TestSelectMultiEqualLookup(t *testing.T) {
conn, closer := start(t)
defer closer()

utils.Exec(t, conn, "insert into t10 (id, sharding_key, col1) values (1, 1, 'bar'), (2, 1, 'bar'), (3, 1, 'bar'), (4, 2, 'bar'), (5, 2, 'bar')")

for _, workload := range []string{"oltp", "olap"} {
t.Run(workload, func(t *testing.T) {
utils.Exec(t, conn, "set workload = "+workload)

utils.AssertMatches(t, conn, "select id from t10 WHERE (col1, id) IN (('bar', 1), ('baz', 2), ('qux', 3), ('barbar', 4))", "[[INT64(1)]]")
utils.AssertMatches(t, conn, "select id from t10 WHERE (col1 = 'bar' AND id = 1) OR (col1 = 'baz' AND id = 2) OR (col1 = 'qux' AND id = 3) OR (col1 = 'barbar' AND id = 4)", "[[INT64(1)]]")
})
}
}

func TestSelectNullLookup(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,4 @@ create table t11
col2 int,
col3 int,
primary key (id)
) Engine = InnoDB;
) Engine = InnoDB;
10 changes: 8 additions & 2 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}),
loadTabletsTrigger: make(chan struct{}, 1),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
Expand Down Expand Up @@ -516,7 +516,13 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
hc.loadTabletsTrigger <- struct{}{}
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
select {
case hc.loadTabletsTrigger <- struct{}{}:
default:
}
}
}
}
Expand Down
65 changes: 65 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/test/utils"
querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -630,3 +631,67 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {

tw.Stop()
}

// TestDeadlockBetweenTopologyWatcherAndHealthCheck tests the possibility of a deadlock
// between the topology watcher and the health check.
// The issue https://github.com/vitessio/vitess/issues/16994 has more details on the deadlock.
func TestDeadlockBetweenTopologyWatcherAndHealthCheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// create a new memory topo server and an health check instance.
ts, _ := memorytopo.NewServerAndFactory(ctx, "zone-1")
hc := NewHealthCheck(ctx, time.Hour, time.Hour, ts, "zone-1", "")
defer hc.Close()
defer hc.topoWatchers[0].Stop()

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone-1",
Uid: 100,
},
Type: topodatapb.TabletType_REPLICA,
Hostname: "host1",
PortMap: map[string]int32{
"grpc": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
err := ts.CreateTablet(ctx, tablet1)
// Run the first loadTablets call to ensure the tablet is present in the topology watcher.
hc.topoWatchers[0].loadTablets()
require.NoError(t, err)

// We want to run updateHealth with arguments that always
// make it trigger load Tablets.
th := &TabletHealth{
Tablet: tablet1,
Target: &querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
TabletType: topodatapb.TabletType_REPLICA,
},
}
prevTarget := &querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
TabletType: topodatapb.TabletType_PRIMARY,
}

// If we run the updateHealth function often enough, then we
// will see the deadlock where the topology watcher is trying to replace
// the tablet in the health check, but health check has the mutex acquired
// already because it is calling updateHealth.
// updateHealth itself will be stuck trying to send on the shared channel.
for i := 0; i < 10; i++ {
// Update the port of the tablet so that when update Health asks topo watcher to
// refresh the tablets, it finds an update and tries to replace it.
_, err = ts.UpdateTabletFields(ctx, tablet1.Alias, func(t *topodatapb.Tablet) error {
t.PortMap["testing_port"] = int32(i + 1)
return nil
})
require.NoError(t, err)
hc.updateHealth(th, prevTarget, false, false)
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/vindex_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (vr *VindexLookup) generateIds(ctx context.Context, vcursor VCursor, bindVa
switch vr.Opcode {
case Equal, EqualUnique:
return []sqltypes.Value{value.Value(vcursor.ConnCollation())}, nil
case IN:
case IN, MultiEqual:
return value.TupleValues(), nil
}
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "opcode %s not supported for VindexLookup", vr.Opcode.String())
Expand Down
191 changes: 191 additions & 0 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2874,6 +2874,197 @@ func TestSubQueryAndQueryWithLimit(t *testing.T) {
assert.Equal(t, `type:INT64 value:"100"`, sbc2.Queries[1].BindVariables["__upper_limit"].String())
}

func TestSelectUsingLookupColumn(t *testing.T) {
t.Run("using multi value tuple", func(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// Special setup: Don't use createExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)

u := createSandbox(KsTestUnsharded)
s := createSandbox(KsTestSharded)

s.VSchema = executorVSchema
u.VSchema = unshardedVSchema

serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)

shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
sbcs := []*sandboxconn.SandboxConn{}
for _, shard := range shards {
sbcs = append(sbcs, hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil))
}

sbclookup := hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)

executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
logChan := executor.queryLogger.Subscribe("Test")
defer executor.queryLogger.Unsubscribe(logChan)

// Only lookup results on shard `40-60` (`sbc[2]`)
sbclookup.SetResults([]*sqltypes.Result{{
Fields: []*querypb.Field{
{Name: "lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "keyspace_id", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(2),
sqltypes.MakeTrusted(sqltypes.VarBinary, []byte("\x45")),
}},
}})

sbcs[2].SetResults([]*sqltypes.Result{{
Fields: []*querypb.Field{
{Name: "nv_lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "other", Type: sqltypes.VarChar, Charset: collations.CollationUtf8mb4ID},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(2),
sqltypes.NewVarChar("baz"),
}},
}})

result, err := exec(executor, NewSafeSession(&vtgatepb.Session{
TargetString: KsTestSharded,
}), "select nv_lu_col, other from t2_lookup WHERE (nv_lu_col, other) IN ((1, 'bar'), (2, 'baz'), (3, 'qux'), (4, 'brz'), (5, 'brz'))")

require.NoError(t, err)

require.Len(t, sbclookup.Queries, 1)
require.Len(t, sbcs[0].Queries, 0)
require.Len(t, sbcs[1].Queries, 0)
require.Len(t, sbcs[2].Queries, 1)
require.Len(t, sbcs[3].Queries, 0)
require.Len(t, sbcs[4].Queries, 0)
require.Len(t, sbcs[5].Queries, 0)
require.Len(t, sbcs[6].Queries, 0)
require.Len(t, sbcs[7].Queries, 0)

require.Equal(t, []*querypb.BoundQuery{{
Sql: "select nv_lu_col, other from t2_lookup where (nv_lu_col, other) in ((1, 'bar'), (2, 'baz'), (3, 'qux'), (4, 'brz'), (5, 'brz'))",
BindVariables: map[string]*querypb.BindVariable{},
}}, sbcs[2].Queries)

wantResult := &sqltypes.Result{
Fields: []*querypb.Field{
{Name: "nv_lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "other", Type: sqltypes.VarChar, Charset: collations.CollationUtf8mb4ID},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(2),
sqltypes.NewVarChar("baz"),
}},
}
require.Equal(t, wantResult, result)
})

t.Run("using disjunction of conjunctions", func(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// Special setup: Don't use createExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)

u := createSandbox(KsTestUnsharded)
s := createSandbox(KsTestSharded)

s.VSchema = executorVSchema
u.VSchema = unshardedVSchema

serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)

shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
sbcs := []*sandboxconn.SandboxConn{}
for _, shard := range shards {
sbcs = append(sbcs, hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil))
}

sbclookup := hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)

executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
logChan := executor.queryLogger.Subscribe("Test")
defer executor.queryLogger.Unsubscribe(logChan)

// Only lookup results on shard `40-60` (`sbc[2]`)
sbclookup.SetResults([]*sqltypes.Result{{
Fields: []*querypb.Field{
{Name: "lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "keyspace_id", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(2),
sqltypes.MakeTrusted(sqltypes.VarBinary, []byte("\x45")),
}},
}})

emptyResult := []*sqltypes.Result{{
Fields: []*querypb.Field{
{Name: "nv_lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "other", Type: sqltypes.VarChar, Charset: collations.CollationUtf8mb4ID},
},
Rows: [][]sqltypes.Value{},
}}

sbcs[0].SetResults(emptyResult)
sbcs[1].SetResults(emptyResult)
sbcs[2].SetResults([]*sqltypes.Result{{
Fields: []*querypb.Field{
{Name: "nv_lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "other", Type: sqltypes.VarChar, Charset: collations.CollationUtf8mb4ID},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(2),
sqltypes.NewVarChar("baz"),
}},
}})
sbcs[3].SetResults(emptyResult)
sbcs[4].SetResults(emptyResult)
sbcs[5].SetResults(emptyResult)
sbcs[6].SetResults(emptyResult)
sbcs[7].SetResults(emptyResult)

result, err := exec(executor, NewSafeSession(&vtgatepb.Session{
TargetString: KsTestSharded,
}), "select nv_lu_col, other from t2_lookup WHERE (nv_lu_col = 1 AND other = 'bar') OR (nv_lu_col = 2 AND other = 'baz') OR (nv_lu_col = 3 AND other = 'qux') OR (nv_lu_col = 4 AND other = 'brz') OR (nv_lu_col = 5 AND other = 'brz')")

require.NoError(t, err)

// We end up doing a scatter query here, so no queries are sent to the lookup table
require.Len(t, sbclookup.Queries, 0)
require.Len(t, sbcs[0].Queries, 1)
require.Len(t, sbcs[1].Queries, 1)
require.Len(t, sbcs[2].Queries, 1)
require.Len(t, sbcs[3].Queries, 1)
require.Len(t, sbcs[4].Queries, 1)
require.Len(t, sbcs[5].Queries, 1)
require.Len(t, sbcs[6].Queries, 1)
require.Len(t, sbcs[7].Queries, 1)

for _, sbc := range sbcs {
require.Equal(t, []*querypb.BoundQuery{{
Sql: "select nv_lu_col, other from t2_lookup where nv_lu_col = 1 and other = 'bar' or nv_lu_col = 2 and other = 'baz' or nv_lu_col = 3 and other = 'qux' or nv_lu_col = 4 and other = 'brz' or nv_lu_col = 5 and other = 'brz'",
BindVariables: map[string]*querypb.BindVariable{},
}}, sbc.Queries)
}
wantResult := &sqltypes.Result{
Fields: []*querypb.Field{
{Name: "nv_lu_col", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "other", Type: sqltypes.VarChar, Charset: collations.CollationUtf8mb4ID},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(2),
sqltypes.NewVarChar("baz"),
}},
}
require.Equal(t, wantResult, result)
})
}

func TestCrossShardSubqueryStream(t *testing.T) {
executor, sbc1, sbc2, _, ctx := createExecutorEnv(t)
result1 := []*sqltypes.Result{{
Expand Down
Loading

0 comments on commit 20fd074

Please sign in to comment.