Skip to content

Commit

Permalink
externalize etcd peer port and tmp directory
Browse files Browse the repository at this point in the history
Signed-off-by: Arindam Nayak <[email protected]>
  • Loading branch information
arindamnayak committed Oct 25, 2019
1 parent 0bd72fe commit ef7139d
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 73 deletions.
23 changes: 14 additions & 9 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package cluster
import (
"fmt"
"math/rand"
"os"
"path"

"vitess.io/vitess/go/vt/log"
)
Expand All @@ -33,6 +35,7 @@ type LocalProcessCluster struct {
BaseTabletUID int
Hostname string
TopoPort int
TmpDirectory string

VtgateMySQLPort int
VtctldHTTPPort int
Expand Down Expand Up @@ -82,7 +85,8 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
cluster.Cell = DefaultCell
}
cluster.TopoPort = cluster.GetAndReservePort()
cluster.topoProcess = *EtcdProcessInstance(cluster.TopoPort, cluster.Hostname)
cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort()))
cluster.topoProcess = *EtcdProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, "global")
log.Info(fmt.Sprintf("Starting etcd server on port : %d", cluster.TopoPort))
if err = cluster.topoProcess.Setup(); err != nil {
log.Error(err.Error())
Expand All @@ -107,15 +111,15 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
return
}

cluster.vtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(), cluster.topoProcess.Port, cluster.Hostname)
cluster.vtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(), cluster.topoProcess.Port, cluster.Hostname, cluster.TmpDirectory)
log.Info(fmt.Sprintf("Starting vtctld server on port : %d", cluster.vtctldProcess.Port))
cluster.VtctldHTTPPort = cluster.vtctldProcess.Port
if err = cluster.vtctldProcess.Setup(cluster.Cell); err != nil {
log.Error(err.Error())
return
}

cluster.VtctlclientProcess = *VtctlClientProcessInstance("localhost", cluster.vtctldProcess.GrpcPort)
cluster.VtctlclientProcess = *VtctlClientProcessInstance("localhost", cluster.vtctldProcess.GrpcPort, cluster.TmpDirectory)
return
}

