Skip to content

Commit

Permalink
Avoid deadlocks related to 0 receiver behavior (#10132)
Browse files Browse the repository at this point in the history
* Prevent deadlocks related to 0 receiver behavior

Signed-off-by: Matt Lord <[email protected]>

* Update test tables to use poller_idx

Signed-off-by: Matt Lord <[email protected]>

* Minor changes after mutex usage review in message manager + cache

Signed-off-by: Matt Lord <[email protected]>

* Use atomics for receiver count and messages pending

Signed-off-by: Matt Lord <[email protected]>

* Don't take exclusive lock when in fast path

Signed-off-by: Matt Lord <[email protected]>

* Update tests to use the new recommended message table structure

See: vitessio/website#1015

Signed-off-by: Matt Lord <[email protected]>

* Correct tests

Signed-off-by: Matt Lord <[email protected]>

* Update e2e test to use new recommended table structure

Signed-off-by: Matt Lord <[email protected]>

* Fix TestMessageStreamingPlan test

Signed-off-by: Matt Lord <[email protected]>

* Fix godriver/TestStreamMessaging test

Signed-off-by: Matt Lord <[email protected]>

* Split streamMu into streamProcessingMu and lastPollPositionMu

Signed-off-by: Matt Lord <[email protected]>

* Poller cannot take main lock w/o having X stream processing lock

Signed-off-by: Matt Lord <[email protected]>

* Improve the comments a bit

Signed-off-by: Matt Lord <[email protected]>

* Hold the main mutex during Add

This is for safe concurrency with the last receiver unsubscribing

Signed-off-by: Matt Lord <[email protected]>

* Changes after pair reviewing with Sugu

Signed-off-by: Matt Lord <[email protected]>

* Use my GitHub handle for the self reference

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Apr 28, 2022
1 parent deb8ccf commit 870f13e
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 123 deletions.
52 changes: 34 additions & 18 deletions go/test/endtoend/messaging/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,43 @@ var (
userKeyspace = "user"
lookupKeyspace = "lookup"
createShardedMessage = `create table sharded_message(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
message varchar(128),
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',
# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,
# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked)
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)
# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
createUnshardedMessage = `create table unsharded_message(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
message varchar(128),
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',
# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,
# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked)
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)
# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
userVschema = `{
"sharded": true,
"vindexes": {
Expand Down
120 changes: 82 additions & 38 deletions go/test/endtoend/messaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,30 @@ import (
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

var createMessage = `create table vitess_message(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
message varchar(128),
var testMessage = "{\"message\":\"hello world\"}"
var testShardedMessagef = "{\"message\": \"hello world\", \"id\": %d}"

var createMessage = `
create table vitess_message(
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',
# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,
# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked))
comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)
# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'
`

func TestMessage(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -84,9 +97,12 @@ func TestMessage(t *testing.T) {
wantFields := []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "tenant_id",
Type: sqltypes.Int64,
}, {
Name: "message",
Type: sqltypes.VarChar,
Type: sqltypes.TypeJSON,
}}
gotFields, err := streamConn.Fields()
for i, field := range gotFields {
Expand All @@ -99,7 +115,7 @@ func TestMessage(t *testing.T) {
require.NoError(t, err)
cmp.MustMatch(t, wantFields, gotFields)

utils.Exec(t, conn, "insert into vitess_message(id, message) values(1, 'hello world')")
utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message(id, tenant_id, message) values(1, 1, '%s')", testMessage))

// account for jitter in timings, maxJitter uses the current hardcoded value for jitter in message_manager.go
jitter := int64(0)
Expand All @@ -112,7 +128,8 @@ func TestMessage(t *testing.T) {

want := []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewVarChar("hello world"),
sqltypes.NewInt64(1),
sqltypes.TestValue(sqltypes.TypeJSON, testMessage),
}
cmp.MustMatch(t, want, got)

Expand Down Expand Up @@ -163,18 +180,31 @@ func TestMessage(t *testing.T) {
assert.Equal(t, 0, len(qr.Rows))
}

var createThreeColMessage = `create table vitess_message3(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
var createThreeColMessage = `
create table vitess_message3(
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',
# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,
# custom to this test
msg1 varchar(128),
msg2 bigint,
# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked))
comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)
# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'
`

func TestThreeColMessage(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -202,6 +232,12 @@ func TestThreeColMessage(t *testing.T) {
wantFields := []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "tenant_id",
Type: sqltypes.Int64,
}, {
Name: "message",
Type: sqltypes.TypeJSON,
}, {
Name: "msg1",
Type: sqltypes.VarChar,
Expand All @@ -220,12 +256,14 @@ func TestThreeColMessage(t *testing.T) {
require.NoError(t, err)
cmp.MustMatch(t, wantFields, gotFields)

utils.Exec(t, conn, "insert into vitess_message3(id, msg1, msg2) values(1, 'hello world', 3)")
utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message3(id, tenant_id, message, msg1, msg2) values(1, 3, '%s', 'hello world', 3)", testMessage))

got, err := streamConn.FetchNext(nil)
require.NoError(t, err)
want := []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewInt64(3),
sqltypes.TestValue(sqltypes.TypeJSON, testMessage),
sqltypes.NewVarChar("hello world"),
sqltypes.NewInt64(3),
}
Expand Down Expand Up @@ -292,7 +330,8 @@ func TestReparenting(t *testing.T) {
assertClientCount(t, 1, shard0Replica)
assertClientCount(t, 1, shard1Primary)
session := stream.Session("@primary", nil)
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (3,'hello world 3')")
msg3 := fmt.Sprintf(testShardedMessagef, 3)
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (3,3,'%s')", msg3))

// validate that we have received inserted message
stream.Next()
Expand Down Expand Up @@ -353,8 +392,10 @@ func TestConnection(t *testing.T) {
// in message stream
session := stream.Session("@primary", nil)
// insert data in primary
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (2,'hello world 2')")
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (5,'hello world 5')")
msg2 := fmt.Sprintf(testShardedMessagef, 2)
msg5 := fmt.Sprintf(testShardedMessagef, 5)
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (2,2,'%s')", msg2))
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (5,5,'%s')", msg5))
// validate in msg stream
_, err = stream.Next()
require.Nil(t, err)
Expand All @@ -380,15 +421,18 @@ func testMessaging(t *testing.T, name, ks string) {
defer stream.Close()

session := stream.Session("@primary", nil)
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (4,'hello world 4')")
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (1,'hello world 1')")
msg4 := fmt.Sprintf(testShardedMessagef, 4)
msg1 := fmt.Sprintf(testShardedMessagef, 1)
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (4,4,'%s')", msg4))
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (1,1,'%s')", msg1))

// validate fields
res, err := stream.MessageStream(ks, "", nil, name)
require.Nil(t, err)
require.Equal(t, 2, len(res.Fields))
require.Equal(t, 3, len(res.Fields))
validateField(t, res.Fields[0], "id", query.Type_INT64)
validateField(t, res.Fields[1], "message", query.Type_VARCHAR)
validateField(t, res.Fields[1], "tenant_id", query.Type_INT64)
validateField(t, res.Fields[2], "message", query.Type_JSON)

// validate recieved msgs
resMap := make(map[string]string)
Expand All @@ -406,8 +450,8 @@ func testMessaging(t *testing.T, name, ks string) {
}
}

assert.Equal(t, "hello world 1", resMap["1"])
assert.Equal(t, "hello world 4", resMap["4"])
assert.Equal(t, "1", resMap["1"])
assert.Equal(t, "4", resMap["4"])

resMap = make(map[string]string)
stream.ClearMem()
Expand All @@ -422,7 +466,7 @@ func testMessaging(t *testing.T, name, ks string) {
}
}

assert.Equal(t, "hello world 1", resMap["1"])
assert.Equal(t, "1", resMap["1"])

// validate message ack with 1 and 4, only 1 should be ack
qr, err = session.Execute(context.Background(), "update "+name+" set time_acked = 1, time_next = null where id in (1, 4) and time_acked is null", nil)
Expand Down Expand Up @@ -519,18 +563,18 @@ func assertClientCount(t *testing.T, expected int, vttablet *cluster.Vttablet) {
}

func parseDebugVars(t *testing.T, output interface{}, vttablet *cluster.Vttablet) {
debugVarUrl := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort)
resp, err := http.Get(debugVarUrl)
debugVarURL := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort)
resp, err := http.Get(debugVarURL)
if err != nil {
t.Fatalf("failed to fetch %q: %v", debugVarUrl, err)
t.Fatalf("failed to fetch %q: %v", debugVarURL, err)
}

respByte, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarUrl, respByte)
t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarURL, respByte)
}

if err := json.Unmarshal(respByte, output); err != nil {
t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarUrl, err)
t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarURL, err)
}
}
37 changes: 22 additions & 15 deletions go/test/endtoend/vtgate/godriver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package godriver

import (
"database/sql"
"flag"
"fmt"
"os"
"strconv"
"testing"
Expand All @@ -42,18 +42,24 @@ var (
KeyspaceName = "customer"
SchemaSQL = `
create table my_message(
time_scheduled bigint,
id bigint,
time_next bigint,
epoch bigint,
time_created bigint,
time_acked bigint,
message varchar(128),
priority tinyint NOT NULL DEFAULT '0',
primary key(time_scheduled, id),
unique index id_idx(id),
index next_idx(priority, time_next)
) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30';
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',
# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,
# required indexes
primary key(id),
index poller_idx(time_acked, priority, time_next desc)
# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_min_backoff=30,vt_max_backoff=3600,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'
`
VSchema = `
{
Expand All @@ -75,6 +81,8 @@ create table my_message(
}
}
`

testMessage = "{\"message\":\"hello world\"}"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -136,8 +144,7 @@ func TestStreamMessaging(t *testing.T) {
defer db.Close()

// Exec not allowed in streaming
timenow := time.Now().Add(time.Second * 60).UnixNano()
_, err = db.Exec("insert into my_message(id, message, time_scheduled) values(1, 'hello world', :curr_time)", sql.Named("curr_time", timenow))
_, err = db.Exec(fmt.Sprintf("insert into my_message(id, message) values(1, '%s')", testMessage))
require.NoError(t, err)

// for streaming messages
Expand Down
Loading

0 comments on commit 870f13e

Please sign in to comment.