Merge #106497
106497: roachprod: add DNS support for services r=renatolabs,smg260 a=herkolategan

This change lays the foundation for adding DNS providers to `roachprod`. It is
required to be able to keep track of multiple services that are running on the
same VM, for example multiple external SQL processes and a shared process on
the same VM. A service denotes a unit of functionality provided on a set of
given hosts and ports (i.e., a single service can be served by multiple VMs).

An implementation for local clusters is introduced as well as an interface for
managing DNS records. The local implementation does not use true DNS services,
but emulates them through local storage. This avoids the complexities of a
per-operating-system implementation for local-only DNS resolution. The feature
is currently disabled for cloud clusters. Once a cloud DNS provider
implementation of the interface is completed, the feature can be enabled more
widely.
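
A minimal sketch of what emulating DNS records through local storage could look like, assuming records are kept as JSON in a file. The `record` type, the helper names, and the file layout are all hypothetical and are not taken from this change; only the idea of deleting records by subdomain mirrors the `DeleteRecordsBySubdomain` call that appears in the diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// record is a hypothetical SRV-like entry mapping a service name to a host and port.
type record struct {
	Name string `json:"name"`
	Host string `json:"host"`
	Port int    `json:"port"`
}

// hasLabel reports whether label is one of the dot-separated labels of name.
func hasLabel(name, label string) bool {
	for _, l := range strings.Split(name, ".") {
		if l == label {
			return true
		}
	}
	return false
}

// withoutSubdomain returns the records whose name does not contain subdomain
// as a label. This is the assumed behavior of a "delete by subdomain" operation.
func withoutSubdomain(records []record, subdomain string) []record {
	var kept []record
	for _, r := range records {
		if !hasLabel(r.Name, subdomain) {
			kept = append(kept, r)
		}
	}
	return kept
}

// save writes all records to path as JSON, replacing any previous content.
func save(path string, records []record) error {
	data, err := json.MarshalIndent(records, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}

// load reads the records previously written by save.
func load(path string) ([]record, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var records []record
	err = json.Unmarshal(data, &records)
	return records, err
}

func main() {
	dir, err := os.MkdirTemp("", "dns-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "records.json")

	records := []record{
		{Name: "_system-sql._tcp.local-a.example", Host: "127.0.0.1", Port: 29000},
		{Name: "_system-sql._tcp.local-b.example", Host: "127.0.0.1", Port: 29001},
	}
	if err := save(path, records); err != nil {
		panic(err)
	}
	loaded, err := load(path)
	if err != nil {
		panic(err)
	}
	// Drop every record that belongs to the cluster named "local-a".
	remaining := withoutSubdomain(loaded, "local-a")
	fmt.Println(len(loaded), len(remaining))
}
```

A real implementation shared between processes would also need to serialize access to the file; the sketch omits locking.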

Each VM is required to have a public DNS name for the services functionality to
work. The typical layout of DNS records for the services of a cluster looks as
follows:

`_<tenant>-<service_type>._<proto>.<cluster>.<dns_zone> -> host:port`
For example:
```
_system-sql._tcp.cluster-name.roachprod-managed.crdb.io -> cluster-name-0001.roachprod.crdb.io : sql port (26257)
_system-sql._tcp.cluster-name.roachprod-managed.crdb.io -> cluster-name-0002.roachprod.crdb.io : sql port (26257)
_system-sql._tcp.cluster-name.roachprod-managed.crdb.io -> cluster-name-0003.roachprod.crdb.io : sql port (26257)
```

When multiple tenants are running on the same VM (0001), the records take this form:
```
_tenant1-sql._tcp.cluster-name.roachprod-managed.crdb.io -> cluster-name-0001.roachprod.crdb.io : sql port (29001)
_tenant2-sql._tcp.cluster-name.roachprod-managed.crdb.io -> cluster-name-0001.roachprod.crdb.io : sql port (29002)
```
Note that the DNS SRV records here point to the same host, but to different ports.
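
The record layout above can be sketched as a small formatting helper. The function name and its parameters are illustrative assumptions and not part of `roachprod`; only the name layout itself comes from the description above.

```go
package main

import "fmt"

// srvRecordName builds a name of the form
// _<tenant>-<service_type>._<proto>.<cluster>.<dns_zone>.
// Hypothetical helper for illustration only.
func srvRecordName(tenant, serviceType, proto, cluster, zone string) string {
	return fmt.Sprintf("_%s-%s._%s.%s.%s", tenant, serviceType, proto, cluster, zone)
}

func main() {
	zone := "roachprod-managed.crdb.io"
	// The system tenant's SQL service, then a second tenant on the same cluster.
	fmt.Println(srvRecordName("system", "sql", "tcp", "cluster-name", zone))
	fmt.Println(srvRecordName("tenant1", "sql", "tcp", "cluster-name", zone))
}
```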

Services also need to find open ports on a VM when multiple services are
running on it, to avoid port collisions. To facilitate this, a new script has
been added that scans for open ports from a given starting port. Because the
resulting ports are nondeterministic, the DNS service is needed to determine
which services are running on which ports.
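
The change itself uses a shell script for the port scan. As a rough illustration of the same idea, here is a sketch in Go that probes ports upward from a starting value by trying to listen on each one. The function name is hypothetical, and probing by listening on loopback is an assumption rather than how the script works.

```go
package main

import (
	"fmt"
	"net"
)

// findOpenPort returns the first TCP port at or above start on which a
// listener can be opened on the loopback interface.
func findOpenPort(start int) (int, error) {
	for port := start; port <= 65535; port++ {
		ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
		if err != nil {
			continue // Port is in use or otherwise unavailable; try the next one.
		}
		// Release the port right away; the caller is expected to bind it again.
		_ = ln.Close()
		return port, nil
	}
	return 0, fmt.Errorf("no open port found at or above %d", start)
}

func main() {
	// 29000 matches DefaultOpenPortStart in the config change of this commit.
	port, err := findOpenPort(29000)
	if err != nil {
		panic(err)
	}
	fmt.Println("first open port:", port)
}
```

Note that a port reported as open can be taken by another process before the caller binds it, so callers of such a scan should be prepared to retry.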

Providers are currently not required to provide a DNS implementation, nor are
VMs required to specify a DNS provider. The VM functions that invoke DNS
operations are lenient in order to allow falling back to default service-less
topologies. For now, a fallback supplies the default ports if no DNS provider
is present.
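
The fallback described above can be sketched as follows. The `portLookup` type and `resolvePort` function are hypothetical stand-ins for a DNS provider query and are not the API introduced by this change; the default port values are the ones that appear in this commit (26257 for SQL, 26258 for the Admin UI).

```go
package main

import "fmt"

const (
	defaultSQLPort     = 26257
	defaultAdminUIPort = 26258
)

// portLookup is a hypothetical stand-in for asking a DNS provider which port
// a service of the given type is registered on.
type portLookup func(serviceType string) (int, error)

// resolvePort uses the lookup when one is available and falls back to the
// default ports when no DNS provider is present (lookup is nil).
func resolvePort(lookup portLookup, serviceType string) (int, error) {
	if lookup == nil {
		switch serviceType {
		case "sql":
			return defaultSQLPort, nil
		case "ui":
			return defaultAdminUIPort, nil
		default:
			return 0, fmt.Errorf("no default port for service type %q", serviceType)
		}
	}
	return lookup(serviceType)
}

func main() {
	// No DNS provider: the default SQL port is returned.
	port, err := resolvePort(nil, "sql")
	fmt.Println(port, err)

	// With a (fake) provider: the registered port is returned instead.
	withDNS := func(string) (int, error) { return 29001, nil }
	port, err = resolvePort(withDNS, "sql")
	fmt.Println(port, err)
}
```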

It is now possible to specify ports for SQL and UI services via `StartOpts`.
These ports are used when registering DNS records with a DNS provider. This
makes it possible to start not only external service processes on different
ports, but the host cluster as well. To facilitate this, port resolution is
done before starting a cluster, since nodes need the information beforehand to
be able to join the cluster.

A few things are not covered in this change:
1. Revisiting the `roachprod` log and stop commands to support more than
one process on a single VM.
2. Implementations of the DNS interface for a cloud provider.
3. Updating all areas where connection information is assumed rather than queried.

Epic: [CRDB-18499](https://cockroachlabs.atlassian.net/browse/CRDB-18499)
Release note: None

Co-authored-by: Herko Lategan <[email protected]>
craig[bot] and herkolategan committed Aug 21, 2023
2 parents 2ac44e6 + c78021e commit b4623a5
Showing 34 changed files with 1,373 additions and 177 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -275,6 +275,7 @@ ALL_TESTS = [
"//pkg/roachprod/prometheus:prometheus_test",
"//pkg/roachprod/ssh:ssh_test",
"//pkg/roachprod/vm/gce:gce_test",
"//pkg/roachprod/vm/local:local_test",
"//pkg/roachprod/vm:vm_test",
"//pkg/rpc/nodedialer:nodedialer_test",
"//pkg/rpc:rpc_test",
@@ -1475,6 +1476,7 @@ GO_TARGETS = [
"//pkg/roachprod/errors:errors",
"//pkg/roachprod/install:install",
"//pkg/roachprod/install:install_test",
"//pkg/roachprod/lock:lock",
"//pkg/roachprod/logger:logger",
"//pkg/roachprod/prometheus:prometheus",
"//pkg/roachprod/prometheus:prometheus_test",
@@ -1489,6 +1491,7 @@ GO_TARGETS = [
"//pkg/roachprod/vm/gce:gce",
"//pkg/roachprod/vm/gce:gce_test",
"//pkg/roachprod/vm/local:local",
"//pkg/roachprod/vm/local:local_test",
"//pkg/roachprod/vm:vm",
"//pkg/roachprod/vm:vm_test",
"//pkg/roachprod:roachprod",
2 changes: 1 addition & 1 deletion pkg/cmd/roachprod/flags.go
@@ -354,7 +354,7 @@ Default is "RECURRING '*/15 * * * *' FULL BACKUP '@hourly' WITH SCHEDULE OPTIONS
cmd.Flags().BoolVar(&secure,
"secure", false, "use a secure cluster")
}
for _, cmd := range []*cobra.Command{pgurlCmd, sqlCmd} {
for _, cmd := range []*cobra.Command{pgurlCmd, sqlCmd, adminurlCmd} {
cmd.Flags().StringVar(&tenantName,
"tenant-name", "", "specific tenant to connect to")
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachprod/main.go
@@ -945,7 +945,7 @@ var adminurlCmd = &cobra.Command{
`,
Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) error {
urls, err := roachprod.AdminURL(config.Logger, args[0], adminurlPath, adminurlIPs, adminurlOpen, secure)
urls, err := roachprod.AdminURL(config.Logger, args[0], tenantName, adminurlPath, adminurlIPs, adminurlOpen, secure)
if err != nil {
return err
}
25 changes: 17 additions & 8 deletions pkg/cmd/roachtest/cluster.go
@@ -2341,8 +2341,7 @@ func addrToAdminUIAddr(addr string) (string, error) {
if err != nil {
return "", err
}
// Roachprod makes Admin UI's port to be node's port + 1.
return fmt.Sprintf("%s:%d", host, webPort+1), nil
return fmt.Sprintf("%s:%d", host, webPort), nil
}

func urlToAddr(pgURL string) (string, error) {
@@ -2379,12 +2378,17 @@ func (c *clusterImpl) InternalAdminUIAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
) ([]string, error) {
var addrs []string
urls, err := c.InternalAddr(ctx, l, node)
internalAddrs, err := roachprod.AdminURL(l, c.MakeNodes(node), "", "",
false, false, false)
if err != nil {
return nil, err
}
for _, u := range urls {
adminUIAddr, err := addrToAdminUIAddr(u)
for _, u := range internalAddrs {
addr, err := urlToAddr(u)
if err != nil {
return nil, err
}
adminUIAddr, err := addrToAdminUIAddr(addr)
if err != nil {
return nil, err
}
@@ -2396,15 +2400,20 @@ func (c *clusterImpl) InternalAdminUIAddr(
// ExternalAdminUIAddr returns the external Admin UI address in the form host:port
// for the specified node.
func (c *clusterImpl) ExternalAdminUIAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
_ context.Context, l *logger.Logger, node option.NodeListOption,
) ([]string, error) {
var addrs []string
externalAddrs, err := c.ExternalAddr(ctx, l, node)
externalAddrs, err := roachprod.AdminURL(l, c.MakeNodes(node), "", "",
true, false, false)
if err != nil {
return nil, err
}
for _, u := range externalAddrs {
adminUIAddr, err := addrToAdminUIAddr(u)
addr, err := urlToAddr(u)
if err != nil {
return nil, err
}
adminUIAddr, err := addrToAdminUIAddr(addr)
if err != nil {
return nil, err
}
35 changes: 12 additions & 23 deletions pkg/cmd/roachtest/tests/cluster_init.go
@@ -33,37 +33,26 @@ import (
func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach")

t.L().Printf("retrieving VM addresses")
addrs, err := c.InternalAddr(ctx, t.L(), c.All())
if err != nil {
t.Fatal(err)
}

// TODO(tbg): this should never happen, but I saw it locally. The result
// is the test hanging forever, because all nodes will create their own
// single node cluster and waitForFullReplication never returns.
if addrs[0] == "" {
t.Fatal("no address for first node")
}

// We start all nodes with the same join flags and then issue an "init"
// command to one of the nodes. We do this twice, since roachtest has some
// special casing for the first node in a cluster (the join flags of all nodes
// default to just the first node) and we want to make sure that we're not
// relying on it.
startOpts := option.DefaultStartOpts()

// We don't want roachprod to auto-init this cluster.
startOpts.RoachprodOpts.SkipInit = true

// We need to point all nodes at all other nodes. By default,
// roachprod will point all nodes at the first node, but this
// won't allow init'ing any but the first node - we require
// that all nodes can discover the init'ed node (transitively)
// via the join targets.
startOpts.RoachprodOpts.JoinTargets = c.All()

for _, initNode := range []int{2, 1} {
c.Wipe(ctx, false /* preserveCerts */)
t.L().Printf("starting test with init node %d", initNode)
startOpts := option.DefaultStartOpts()

// We don't want roachprod to auto-init this cluster.
startOpts.RoachprodOpts.SkipInit = true
// We need to point all nodes at all other nodes. By default
// roachprod will point all nodes at the first node, but this
// won't allow init'ing any but the first node - we require
// that all nodes can discover the init'ed node (transitively)
// via their join flags.
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--join="+strings.Join(addrs, ","))
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings())

urlMap := make(map[int]string)
6 changes: 1 addition & 5 deletions pkg/cmd/roachtest/tests/decommission.go
@@ -349,13 +349,9 @@ func runDecommission(
db := c.Conn(ctx, t.L(), pinnedNode)
defer db.Close()

internalAddrs, err := c.InternalAddr(ctx, t.L(), c.Node(pinnedNode))
if err != nil {
return err
}
startOpts := option.DefaultStartSingleNodeOpts()
startOpts.RoachprodOpts.JoinTargets = []int{pinnedNode}
extraArgs := []string{
"--join", internalAddrs[0],
fmt.Sprintf("--attrs=node%d", node),
}
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, extraArgs...)
2 changes: 1 addition & 1 deletion pkg/roachprod/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"//pkg/roachprod/cloud",
"//pkg/roachprod/config",
"//pkg/roachprod/install",
"//pkg/roachprod/lock",
"//pkg/roachprod/logger",
"//pkg/roachprod/prometheus",
"//pkg/roachprod/vm",
@@ -30,6 +31,5 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@org_golang_x_sys//unix",
],
)
8 changes: 7 additions & 1 deletion pkg/roachprod/cloud/cluster_cloud.go
@@ -267,13 +267,19 @@ func CreateCluster(

// DestroyCluster TODO(peter): document
func DestroyCluster(l *logger.Logger, c *Cluster) error {
return vm.FanOut(c.VMs, func(p vm.Provider, vms vm.List) error {
err := vm.FanOut(c.VMs, func(p vm.Provider, vms vm.List) error {
// Enable a fast-path for providers that can destroy a cluster in one shot.
if x, ok := p.(vm.DeleteCluster); ok {
return x.DeleteCluster(l, c.Name)
}
return p.Delete(l, vms)
})
if err != nil {
return err
}
return vm.FanOutDNS(c.VMs, func(p vm.DNSProvider, vms vm.List) error {
return p.DeleteRecordsBySubdomain(c.Name)
})
}

// ExtendCluster TODO(peter): document
10 changes: 6 additions & 4 deletions pkg/roachprod/clusters_cache.go
@@ -57,11 +57,13 @@ func readSyncedClusters(key string) (*cloud.Cluster, bool) {
// InitDirs initializes the directories for storing cluster metadata and debug
// logs.
func InitDirs() error {
cd := os.ExpandEnv(config.ClustersDir)
if err := os.MkdirAll(cd, 0755); err != nil {
return err
dirs := []string{config.ClustersDir, config.DefaultDebugDir, config.DNSDir}
for _, dir := range dirs {
if err := os.MkdirAll(os.ExpandEnv(dir), 0755); err != nil {
return err
}
}
return os.MkdirAll(os.ExpandEnv(config.DefaultDebugDir), 0755)
return nil
}

// saveCluster creates (or overwrites) the file in config.ClusterDir storing the
10 changes: 10 additions & 0 deletions pkg/roachprod/config/config.go
@@ -71,6 +71,13 @@ const (
// ClustersDir is the directory where we cache information about clusters.
ClustersDir = "${HOME}/.roachprod/clusters"

// DefaultLockPath is the path to the lock file used to synchronize access to
// shared roachprod resources.
DefaultLockPath = "$HOME/.roachprod/LOCK"

// DNSDir is the directory where we cache local cluster DNS information.
DNSDir = "${HOME}/.roachprod/dns"

// SharedUser is the linux username for shared use on all vms.
SharedUser = "ubuntu"

@@ -86,6 +93,9 @@ const (
// listening for HTTP connections for the Admin UI.
DefaultAdminUIPort = 26258

// DefaultOpenPortStart is the default starting range used to find open ports.
DefaultOpenPortStart = 29000

// DefaultNumFilesLimit is the default limit on the number of files that can
// be opened by the process.
DefaultNumFilesLimit = 65 << 13
5 changes: 5 additions & 0 deletions pkg/roachprod/install/BUILD.bazel
@@ -11,12 +11,14 @@ go_library(
"install.go",
"iterm2.go",
"nodes.go",
"services.go",
"session.go",
"staging.go",
],
embedsrcs = [
"scripts/download.sh",
"scripts/start.sh",
"scripts/open_ports.sh",
],
importpath = "github.com/cockroachdb/cockroach/pkg/roachprod/install",
visibility = ["//visibility:public"],
@@ -49,15 +51,18 @@ go_test(
name = "install_test",
srcs = [
"cluster_synced_test.go",
"services_test.go",
"staging_test.go",
"start_template_test.go",
],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":install"],
deps = [
"//pkg/roachprod/cloud",
"//pkg/roachprod/logger",
"//pkg/roachprod/vm",
"//pkg/roachprod/vm/local",
"//pkg/testutils/datapathutils",
"//pkg/util/retry",
"@com_github_cockroachdb_datadriven//:datadriven",
19 changes: 16 additions & 3 deletions pkg/roachprod/install/cluster_synced.go
@@ -662,7 +662,12 @@ func (c *SyncedCluster) Monitor(
defer wg.Done()

node := nodes[i]

port, err := c.NodePort(node)
if err != nil {
err := errors.Wrap(err, "failed to get node port")
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
return
}
// On each monitored node, we loop looking for a cockroach process.
data := struct {
OneShot bool
@@ -678,7 +683,7 @@ func (c *SyncedCluster) Monitor(
OneShot: opts.OneShot,
IgnoreEmpty: opts.IgnoreEmptyNodes,
Store: c.NodeDir(node, 1 /* storeIndex */),
Port: c.NodePort(node),
Port: port,
Local: c.IsLocal(),
Separator: separator,
SkippedMsg: skippedMsg,
@@ -2345,7 +2350,15 @@ func (c *SyncedCluster) pgurls(
}
m := make(map[Node]string, len(hosts))
for node, host := range hosts {
m[node] = c.NodeURL(host, c.NodePort(node), tenantName)
desc, err := c.DiscoverService(node, tenantName, ServiceTypeSQL)
if err != nil {
return nil, err
}
sharedTenantName := ""
if desc.ServiceMode == ServiceModeShared {
sharedTenantName = tenantName
}
m[node] = c.NodeURL(host, desc.Port, sharedTenantName)
}
return m, nil
}