Expand Down Expand Up @@ -157,7 +161,7 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
}
// Start Mysqlctl process
log.Info(fmt.Sprintf("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort))
tablet.mysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort)
tablet.mysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory)
if err = tablet.mysqlctlProcess.Start(); err != nil {
log.Error(err.Error())
return
Expand All @@ -169,12 +173,12 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
tablet.TabletUID,
cluster.Cell,
shardName,
cluster.Hostname,
keyspace.Name,
cluster.vtctldProcess.Port,
tablet.Type,
cluster.topoProcess.Port,
cluster.Hostname)
cluster.Hostname,
cluster.TmpDirectory)
log.Info(fmt.Sprintf("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort))

if err = tablet.vttabletProcess.Setup(); err != nil {
Expand Down Expand Up @@ -224,9 +228,10 @@ func (cluster *LocalProcessCluster) StartVtgate() (err error) {
cluster.VtgateMySQLPort,
cluster.Cell,
cluster.Cell,
cluster.Hostname, "MASTER,REPLICA",
"MASTER,REPLICA",
cluster.topoProcess.Port,
cluster.Hostname)
cluster.Hostname,
cluster.TmpDirectory)

log.Info(fmt.Sprintf("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort))
return cluster.VtgateProcess.Setup()
Expand Down Expand Up @@ -279,7 +284,7 @@ func (cluster *LocalProcessCluster) GetAndReservePort() int {
// GetAndReserveTabletUID gives tablet uid
func (cluster *LocalProcessCluster) GetAndReserveTabletUID() int {
if cluster.BaseTabletUID == 0 {
cluster.BaseTabletUID = getRandomNumber(100, 0)
cluster.BaseTabletUID = getRandomNumber(10000, 0)
}
cluster.BaseTabletUID = cluster.BaseTabletUID + 1
return cluster.BaseTabletUID
Expand Down
28 changes: 18 additions & 10 deletions go/test/endtoend/cluster/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ type EtcdProcess struct {
ListenClientURL string
AdvertiseClientURL string
Port int
PeerPort int
Host string
VerifyURL string
PeerURL string

proc *exec.Cmd
exit chan error
Expand All @@ -50,9 +52,13 @@ type EtcdProcess struct {
func (etcd *EtcdProcess) Setup() (err error) {
etcd.proc = exec.Command(
etcd.Binary,
"--name", etcd.Name,
"--data-dir", etcd.DataDirectory,
"--listen-client-urls", etcd.ListenClientURL,
"--advertise-client-urls", etcd.AdvertiseClientURL,
"--initial-advertise-peer-urls", etcd.PeerURL,
"--listen-peer-urls", etcd.PeerURL,
"--initial-cluster", fmt.Sprintf("%s=%s", etcd.Name, etcd.PeerURL),
)

etcd.proc.Stderr = os.Stderr
Expand All @@ -79,13 +85,13 @@ func (etcd *EtcdProcess) Setup() (err error) {
}
select {
case err := <-etcd.exit:
return fmt.Errorf("process '%s' exited prematurely (err: %s)", etcd.Name, err)
return fmt.Errorf("process '%s' exited prematurely (err: %s)", etcd.Binary, err)
default:
time.Sleep(300 * time.Millisecond)
}
}

return fmt.Errorf("process '%s' timed out after 60s (err: %s)", etcd.Name, <-etcd.exit)
return fmt.Errorf("process '%s' timed out after 60s (err: %s)", etcd.Binary, <-etcd.exit)
}

// TearDown shutdowns the running mysqld service
Expand All @@ -97,8 +103,8 @@ func (etcd *EtcdProcess) TearDown(Cell string) error {
etcd.removeTopoDirectories(Cell)

// Attempt graceful shutdown with SIGTERM first
etcd.proc.Process.Signal(syscall.SIGTERM)
os.RemoveAll(path.Join(os.Getenv("VTDATAROOT"), "etcd"))
_ = etcd.proc.Process.Signal(syscall.SIGTERM)
_ = os.RemoveAll(etcd.DataDirectory)
select {
case err := <-etcd.exit:
etcd.proc = nil
Expand Down Expand Up @@ -150,17 +156,19 @@ func (etcd *EtcdProcess) ManageTopoDir(command string, directory string) (err er
// EtcdProcessInstance returns a EtcdProcess handle for a etcd sevice,
// configured with the given Config.
// The process must be manually started by calling setup()
func EtcdProcessInstance(port int, hostname string) *EtcdProcess {
func EtcdProcessInstance(port int, peerPort int, hostname string, name string) *EtcdProcess {
etcd := &EtcdProcess{
Name: "etcd",
Binary: "etcd",
Port: port,
Host: hostname,
Name: name,
Binary: "etcd",
Port: port,
Host: hostname,
PeerPort: peerPort,
}

etcd.AdvertiseClientURL = fmt.Sprintf("http://%s:%d", etcd.Host, etcd.Port)
etcd.ListenClientURL = fmt.Sprintf("http://%s:%d", etcd.Host, etcd.Port)
etcd.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), "etcd")
etcd.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "etcd", port))
etcd.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", etcd.Host, etcd.Port)
etcd.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
return etcd
}
8 changes: 4 additions & 4 deletions go/test/endtoend/cluster/mysqlctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) {

// MysqlCtlProcessInstance returns a Mysqlctl handle for mysqlctl process
// configured with the given Config.
func MysqlCtlProcessInstance(TabletUID int, MySQLPort int) *MysqlctlProcess {
func MysqlCtlProcessInstance(tabletUID int, mySQLPort int, tmpDirectory string) *MysqlctlProcess {
mysqlctl := &MysqlctlProcess{
Name: "mysqlctl",
Binary: "mysqlctl",
LogDirectory: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
LogDirectory: tmpDirectory,
InitDBFile: path.Join(os.Getenv("VTROOT"), "/config/init_db.sql"),
}
mysqlctl.MySQLPort = MySQLPort
mysqlctl.TabletUID = TabletUID
mysqlctl.MySQLPort = mySQLPort
mysqlctl.TabletUID = tabletUID
return mysqlctl
}
8 changes: 3 additions & 5 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ package cluster

import (
"fmt"
"os"
"os/exec"
"path"
"strings"

"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -89,12 +87,12 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommandWithOutput(args ...string)

// VtctlClientProcessInstance returns a VtctlProcess handle for vtctlclient process
// configured with the given Config.
func VtctlClientProcessInstance(Hostname string, GrpcPort int) *VtctlClientProcess {
func VtctlClientProcessInstance(hostname string, grpcPort int, tmpDirectory string) *VtctlClientProcess {
vtctlclient := &VtctlClientProcess{
Name: "vtctlclient",
Binary: "vtctlclient",
Server: fmt.Sprintf("%s:%d", Hostname, GrpcPort),
TempDirectory: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
Server: fmt.Sprintf("%s:%d", hostname, grpcPort),
TempDirectory: tmpDirectory,
}
return vtctlclient
}
27 changes: 14 additions & 13 deletions go/test/endtoend/cluster/vtctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,8 @@ type VtctldProcess struct {

// Setup starts vtctld process with required arguements
func (vtctld *VtctldProcess) Setup(Cell string) (err error) {
err = os.Mkdir(path.Join(vtctld.Directory, "tmp"), 0700)
if err != nil {
return
}
err = os.Mkdir(path.Join(vtctld.Directory, "backups"), 0700)
if err != nil {
return
}
_ = createDirectory(vtctld.LogDir, 0700)
_ = createDirectory(path.Join(vtctld.Directory, "backups"), 0700)
vtctld.proc = exec.Command(
vtctld.Binary,
"-enable_queries",
Expand Down Expand Up @@ -114,6 +108,13 @@ func (vtctld *VtctldProcess) Setup(Cell string) (err error) {
return fmt.Errorf("process '%s' timed out after 60s (err: %s)", vtctld.Name, <-vtctld.exit)
}

func createDirectory(dirName string, mode os.FileMode) error {
if _, err := os.Stat(dirName); os.IsNotExist(err) {
return os.Mkdir(dirName, mode)
}
return nil
}

// IsHealthy function checks if vtctld process is up and running
func (vtctld *VtctldProcess) IsHealthy() bool {
resp, err := http.Get(vtctld.VerifyURL)
Expand All @@ -132,8 +133,8 @@ func (vtctld *VtctldProcess) TearDown() error {
return nil
}

os.RemoveAll(path.Join(vtctld.Directory, "tmp"))
os.RemoveAll(path.Join(vtctld.Directory, "backups"))
os.RemoveAll(vtctld.LogDir)
//os.RemoveAll(path.Join(vtctld.Directory, "backups"))

// Attempt graceful shutdown with SIGTERM first
vtctld.proc.Process.Signal(syscall.SIGTERM)
Expand All @@ -153,7 +154,7 @@ func (vtctld *VtctldProcess) TearDown() error {
// VtctldProcessInstance returns a VtctlProcess handle for vtctl process
// configured with the given Config.
// The process must be manually started by calling setup()
func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname string) *VtctldProcess {
func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname string, tmpDirectory string) *VtctldProcess {
vtctl := VtctlProcessInstance(topoPort, hostname)
vtctld := &VtctldProcess{
Name: "vtctld",
Expand All @@ -164,10 +165,10 @@ func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname st
ServiceMap: "grpc-vtctl",
BackupStorageImplementation: "file",
FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"),
LogDir: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
LogDir: tmpDirectory,
Port: httpPort,
GrpcPort: grpcPort,
PidFile: path.Join(os.Getenv("VTDATAROOT"), "/tmp", "vtctld.pid"),
PidFile: path.Join(tmpDirectory, "vtctld.pid"),
Directory: os.Getenv("VTDATAROOT"),
}
vtctld.VerifyURL = fmt.Sprintf("http://localhost:%d", vtctld.Port)
Expand Down
22 changes: 11 additions & 11 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,29 +159,29 @@ func (vtgate *VtgateProcess) TearDown() error {
// VtgateProcessInstance returns a Vtgate handle for vtgate process
// configured with the given Config.
// The process must be manually started by calling setup()
func VtgateProcessInstance(Port int, GrpcPort int, MySQLServerPort int, Cell string, CellsToWatch string, Hostname string, TabletTypesToWait string, topoPort int, hostname string) *VtgateProcess {
func VtgateProcessInstance(port int, grpcPort int, mySQLServerPort int, cell string, cellsToWatch string, tabletTypesToWait string, topoPort int, hostname string, tmpDirectory string) *VtgateProcess {
vtctl := VtctlProcessInstance(topoPort, hostname)
vtgate := &VtgateProcess{
Name: "vtgate",
Binary: "vtgate",
FileToLogQueries: path.Join(os.Getenv("VTDATAROOT"), "/tmp/vtgate_querylog.txt"),
FileToLogQueries: path.Join(tmpDirectory, "/vtgate_querylog.txt"),
Directory: os.Getenv("VTDATAROOT"),
ServiceMap: "grpc-vtgateservice",
LogDir: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
Port: Port,
GrpcPort: GrpcPort,
MySQLServerPort: MySQLServerPort,
LogDir: tmpDirectory,
Port: port,
GrpcPort: grpcPort,
MySQLServerPort: mySQLServerPort,
MySQLServerSocketPath: "/tmp/mysql.sock",
Cell: Cell,
CellsToWatch: CellsToWatch,
TabletTypesToWait: TabletTypesToWait,
Cell: cell,
CellsToWatch: cellsToWatch,
TabletTypesToWait: tabletTypesToWait,
GatewayImplementation: "discoverygateway",
CommonArg: *vtctl,
PidFile: path.Join(os.Getenv("VTDATAROOT"), "/tmp/vtgate.pid"),
PidFile: path.Join(tmpDirectory, "/vtgate.pid"),
MySQLAuthServerImpl: "none",
}

vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", Hostname, Port)
vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port)

return vtgate
}
30 changes: 15 additions & 15 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,34 +168,34 @@ func (vttablet *VttabletProcess) TearDown() error {
// VttabletProcessInstance returns a VttabletProcess handle for vttablet process
// configured with the given Config.
// The process must be manually started by calling setup()
func VttabletProcessInstance(Port int, GrpcPort int, TabletUID int, Cell string, Shard string, Hostname string, Keyspace string, VtctldPort int, TabletType string, topoPort int, hostname string) *VttabletProcess {
func VttabletProcessInstance(port int, grpcPort int, tabletUID int, cell string, shard string, keyspace string, vtctldPort int, tabletType string, topoPort int, hostname string, tmpDirectory string) *VttabletProcess {
vtctl := VtctlProcessInstance(topoPort, hostname)
vttablet := &VttabletProcess{
Name: "vttablet",
Binary: "vttablet",
FileToLogQueries: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp/vt_%010d/vttable.pid", TabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", TabletUID)),
TabletPath: fmt.Sprintf("%s-%010d", Cell, TabletUID),
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d/vttable.pid", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID),
ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream",
LogDir: path.Join(os.Getenv("VTDATAROOT"), "/tmp"),
Shard: Shard,
TabletHostname: Hostname,
Keyspace: Keyspace,
LogDir: tmpDirectory,
Shard: shard,
TabletHostname: hostname,
Keyspace: keyspace,
TabletType: "replica",
CommonArg: *vtctl,
HealthCheckInterval: 5,
BackupStorageImplementation: "file",
FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"),
Port: Port,
GrpcPort: GrpcPort,
PidFile: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/vttable.pid", TabletUID)),
VtctldAddress: fmt.Sprintf("http://%s:%d", Hostname, VtctldPort),
Port: port,
GrpcPort: grpcPort,
PidFile: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/vttable.pid", tabletUID)),
VtctldAddress: fmt.Sprintf("http://%s:%d", hostname, vtctldPort),
}

if TabletType == "rdonly" {
vttablet.TabletType = TabletType
if tabletType == "rdonly" {
vttablet.TabletType = tabletType
}
vttablet.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", Hostname, Port)
vttablet.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port)

return vttablet
}
Loading

0 comments on commit ef7139d

Please sign in to comment.