Skip to content

Commit

Permalink
Use Durability Policy in the topo server in VTOrc and deprecate Durability config (vitessio#10423)
Browse files Browse the repository at this point in the history

* feat: store keyspace information in the vtorc database

Signed-off-by: Manan Gupta <[email protected]>

* feat: add code for refreshing keyspaces

Signed-off-by: Manan Gupta <[email protected]>

* feat: remove the durability config from VTOrc and instead use the durability stored in the topo server

Signed-off-by: Manan Gupta <[email protected]>

* test: remove usage of Durability config from the tests

Signed-off-by: Manan Gupta <[email protected]>

* test: add a new test for checking that VTOrc works with keyspaces having different durability policies and verify it failed previously

Signed-off-by: Manan Gupta <[email protected]>

* feat: remove usage of durability config in examples

Signed-off-by: Manan Gupta <[email protected]>

* test: parallel execution of test causes data race

Signed-off-by: Manan Gupta <[email protected]>

* feat: add warning if no durability is specified

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jun 6, 2022
1 parent daeded3 commit 67944bb
Show file tree
Hide file tree
Showing 26 changed files with 830 additions and 100 deletions.
1 change: 0 additions & 1 deletion examples/compose/orchestrator/default.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"Debug": true,
"EnableSyslog": false,
"Durability" : "none",
"ListenAddress": ":3000",
"MySQLTopologyUser": "orc_client_user",
"MySQLTopologyPassword": "orc_client_user_password",
Expand Down
1 change: 0 additions & 1 deletion examples/operator/vtorc_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ stringData:
orc_config.json: |
{
"Debug": true,
"Durability": "none",
"MySQLTopologyUser": "orc_client_user",
"MySQLTopologyPassword": "orc_client_user_password",
"MySQLReplicaUser": "vt_repl",
Expand Down
1 change: 0 additions & 1 deletion go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type VtorcConfiguration struct {
InstancePollSeconds int
PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"`
LockShardTimeoutSeconds int `json:",omitempty"`
Durability string `json:",omitempty"`
ReplicationLagQuery string `json:",omitempty"`
FailPrimaryPromotionOnLagMinutes int `json:",omitempty"`
}
Expand Down
41 changes: 30 additions & 11 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestPrimaryElection(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 2)
}, 2, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand All @@ -67,7 +67,7 @@ func TestSingleKeyspace(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks"}, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand All @@ -83,7 +83,7 @@ func TestKeyspaceShard(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks/0"}, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand All @@ -96,7 +96,7 @@ func TestPrimaryReadOnly(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand All @@ -118,7 +118,7 @@ func TestReplicaReadWrite(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -148,7 +148,7 @@ func TestStopReplication(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -182,7 +182,7 @@ func TestReplicationFromOtherReplica(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -230,7 +230,7 @@ func TestRepairAfterTER(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -263,7 +263,7 @@ func TestCircularReplication(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -307,7 +307,6 @@ func TestSemiSync(t *testing.T) {
newCluster := utils.SetupNewClusterSemiSync(t)
utils.StartVtorcs(t, newCluster, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
Durability: "semi_sync",
}, 1)
defer func() {
utils.StopVtorcs(t, newCluster)
Expand Down Expand Up @@ -373,7 +372,7 @@ func TestVtorcWithPrs(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 4, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -407,3 +406,23 @@ func TestVtorcWithPrs(t *testing.T) {
utils.CheckPrimaryTablet(t, clusterInfo, replica, true)
utils.VerifyWritesSucceed(t, clusterInfo, replica, shard0.Vttablets, 10*time.Second)
}

// TestMultipleDurabilities verifies that a single VTOrc setup can manage two
// keyspaces at once when each keyspace uses a different durability policy.
func TestMultipleDurabilities(t *testing.T) {
	defer cluster.PanicHandler(t)

	// First keyspace: started with an empty durability argument and a default
	// VTOrc configuration, with one VTOrc instance.
	utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, cluster.VtorcConfiguration{}, 1, "")
	// Second keyspace: added afterwards with the semi_sync durability policy.
	utils.AddSemiSyncKeyspace(t, clusterInfo)

	// Look the keyspaces up only after both have been created.
	allKeyspaces := clusterInfo.ClusterInstance.Keyspaces

	// The first keyspace must have its first tablet as primary and replication
	// running to the remaining tablets.
	noneKs := &allKeyspaces[0]
	noneShard := &noneKs.Shards[0]
	utils.CheckPrimaryTablet(t, clusterInfo, noneShard.Vttablets[0], true)
	utils.CheckReplication(t, clusterInfo, noneShard.Vttablets[0], noneShard.Vttablets[1:], 10*time.Second)

	// The semi-sync keyspace must end up with a primary recorded in the topo.
	semiSyncKs := &allKeyspaces[1]
	semiSyncShard := &semiSyncKs.Shards[0]
	electedPrimary := utils.ShardPrimaryTablet(t, clusterInfo, semiSyncKs, semiSyncShard)
	assert.NotNil(t, electedPrimary, "should have elected a primary")
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestGracefulPrimaryTakeover(t *testing.T) {
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
RecoveryPeriodBlockSeconds: 5,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGracefulPrimaryTakeoverNoTarget(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestGracefulPrimaryTakeoverAuto(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -168,7 +168,7 @@ func TestGracefulPrimaryTakeoverFailCrossCell(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down
21 changes: 9 additions & 12 deletions go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDownPrimary(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestCrossDataCenterFailure(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestCrossDataCenterFailureError(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestLostRdonlyOnPrimaryFailure(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 2, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestPromotionLagSuccess(t *testing.T) {
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
ReplicationLagQuery: "select 59",
FailPrimaryPromotionOnLagMinutes: 1,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestPromotionLagFailure(t *testing.T) {
utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 1, nil, cluster.VtorcConfiguration{
ReplicationLagQuery: "select 61",
FailPrimaryPromotionOnLagMinutes: 1,
}, 1)
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -351,8 +351,7 @@ func TestDownPrimaryPromotionRule(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
LockShardTimeoutSeconds: 5,
Durability: "test",
}, 1)
}, 1, "test")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -399,8 +398,7 @@ func TestDownPrimaryPromotionRuleWithLag(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
LockShardTimeoutSeconds: 5,
Durability: "test",
}, 1)
}, 1, "test")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down Expand Up @@ -479,9 +477,8 @@ func TestDownPrimaryPromotionRuleWithLagCrossCenter(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
LockShardTimeoutSeconds: 5,
Durability: "test",
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
}, 1, "test")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
// find primary from topo
Expand Down
75 changes: 70 additions & 5 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func StopVtorcs(t *testing.T, clusterInfo *VtOrcClusterInfo) {
}

// SetupVttabletsAndVtorc is used to setup the vttablets and start the orchestrator
func SetupVttabletsAndVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo, numReplicasReqCell1, numRdonlyReqCell1 int, orcExtraArgs []string, config cluster.VtorcConfiguration, vtorcCount int) {
func SetupVttabletsAndVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo, numReplicasReqCell1, numRdonlyReqCell1 int, orcExtraArgs []string, config cluster.VtorcConfiguration, vtorcCount int, durability string) {
// stop vtorc if it is running
StopVtorcs(t, clusterInfo)

Expand Down Expand Up @@ -313,9 +313,8 @@ func SetupVttabletsAndVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo, numRepl
require.NoError(t, err)
}

durability := "none"
if config.Durability != "" {
durability = config.Durability
if durability == "" {
durability = "none"
}
out, err := clusterInfo.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
require.NoError(t, err, out)
Expand Down Expand Up @@ -439,7 +438,7 @@ func CheckReplication(t *testing.T, clusterInfo *VtOrcClusterInfo, primary *clus
default:
_, err := RunSQL(t, sqlSchema, primary, "")
if err != nil {
log.Warning("create table failed on primary, will retry")
log.Warningf("create table failed on primary - %v, will retry", err)
time.Sleep(100 * time.Millisecond)
break
}
Expand Down Expand Up @@ -816,6 +815,72 @@ func SetupNewClusterSemiSync(t *testing.T) *VtOrcClusterInfo {
return clusterInfo
}

// AddSemiSyncKeyspace adds a new keyspace named "ks2" to the given cluster and
// sets its durability policy to semi_sync.
//
// It creates 3 replica tablets in Cell1 under a single shard, launches the
// keyspace with semi-sync specific vttablet arguments, starts MySQL and the
// vttablet process for every tablet, waits until each tablet reports SERVING
// or NOT_SERVING, and finally runs SetKeyspaceDurabilityPolicy through vtctld.
// The cluster-wide VtTabletExtraArgs are restored before returning.
// Any failure aborts the test via require.
func AddSemiSyncKeyspace(t *testing.T, clusterInfo *VtOrcClusterInfo) {
	var tablets []*cluster.Vttablet
	keyspaceSemiSyncName := "ks2"
	keyspace := &cluster.Keyspace{Name: keyspaceSemiSyncName}

	for i := 0; i < 3; i++ {
		tablet := clusterInfo.ClusterInstance.NewVttabletInstance("replica", 300+i, Cell1)
		tablets = append(tablets, tablet)
	}

	shard := &cluster.Shard{Name: shardName}
	shard.Vttablets = tablets

	// Temporarily override the vttablet arguments for this keyspace only and
	// put the previous value back when the function returns.
	oldVttabletArgs := clusterInfo.ClusterInstance.VtTabletExtraArgs
	defer func() {
		clusterInfo.ClusterInstance.VtTabletExtraArgs = oldVttabletArgs
	}()
	clusterInfo.ClusterInstance.VtTabletExtraArgs = []string{
		"--lock_tables_timeout", "5s",
		"--disable_active_reparents",
		"--enable_semi_sync",
	}

	// Initialize Cluster
	err := clusterInfo.ClusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard})
	require.NoError(t, err, "Cannot launch cluster: %v", err)

	// Start MySQL on every tablet of the new keyspace without waiting, so the
	// processes come up in parallel.
	// NOTE(review): index 1 assumes exactly one keyspace existed before this
	// call - confirm against callers before reusing this helper elsewhere.
	var mysqlCtlProcessList []*exec.Cmd
	for _, shard := range clusterInfo.ClusterInstance.Keyspaces[1].Shards {
		for _, tablet := range shard.Vttablets {
			log.Infof("Starting MySql for tablet %v", tablet.Alias)
			proc, err := tablet.MysqlctlProcess.StartProcess()
			require.NoError(t, err, "Error starting mysql: %v", err)
			mysqlCtlProcessList = append(mysqlCtlProcessList, proc)
		}
	}

	// Wait for mysql processes to start
	for _, proc := range mysqlCtlProcessList {
		err := proc.Wait()
		require.NoError(t, err, "Error starting mysql: %v", err)
	}

	// Start the vttablet process for each tablet.
	for _, tablet := range tablets {
		err = tablet.VttabletProcess.Setup()
		require.NoError(t, err)
	}

	for _, tablet := range tablets {
		err := tablet.VttabletProcess.WaitForTabletStatuses([]string{"SERVING", "NOT_SERVING"})
		require.NoError(t, err)
	}

	vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInfo.ClusterInstance.VtctldProcess.GrpcPort, clusterInfo.ClusterInstance.TmpDirectory)
	out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceSemiSyncName, "--durability-policy=semi_sync")
	require.NoError(t, err, out)
}

// IsSemiSyncSetupCorrectly checks that the semi-sync is setup correctly on the given vttablet
func IsSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) bool {
dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "")
Expand Down
2 changes: 0 additions & 2 deletions go/vt/orchestrator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type Configuration struct {
ListenSocket string // Where orchestrator HTTP should listen for unix socket (default: empty; when given, TCP is disabled)
HTTPAdvertise string // optional, for raft setups, what is the HTTP address this node will advertise to its peers (potentially use where behind NAT or when rerouting ports; example: "http://11.22.33.44:3030")
AgentsServerPort string // port orchestrator agents talk back to
Durability string // The type of durability to enforce. Default is "none". Other values are dictated by registered plugins
MySQLTopologyUser string // The user VTOrc will use to connect to MySQL instances
MySQLTopologyPassword string // The password VTOrc will use to connect to MySQL instances
MySQLReplicaUser string // User to set on replica MySQL instances while configuring replication settings on them. If set, use this credential instead of discovering from mysql. TODO(sougou): deprecate this in favor of fetching from vttablet
Expand Down Expand Up @@ -254,7 +253,6 @@ func newConfiguration() *Configuration {
ListenSocket: "",
HTTPAdvertise: "",
AgentsServerPort: ":3001",
Durability: "none",
StatusEndpoint: DefaultStatusAPIEndpoint,
StatusOUVerify: false,
BackendDB: "sqlite",
Expand Down
8 changes: 8 additions & 0 deletions go/vt/orchestrator/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,4 +870,12 @@ var generateSQLBase = []string{
`
CREATE INDEX ks_idx_vitess_tablet ON vitess_tablet (keyspace, shard)
`,
`
CREATE TABLE IF NOT EXISTS vitess_keyspace (
keyspace varchar(128) CHARACTER SET ascii NOT NULL,
keyspace_type smallint(5) NOT NULL,
durability_policy varchar(512) CHARACTER SET ascii NOT NULL,
PRIMARY KEY (keyspace)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
}
Loading

0 comments on commit 67944bb

Please sign in to comment.