From f13b01dc30d226db9e216defc105004aed991664 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Tue, 23 May 2023 01:16:19 -0400
Subject: [PATCH] roach{prod,test}: add first-class support for disk snapshots

Long-lived disk snapshots can drastically reduce testing time for scale
tests. Tests, whether run by hand or through CI, need only run the
long-running fixture-generating code (importing some dataset, generating
it organically through workload, etc.) when snapshot fingerprints change;
the fingerprints incorporate the major crdb version that generated them.

Here's an example run that freshly generates disk snapshots:

  === RUN   admission-control/index-backfill
  03:57:19 admission_control_index_backfill.go:53: no existing snapshots found for admission-control/index-backfill (ac-index-backfill), doing pre-work
  03:57:54 roachprod.go:1626: created volume snapshot ac-index-backfill-0001-vunknown-1-n2-standard-8 (id=6426236595187320652) for volume irfansharif-snapshot-0001-1 on irfansharif-snapshot-0001-1/n1
  03:57:55 admission_control_index_backfill.go:61: using 1 newly created snapshot(s) with prefix "ac-index-backfill"
  03:58:02 roachprod.go:1716: detached and deleted volume irfansharif-snapshot-0001-1 from irfansharif-snapshot-0001
  03:58:28 roachprod.go:1764: created volume irfansharif-snapshot-0001-1
  03:58:33 roachprod.go:1770: attached volume irfansharif-snapshot-0001-1 to irfansharif-snapshot-0001
  03:58:36 roachprod.go:1783: mounted irfansharif-snapshot-0001-1 to irfansharif-snapshot-0001
  --- PASS: admission-control/index-backfill (79.14s)

Here's a subsequent run that makes use of the aforementioned disk
snapshot:

  === RUN   admission-control/index-backfill
  04:00:40 admission_control_index_backfill.go:63: using 1 pre-existing snapshot(s) with prefix "ac-index-backfill"
  04:00:47 roachprod.go:1716: detached and deleted volume irfansharif-snapshot-0001-1 from irfansharif-snapshot-0001
  04:01:14 roachprod.go:1763: created volume irfansharif-snapshot-0001-1
  04:01:19 roachprod.go:1769: attached volume irfansharif-snapshot-0001-1 to irfansharif-snapshot-0001
  04:01:22 roachprod.go:1782: mounted irfansharif-snapshot-0001-1 to irfansharif-snapshot-0001
  --- PASS: admission-control/index-backfill (43.47s)

We add the following APIs to the roachtest.Cluster interface, for tests
to interact with disk snapshots. admission-control/index-backfill is a
placeholder test making use of these APIs.

  type Cluster interface {
    // ...

    // CreateSnapshot creates volume snapshots of the cluster using
    // the given prefix. These snapshots can later be retrieved,
    // deleted or applied to already instantiated clusters.
    CreateSnapshot(ctx context.Context, snapshotPrefix string) error

    // ListSnapshots lists the individual volume snapshots that
    // satisfy the search criteria.
    ListSnapshots(
      ctx context.Context, vslo vm.VolumeSnapshotListOpts,
    ) ([]vm.VolumeSnapshot, error)

    // DeleteSnapshots permanently deletes the given snapshots.
    DeleteSnapshots(
      ctx context.Context, snapshots ...vm.VolumeSnapshot,
    ) error

    // ApplySnapshots applies the given volume snapshots to the
    // underlying cluster. This is a destructive operation as far as
    // existing state is concerned - all already-attached volumes are
    // detached and deleted to make room for new snapshot-derived
    // volumes. The new volumes are created using the same specs
    // (size, disk type, etc.) as the original cluster.
    ApplySnapshots(
      ctx context.Context, snapshots []vm.VolumeSnapshot,
    ) error
  }

This in turn is powered by the following additions to the vm.Provider
interface, implemented by each cloud provider.

  type Provider interface {
    // ...

    // CreateVolume creates a new volume using the given options.
    CreateVolume(l *logger.Logger, vco VolumeCreateOpts) (Volume, error)

    // ListVolumes lists all volumes already attached to the given VM.
    ListVolumes(l *logger.Logger, vm *VM) ([]Volume, error)

    // DeleteVolume detaches and deletes the given volume from the
    // given VM.
    DeleteVolume(l *logger.Logger, volume Volume, vm *VM) error

    // AttachVolume attaches the given volume to the given VM.
    AttachVolume(l *logger.Logger, volume Volume, vm *VM) (string, error)

    // CreateVolumeSnapshot creates a snapshot of the given volume,
    // using the given options.
    CreateVolumeSnapshot(
      l *logger.Logger, volume Volume, vsco VolumeSnapshotCreateOpts,
    ) (VolumeSnapshot, error)

    // ListVolumeSnapshots lists the individual volume snapshots that
    // satisfy the search criteria.
    ListVolumeSnapshots(
      l *logger.Logger, vslo VolumeSnapshotListOpts,
    ) ([]VolumeSnapshot, error)

    // DeleteVolumeSnapshot permanently deletes the given snapshot.
    DeleteVolumeSnapshot(l *logger.Logger, snapshot VolumeSnapshot) error
  }

Since these snapshots necessarily outlive the tests, and we don't want
them dangling perpetually, we introduce a prune-dangling roachtest that
acts as a poor man's cron job, sifting through expired snapshots (older
than 30 days) and deleting them. For GCE at least it's not obvious to me
how to create these snapshots in cloud buckets with a built-in TTL,
hence this hack. It looks like this (with a change to the TTL):

  === RUN   prune-dangling
  06:22:48 prune_dangling_snapshots_and_disks.go:54: pruned old snapshot ac-index-backfill-0001-vunknown-1-n2-standard-8 (id=7962137245497025996)
  06:22:48 test_runner.go:1023: tearing down after success; see teardown.log
  --- PASS: prune-dangling (8.59s)

Subsequent commits will:
- [ ] Fill out admission-control/index-backfill, a non-trivial use of
      disk snapshots. It will cut down the test time from >4hrs to <25m.
- [ ] Expose top-level commands in roachprod to manipulate these
      snapshots.
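For reference, here's roughly how a test is expected to stitch these
APIs together. This is an illustrative sketch only: the helper name and
the elided fixture-generation step are placeholders, and the real wiring
lives in admission_control_index_backfill.go below.

  // Sketch of the snapshot reuse flow: look for existing snapshots,
  // generate and snapshot the fixture only if none exist, then apply
  // the snapshots before running the test proper.
  func runWithSnapshots(ctx context.Context, t test.Test, c cluster.Cluster) {
    const prefix = "ac-index-backfill"
    snapshots, err := c.ListSnapshots(ctx, vm.VolumeSnapshotListOpts{Name: prefix})
    if err != nil {
      t.Fatal(err)
    }
    if len(snapshots) == 0 {
      // Slow path: generate the fixture (import/workload, with CRDB
      // stopped before snapshotting), then snapshot the attached volumes.
      if err := c.CreateSnapshot(ctx, prefix); err != nil {
        t.Fatal(err)
      }
      if snapshots, err = c.ListSnapshots(ctx, vm.VolumeSnapshotListOpts{Name: prefix}); err != nil {
        t.Fatal(err)
      }
    }
    // Fast path: swap in snapshot-derived volumes and run the test proper.
    if err := c.ApplySnapshots(ctx, snapshots); err != nil {
      t.Fatal(err)
    }
  }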
Release note: None --- pkg/cmd/roachprod/main.go | 5 +- pkg/cmd/roachtest/cluster.go | 34 ++ pkg/cmd/roachtest/cluster/BUILD.bazel | 1 + .../roachtest/cluster/cluster_interface.go | 19 ++ pkg/cmd/roachtest/spec/cluster_spec.go | 14 +- pkg/cmd/roachtest/spec/option.go | 11 + pkg/cmd/roachtest/tests/BUILD.bazel | 3 + pkg/cmd/roachtest/tests/admission_control.go | 1 + .../tests/admission_control_index_backfill.go | 83 +++++ .../prune_dangling_snapshots_and_disks.go | 63 ++++ pkg/cmd/roachtest/tests/registry.go | 1 + pkg/roachprod/cloud/cluster_cloud.go | 22 +- pkg/roachprod/roachprod.go | 237 +++++++++++--- pkg/roachprod/vm/aws/aws.go | 41 ++- pkg/roachprod/vm/azure/azure.go | 28 +- pkg/roachprod/vm/flagstub/flagstub.go | 32 +- pkg/roachprod/vm/gce/gcloud.go | 293 ++++++++++++++---- pkg/roachprod/vm/local/local.go | 28 +- pkg/roachprod/vm/vm.go | 80 ++++- 19 files changed, 848 insertions(+), 148 deletions(-) create mode 100644 pkg/cmd/roachtest/tests/admission_control_index_backfill.go create mode 100644 pkg/cmd/roachtest/tests/prune_dangling_snapshots_and_disks.go diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go index 3e1895a59729..2cdcdcd7b5f1 100644 --- a/pkg/cmd/roachprod/main.go +++ b/pkg/cmd/roachprod/main.go @@ -1125,7 +1125,10 @@ var storageSnapshotCmd = &cobra.Command{ cluster := args[0] name := args[1] desc := args[2] - return roachprod.SnapshotVolume(context.Background(), config.Logger, cluster, name, desc) + return roachprod.CreateSnapshot(context.Background(), config.Logger, cluster, vm.VolumeSnapshotCreateOpts{ + Name: name, + Description: desc, + }) }), } diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 3e2222d1a3ae..b2d348251f00 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -574,6 +574,9 @@ func MachineTypeToCPUs(s string) int { if _, err := fmt.Sscanf(s, "n1-standard-%d", &v); err == nil { return v } + if _, err := fmt.Sscanf(s, "n2-standard-%d", &v); err == nil { + return v + } if _, err := fmt.Sscanf(s, "n1-highcpu-%d", &v); err == nil { return v } @@ -1690,6 +1693,37 @@ func (c *clusterImpl) doDestroy(ctx context.Context, l *logger.Logger) <-chan st return ch } +func (c *clusterImpl) ListSnapshots( + ctx context.Context, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + return roachprod.ListSnapshots(ctx, c.l, c.name, vslo) +} + +func (c *clusterImpl) DeleteSnapshots(ctx context.Context, snapshots ...vm.VolumeSnapshot) error { + return roachprod.DeleteSnapshots(ctx, c.l, c.name, snapshots...) +} + +func (c *clusterImpl) CreateSnapshot(ctx context.Context, snapshotPrefix string) error { + return roachprod.CreateSnapshot(ctx, c.l, c.name, vm.VolumeSnapshotCreateOpts{ + Name: snapshotPrefix, + Description: fmt.Sprintf("snapshot for test: %s", c.t.Name()), + Labels: map[string]string{ + vm.TagUsage: "roachtest", + }, + }) +} + +func (c *clusterImpl) ApplySnapshots(ctx context.Context, snapshots []vm.VolumeSnapshot) error { + opts := vm.VolumeCreateOpts{ + Size: c.spec.VolumeSize, + Type: c.spec.GCEVolumeType, // TODO(irfansharif): This is only applicable to GCE. Change that. + Labels: map[string]string{ + "usage": "roachtest", + }, + } + return roachprod.ApplySnapshots(ctx, c.l, c.name, snapshots, opts) +} + // Put a local file to all of the machines in a cluster. // Put is DEPRECATED. Use PutE instead. 
func (c *clusterImpl) Put(ctx context.Context, src, dest string, nodes ...option.Option) { diff --git a/pkg/cmd/roachtest/cluster/BUILD.bazel b/pkg/cmd/roachtest/cluster/BUILD.bazel index afd151fae747..a5c0b9f53899 100644 --- a/pkg/cmd/roachtest/cluster/BUILD.bazel +++ b/pkg/cmd/roachtest/cluster/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/roachprod/install", "//pkg/roachprod/logger", "//pkg/roachprod/prometheus", + "//pkg/roachprod/vm", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/cmd/roachtest/cluster/cluster_interface.go b/pkg/cmd/roachtest/cluster/cluster_interface.go index e0a1c8885d4a..b8d4c6991a7e 100644 --- a/pkg/cmd/roachtest/cluster/cluster_interface.go +++ b/pkg/cmd/roachtest/cluster/cluster_interface.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" + "github.com/cockroachdb/cockroach/pkg/roachprod/vm" ) // Cluster is the interface through which a given roachtest interacts with the @@ -134,4 +135,22 @@ type Cluster interface { StartGrafana(ctx context.Context, l *logger.Logger, promCfg *prometheus.Config) error StopGrafana(ctx context.Context, l *logger.Logger, dumpDir string) error + + // Volume snapshot related APIs. + + // CreateSnapshot creates volume snapshots of the cluster using the given + // prefix. These snapshots can later be retrieved, deleted or applied to + // already instantiated clusters. + CreateSnapshot(ctx context.Context, snapshotPrefix string) error + // ListSnapshots lists the individual volume snapshots that satisfy the + // search criteria. + ListSnapshots(ctx context.Context, vslo vm.VolumeSnapshotListOpts) ([]vm.VolumeSnapshot, error) + // DeleteSnapshots permanently deletes the given snapshots. + DeleteSnapshots(ctx context.Context, snapshots ...vm.VolumeSnapshot) error + // ApplySnapshots applies the given volume snapshots to the underlying + // cluster. This is a destructive operation as far as existing state is + // concerned - all already-attached volumes are detached and deleted to make + // room for new snapshot-derived volumes. The new volumes are created using + // the same specs (size, disk type, etc.) as the original cluster. + ApplySnapshots(ctx context.Context, snapshots []vm.VolumeSnapshot) error } diff --git a/pkg/cmd/roachtest/spec/cluster_spec.go b/pkg/cmd/roachtest/spec/cluster_spec.go index 0a381b0c1cfe..856e0d6eb708 100644 --- a/pkg/cmd/roachtest/spec/cluster_spec.go +++ b/pkg/cmd/roachtest/spec/cluster_spec.go @@ -85,6 +85,13 @@ type ClusterSpec struct { RandomlyUseZfs bool GatherCores bool + + // GCE-specific arguments. + // + // TODO(irfansharif): This cluster spec type suffers the curse of + // generality. Make it easier to just inject cloud-specific arguments. + GCEMinCPUPlatform string + GCEVolumeType string } // MakeClusterSpec makes a ClusterSpec. 
@@ -155,6 +162,7 @@ func getGCEOpts( localSSD bool, RAID0 bool, terminateOnMigration bool, + minCPUPlatform, volumeType string, ) vm.ProviderOpts { opts := gce.DefaultProviderOpts() opts.MachineType = machineType @@ -173,6 +181,8 @@ func getGCEOpts( opts.UseMultipleDisks = !RAID0 } opts.TerminateOnMigration = terminateOnMigration + opts.MinCPUPlatform = minCPUPlatform + opts.PDVolumeType = volumeType return opts } @@ -289,7 +299,9 @@ func (s *ClusterSpec) RoachprodOpts( providerOpts = getAWSOpts(machineType, zones, s.VolumeSize, createVMOpts.SSDOpts.UseLocalSSD) case GCE: providerOpts = getGCEOpts(machineType, zones, s.VolumeSize, ssdCount, - createVMOpts.SSDOpts.UseLocalSSD, s.RAID0, s.TerminateOnMigration) + createVMOpts.SSDOpts.UseLocalSSD, s.RAID0, s.TerminateOnMigration, + s.GCEMinCPUPlatform, s.GCEVolumeType, + ) case Azure: providerOpts = getAzureOpts(machineType, zones) } diff --git a/pkg/cmd/roachtest/spec/option.go b/pkg/cmd/roachtest/spec/option.go index 1523255c12fe..5060967c9766 100644 --- a/pkg/cmd/roachtest/spec/option.go +++ b/pkg/cmd/roachtest/spec/option.go @@ -17,6 +17,17 @@ type Option interface { apply(spec *ClusterSpec) } +type cloudOption string + +func (o cloudOption) apply(spec *ClusterSpec) { + spec.Cloud = string(o) +} + +// Cloud controls what cloud is used to create the cluster. +func Cloud(s string) Option { + return cloudOption(s) +} + type nodeCPUOption int func (o nodeCPUOption) apply(spec *ClusterSpec) { diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index a54d3370c973..387bbcc490a9 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "admission_control.go", "admission_control_elastic_backup.go", "admission_control_elastic_cdc.go", + "admission_control_index_backfill.go", "admission_control_index_overload.go", "admission_control_multi_store_overload.go", "admission_control_multitenant_fairness.go", @@ -118,6 +119,7 @@ go_library( "pgx_blocklist.go", "pop.go", "process_lock.go", + "prune_dangling_snapshots_and_disks.go", "psycopg.go", "psycopg_blocklist.go", "query_comparison_util.go", @@ -211,6 +213,7 @@ go_library( "//pkg/roachprod/install", "//pkg/roachprod/logger", "//pkg/roachprod/prometheus", + "//pkg/roachprod/vm", "//pkg/server", "//pkg/server/serverpb", "//pkg/sql", diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go index 637258f73be1..70b0018ae267 100644 --- a/pkg/cmd/roachtest/tests/admission_control.go +++ b/pkg/cmd/roachtest/tests/admission_control.go @@ -36,4 +36,5 @@ func registerAdmission(r registry.Registry) { registerTPCCOverload(r) registerTPCCSevereOverload(r) registerIndexOverload(r) + registerIndexBackfill(r) } diff --git a/pkg/cmd/roachtest/tests/admission_control_index_backfill.go b/pkg/cmd/roachtest/tests/admission_control_index_backfill.go new file mode 100644 index 000000000000..f9565af043b3 --- /dev/null +++ b/pkg/cmd/roachtest/tests/admission_control_index_backfill.go @@ -0,0 +1,83 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tests + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/vm" +) + +func registerIndexBackfill(r registry.Registry) { + clusterSpec := r.MakeClusterSpec( + 1, /* nodeCount */ + spec.CPU(8), + spec.Zones("us-east1-b"), + spec.VolumeSize(500), + spec.Cloud(spec.GCE), + ) + clusterSpec.InstanceType = "n2-standard-8" + clusterSpec.GCEMinCPUPlatform = "Intel Ice Lake" + clusterSpec.GCEVolumeType = "pd-ssd" + + r.Add(registry.TestSpec{ + Name: "admission-control/index-backfill", + Owner: registry.OwnerAdmissionControl, + // TODO(irfansharif): Reduce to weekly cadence once stabilized. + // Tags: registry.Tags(`weekly`), + Cluster: clusterSpec, + RequiresLicense: true, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + // TODO(irfansharif): Make a registry of these prefix strings. It's + // important no registered name is a prefix of another. + const snapshotPrefix = "ac-index-backfill" + + var snapshots []vm.VolumeSnapshot + snapshots, err := c.ListSnapshots(ctx, vm.VolumeSnapshotListOpts{ + // TODO(irfansharif): Search by taking in the other parts of the + // snapshot fingerprint, i.e. the node count, the version, etc. + Name: snapshotPrefix, + }) + if err != nil { + t.Fatal(err) + } + if len(snapshots) == 0 { + t.L().Printf("no existing snapshots found for %s (%s), doing pre-work", t.Name(), snapshotPrefix) + // TODO(irfansharif): Add validation that we're some released + // version, probably the predecessor one. Also ensure that any + // running CRDB processes have been stopped since we're taking + // raw disk snapshots. Also later we'll be unmounting/mounting + // attached volumes. + if err := c.CreateSnapshot(ctx, snapshotPrefix); err != nil { + t.Fatal(err) + } + snapshots, err = c.ListSnapshots(ctx, vm.VolumeSnapshotListOpts{Name: snapshotPrefix}) + if err != nil { + t.Fatal(err) + } + t.L().Printf("using %d newly created snapshot(s) with prefix %q", len(snapshots), snapshotPrefix) + } else { + t.L().Printf("using %d pre-existing snapshot(s) with prefix %q", len(snapshots), snapshotPrefix) + } + + if err := c.ApplySnapshots(ctx, snapshots); err != nil { + t.Fatal(err) + } + + // TODO(irfansharif): Actually do something using TPC-E, index + // backfills and replication admission control. + }, + }) +} diff --git a/pkg/cmd/roachtest/tests/prune_dangling_snapshots_and_disks.go b/pkg/cmd/roachtest/tests/prune_dangling_snapshots_and_disks.go new file mode 100644 index 000000000000..0527b7f49ec1 --- /dev/null +++ b/pkg/cmd/roachtest/tests/prune_dangling_snapshots_and_disks.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tests + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod" + "github.com/cockroachdb/cockroach/pkg/roachprod/vm" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// This test exists only to prune expired snapshots. Not all cloud providers +// (GCE) let you store volume snapshots in buckets with a pre-configured TTL. So +// we use this nightly roachtest as a poor man's cron job. +func registerPruneDanglingSnapshotsAndDisks(r registry.Registry) { + clusterSpec := r.MakeClusterSpec( + 1, /* nodeCount */ + spec.Cloud(spec.GCE), + ) + + r.Add(registry.TestSpec{ + Name: "prune-dangling", + Owner: registry.OwnerTestEng, + Cluster: clusterSpec, + RequiresLicense: true, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + snapshots, err := c.ListSnapshots(ctx, vm.VolumeSnapshotListOpts{ + CreatedBefore: timeutil.Now().Add(-1 * roachprod.SnapshotTTL), + Labels: map[string]string{ + vm.TagUsage: "roachtest", // only prune out snapshots created in tests + }, + }) + if err != nil { + t.Fatal(err) + } + + for _, snapshot := range snapshots { + if err := c.DeleteSnapshots(ctx, snapshot); err != nil { + t.Fatal(err) + } + t.L().Printf("pruned old snapshot %s (id=%s)", snapshot.Name, snapshot.ID) + } + + // TODO(irfansharif): Also prune out unattached disks. Use something + // like: + // + // gcloud compute --project $project disks list --filter="-users:*" + }, + }) +} diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go index 8e50c197413c..7f374ff41450 100644 --- a/pkg/cmd/roachtest/tests/registry.go +++ b/pkg/cmd/roachtest/tests/registry.go @@ -100,6 +100,7 @@ func RegisterTests(r registry.Registry) { registerPop(r) registerProcessLock(r) registerPsycopg(r) + registerPruneDanglingSnapshotsAndDisks(r) registerQueue(r) registerQuitTransfersLeases(r) registerRebalanceLoad(r) diff --git a/pkg/roachprod/cloud/cluster_cloud.go b/pkg/roachprod/cloud/cluster_cloud.go index 5ea3bc3da24d..5b8e46699c59 100644 --- a/pkg/roachprod/cloud/cluster_cloud.go +++ b/pkg/roachprod/cloud/cluster_cloud.go @@ -15,7 +15,6 @@ import ( "fmt" "regexp" "sort" - "strings" "time" "github.com/cockroachdb/cockroach/pkg/roachprod/config" @@ -164,21 +163,6 @@ func (c *Cluster) IsLocal() bool { return config.IsLocalClusterName(c.Name) } -const vmNameFormat = "user--" - -// namesFromVM determines the user name and the cluster name from a VM. -func namesFromVM(v vm.VM) (userName string, clusterName string, _ error) { - if v.IsLocal() { - return config.Local, v.LocalClusterName, nil - } - name := v.Name - parts := strings.Split(name, "-") - if len(parts) < 3 { - return "", "", fmt.Errorf("expected VM name in the form %s, got %s", vmNameFormat, name) - } - return parts[0], strings.Join(parts[:len(parts)-1], "-"), nil -} - // ListCloud returns information about all instances (across all available // providers). 
func ListCloud(l *logger.Logger, options vm.ListOptions) (*Cloud, error) { @@ -207,7 +191,11 @@ func ListCloud(l *logger.Logger, options vm.ListOptions) (*Cloud, error) { for _, vms := range providerVMs { for _, v := range vms { // Parse cluster/user from VM name, but only for non-local VMs - userName, clusterName, err := namesFromVM(v) + userName, err := v.UserName() + if err != nil { + v.Errors = append(v.Errors, vm.ErrInvalidName) + } + clusterName, err := v.ClusterName() if err != nil { v.Errors = append(v.Errors, vm.ErrInvalidName) } diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 872a4af94ccc..5d6047e4c954 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -1573,10 +1573,13 @@ func PrometheusSnapshot( return nil } -// SnapshotVolume snapshots any of the volumes attached to the nodes in a -// cluster specification. -func SnapshotVolume( - ctx context.Context, l *logger.Logger, clusterName, name, description string, +// SnapshotTTL controls how long volume snapshots are kept around. +const SnapshotTTL = 30 * 24 * time.Hour // 30 days + +// CreateSnapshot snapshots all the persistent volumes attached to nodes in the +// named cluster. +func CreateSnapshot( + ctx context.Context, l *logger.Logger, clusterName string, vsco vm.VolumeSnapshotCreateOpts, ) error { if err := LoadClusters(); err != nil { return err @@ -1590,49 +1593,212 @@ func SnapshotVolume( if err != nil { return err } + for nodeSpecIdx, nodeID := range nodes { cVM := c.VMs[nodeID-1] + crdbVersion := nodesStatus[nodeSpecIdx].Version + if crdbVersion == "" { + crdbVersion = "unknown" + } labels := map[string]string{ "roachprod-node-src-spec": cVM.MachineType, "roachprod-cluster-node": cVM.Name, - "roachprod-crdb-version": nodesStatus[nodeSpecIdx].Version, + "roachprod-crdb-version": crdbVersion, + vm.TagCluster: clusterName, + vm.TagRoachprod: "true", + vm.TagLifetime: SnapshotTTL.String(), + vm.TagCreated: strings.ToLower( + strings.ReplaceAll(timeutil.Now().Format(time.RFC3339), ":", "_")), // format according to gce label naming requirements } - foundMatchingVolume := false - if len(cVM.NonBootAttachedVolumes) == 0 { - l.Printf("Node %d does not have any non-bootable volumes attached. 
Did you run `sync --include-volumes`?", - nodeID) + for k, v := range vsco.Labels { + labels[k] = v } - for _, volume := range cVM.NonBootAttachedVolumes { - if isWorkloadCollectorVolume(volume) { - l.Printf("Creating snapshot for node %d volume %s\n", nodeID, volume.Name) - nameSuffix := "" - if len(nodes) != 1 { - nameSuffix = fmt.Sprintf("-%d", nodeID) - } - err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error { - sID, err := provider.SnapshotVolume(l, volume, name+nameSuffix, description, labels) - if err != nil { - return err - } - l.Printf("Created snapshot %s for volume %s (%s)\n", sID, volume.Name, volume.ProviderResourceID) - foundMatchingVolume = true - return nil - }) + + if err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error { + volumes, err := provider.ListVolumes(l, &cVM) + if err != nil { + return err + } + + if len(volumes) == 0 { + return fmt.Errorf("node %d does not have any non-bootable persistent volumes attached", nodeID) + } + + for _, volume := range volumes { + snapshotName := fmt.Sprintf("%s-%04d-v%s-%d-%s", vsco.Name, nodeID, crdbVersion, len(nodes), cVM.MachineType) + volumeSnapshot, err := provider.CreateVolumeSnapshot(l, volume, + vm.VolumeSnapshotCreateOpts{ + Name: snapshotName, + Labels: labels, + Description: vsco.Description, + }) if err != nil { return err } + l.Printf("created volume snapshot %s (id=%s) for volume %s on %s/n%d\n", + volumeSnapshot.Name, volumeSnapshot.ID, volume.Name, volume.ProviderResourceID, nodeID) } - if !foundMatchingVolume { - l.Printf("No volumes matched the workload collector filter for node %d. "+ - "Volumes are missing the `roachprod_collector` label.", nodeID) + return nil + }); err != nil { + return err + } + } + return nil +} + +func ListSnapshots( + ctx context.Context, l *logger.Logger, clusterName string, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + if err := LoadClusters(); err != nil { + return nil, err + } + c, err := newCluster(l, clusterName) + if err != nil { + return nil, err + } + + var volumeSnapshots []vm.VolumeSnapshot + if err := vm.ForProvider(c.VMs[0].Provider, func(provider vm.Provider) error { + var err error + volumeSnapshots, err = provider.ListVolumeSnapshots(l, vslo) + return err + }); err != nil { + return nil, err + } + return volumeSnapshots, nil +} + +func DeleteSnapshots( + ctx context.Context, l *logger.Logger, clusterName string, snapshots ...vm.VolumeSnapshot, +) error { + if err := LoadClusters(); err != nil { + return err + } + c, err := newCluster(l, clusterName) + if err != nil { + return err + } + + if err := vm.ForProvider(c.VMs[0].Provider, func(provider vm.Provider) error { + for _, snapshot := range snapshots { + if err := provider.DeleteVolumeSnapshot(l, snapshot); err != nil { + return err } } + return nil + }); err != nil { + return err } + return nil } -func generateVolumeName(clusterName string, nodeID install.Node) string { - return fmt.Sprintf("%s-n%d", clusterName, nodeID) +func ApplySnapshots( + ctx context.Context, + l *logger.Logger, + clusterName string, + snapshots []vm.VolumeSnapshot, + opts vm.VolumeCreateOpts, +) error { + if err := LoadClusters(); err != nil { + return err + } + c, err := newCluster(l, clusterName) + if err != nil { + return err + } + + if n := len(c.TargetNodes()); n != len(snapshots) { + return fmt.Errorf("mismatched number of snapshots (%d) to node count (%d)", len(snapshots), n) + // TODO(irfansharif): Validate labels (version, instance types). + } + + { // Detach and delete existing volumes. 
This is destructive. + for _, n := range c.TargetNodes() { + cVM := &c.VMs[n-1] + if err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error { + volumes, err := provider.ListVolumes(l, cVM) + if err != nil { + return err + } + for _, volume := range volumes { + if err := provider.DeleteVolume(l, volume, cVM); err != nil { + return err + } + l.Printf("detached and deleted volume %s from %s", volume.ProviderResourceID, cVM.Name) + } + return nil + }); err != nil { + return err + } + } + } + + nodes := c.TargetNodes() + for idx, n := range nodes { + curNode := nodes[idx : idx+1] + volumeOpts := opts // make a copy + + cVM := &c.VMs[n-1] + if err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error { + volumeOpts.Zone = cVM.Zone + // NB: The "-1" signifies that it's the first attached non-boot volume. + // This is typical naming convention in GCE clusters. + volumeOpts.Name = fmt.Sprintf("%s-%04d-1", clusterName, n) + volumeOpts.SourceSnapshotID = snapshots[idx].ID + + volumes, err := provider.ListVolumes(l, cVM) + if err != nil { + return err + } + for _, vol := range volumes { + if vol.Name == volumeOpts.Name { + l.Printf( + "volume (%s) is already attached to node %d skipping volume creation", vol.ProviderResourceID, n) + return nil + } + } + + if volumeOpts.Labels == nil { + volumeOpts.Labels = map[string]string{} + } + volumeOpts.Labels[vm.TagCluster] = clusterName + volumeOpts.Labels[vm.TagLifetime] = cVM.Lifetime.String() + volumeOpts.Labels[vm.TagRoachprod] = "true" + volumeOpts.Labels[vm.TagCreated] = strings.ToLower( + strings.ReplaceAll(timeutil.Now().Format(time.RFC3339), ":", "_")) // format according to gce label naming requirements + + volume, err := provider.CreateVolume(l, volumeOpts) + if err != nil { + return err + } + l.Printf("created volume %s", volume.ProviderResourceID) + + device, err := cVM.AttachVolume(l, volume) + if err != nil { + return err + } + l.Printf("attached volume %s to %s", volume.ProviderResourceID, cVM.ProviderID) + + // Save the cluster to cache. 
+ if err := saveCluster(l, &c.Cluster); err != nil { + return err + } + + var buf bytes.Buffer + if err := c.Run(ctx, l, &buf, &buf, curNode, + "mounting volume", genMountCommands(device, "/mnt/data1")); err != nil { + l.Printf(buf.String()) + return err + } + l.Printf("mounted %s to %s", volume.ProviderResourceID, cVM.ProviderID) + + return nil + }); err != nil { + return err + } + } + return nil } func genMountCommands(devicePath, mountDir string) string { @@ -1679,11 +1845,14 @@ func StorageCollectionPerformAction( return err } + if opts.Labels == nil { + opts.Labels = map[string]string{} + } + opts.Labels["roachprod_collector"] = "true" mountDir := "/mnt/capture/" switch action { case "start": - err = createAttachMountVolumes(ctx, l, c, opts, mountDir) - if err != nil { + if err := createAttachMountVolumes(ctx, l, c, opts, mountDir); err != nil { return err } case "stop": @@ -1792,13 +1961,12 @@ func createAttachMountVolumes( mountDir string, ) error { nodes := c.TargetNodes() - var labels = map[string]string{"roachprod_collector": "true"} for idx, n := range nodes { curNode := nodes[idx : idx+1] cVM := &c.VMs[n-1] err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error { - opts.Name = generateVolumeName(c.Name, n) + opts.Name = fmt.Sprintf("%s-n%d", c.Name, n) for _, vol := range cVM.NonBootAttachedVolumes { if vol.Name == opts.Name { l.Printf( @@ -1807,7 +1975,6 @@ func createAttachMountVolumes( } } opts.Zone = cVM.Zone - opts.Labels = labels volume, err := provider.CreateVolume(l, opts) if err != nil { diff --git a/pkg/roachprod/vm/aws/aws.go b/pkg/roachprod/vm/aws/aws.go index ea7cf60bc636..5bf96e80723a 100644 --- a/pkg/roachprod/vm/aws/aws.go +++ b/pkg/roachprod/vm/aws/aws.go @@ -1121,7 +1121,7 @@ type attachJsonResponse struct { Device string `json:"Device"` } -func (p *Provider) AttachVolumeToVM(l *logger.Logger, volume vm.Volume, vm *vm.VM) (string, error) { +func (p *Provider) AttachVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) (string, error) { // TODO(leon): what happens if this device already exists? 
deviceName := "/dev/sdf" args := []string{ @@ -1268,6 +1268,14 @@ func (p *Provider) CreateVolume( return vol, err } +func (p *Provider) DeleteVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) error { + panic("unimplemented") +} + +func (p *Provider) ListVolumes(l *logger.Logger, vm *vm.VM) ([]vm.Volume, error) { + return vm.NonBootAttachedVolumes, nil +} + type snapshotOutput struct { Description string `json:"Description"` Tags []struct { @@ -1284,25 +1292,40 @@ type snapshotOutput struct { SnapshotID string `json:"SnapshotId"` } -func (p *Provider) SnapshotVolume( - l *logger.Logger, volume vm.Volume, name, description string, labels map[string]string, -) (string, error) { +func (p *Provider) CreateVolumeSnapshot( + l *logger.Logger, volume vm.Volume, vsco vm.VolumeSnapshotCreateOpts, +) (vm.VolumeSnapshot, error) { region := volume.Zone[:len(volume.Zone)-1] - labels["Name"] = name var tags []string - for k, v := range labels { + for k, v := range vsco.Labels { tags = append(tags, fmt.Sprintf("{Key=%s,Value=%s}", k, v)) } + tags = append(tags, fmt.Sprintf("{Key=%s,Value=%s}", "Name", vsco.Name)) args := []string{ "ec2", "create-snapshot", - "--description", description, + "--description", vsco.Description, "--region", region, "--volume-id", volume.ProviderResourceID, "--tag-specifications", "ResourceType=snapshot,Tags=[" + strings.Join(tags, ",") + "]", } var so snapshotOutput - err := p.runJSONCommand(l, args, &so) - return so.SnapshotID, err + if err := p.runJSONCommand(l, args, &so); err != nil { + return vm.VolumeSnapshot{}, err + } + return vm.VolumeSnapshot{ + ID: so.SnapshotID, + Name: vsco.Name, + }, nil +} + +func (p *Provider) ListVolumeSnapshots( + l *logger.Logger, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + panic("unimplemented") +} + +func (p *Provider) DeleteVolumeSnapshot(l *logger.Logger, snapshot vm.VolumeSnapshot) error { + panic("unimplemented") } diff --git a/pkg/roachprod/vm/azure/azure.go b/pkg/roachprod/vm/azure/azure.go index 7ebf4dd94672..cfc68eb1f7bd 100644 --- a/pkg/roachprod/vm/azure/azure.go +++ b/pkg/roachprod/vm/azure/azure.go @@ -90,20 +90,36 @@ type Provider struct { } } -func (p *Provider) SnapshotVolume( - l *logger.Logger, volume vm.Volume, name, description string, labels map[string]string, -) (string, error) { +func (p *Provider) CreateVolumeSnapshot( + l *logger.Logger, volume vm.Volume, vsco vm.VolumeSnapshotCreateOpts, +) (vm.VolumeSnapshot, error) { // TODO(leon): implement panic("unimplemented") } +func (p *Provider) ListVolumeSnapshots( + l *logger.Logger, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + panic("unimplemented") +} + +func (p *Provider) DeleteVolumeSnapshot(l *logger.Logger, snapshot vm.VolumeSnapshot) error { + panic("unimplemented") +} + func (p *Provider) CreateVolume(*logger.Logger, vm.VolumeCreateOpts) (vm.Volume, error) { - // TODO(leon): implement panic("unimplemented") } -func (p *Provider) AttachVolumeToVM(*logger.Logger, vm.Volume, *vm.VM) (string, error) { - // TODO(leon): implement +func (p *Provider) DeleteVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) error { + panic("unimplemented") +} + +func (p *Provider) ListVolumes(l *logger.Logger, vm *vm.VM) ([]vm.Volume, error) { + return vm.NonBootAttachedVolumes, nil +} + +func (p *Provider) AttachVolume(*logger.Logger, vm.Volume, *vm.VM) (string, error) { panic("unimplemented") } diff --git a/pkg/roachprod/vm/flagstub/flagstub.go b/pkg/roachprod/vm/flagstub/flagstub.go index 86bde0460755..e20418eca39c 100644 --- 
a/pkg/roachprod/vm/flagstub/flagstub.go +++ b/pkg/roachprod/vm/flagstub/flagstub.go @@ -32,17 +32,35 @@ type provider struct { unimplemented string } -func (p *provider) SnapshotVolume( - *logger.Logger, vm.Volume, string, string, map[string]string, -) (string, error) { - return "", errors.Newf("%s", p.unimplemented) +func (p *provider) CreateVolumeSnapshot( + l *logger.Logger, volume vm.Volume, vsco vm.VolumeSnapshotCreateOpts, +) (vm.VolumeSnapshot, error) { + return vm.VolumeSnapshot{}, errors.Newf("%s", p.unimplemented) +} + +func (p *provider) ListVolumeSnapshots( + l *logger.Logger, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + return nil, errors.Newf("%s", p.unimplemented) +} + +func (p *provider) DeleteVolumeSnapshot(l *logger.Logger, snapshot vm.VolumeSnapshot) error { + return errors.Newf("%s", p.unimplemented) +} + +func (p *provider) CreateVolume(*logger.Logger, vm.VolumeCreateOpts) (vm.Volume, error) { + return vm.Volume{}, errors.Newf("%s", p.unimplemented) +} + +func (p *provider) DeleteVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) error { + return errors.Newf("%s", p.unimplemented) } -func (p *provider) CreateVolume(*logger.Logger, vm.VolumeCreateOpts) (vol vm.Volume, err error) { - return vol, errors.Newf("%s", p.unimplemented) +func (p *provider) ListVolumes(l *logger.Logger, vm *vm.VM) ([]vm.Volume, error) { + return vm.NonBootAttachedVolumes, nil } -func (p *provider) AttachVolumeToVM(*logger.Logger, vm.Volume, *vm.VM) (string, error) { +func (p *provider) AttachVolume(*logger.Logger, vm.Volume, *vm.VM) (string, error) { return "", errors.Newf("%s", p.unimplemented) } diff --git a/pkg/roachprod/vm/gce/gcloud.go b/pkg/roachprod/vm/gce/gcloud.go index e88ede73b3d0..700b30211b49 100644 --- a/pkg/roachprod/vm/gce/gcloud.go +++ b/pkg/roachprod/vm/gce/gcloud.go @@ -97,7 +97,7 @@ func runJSONCommand(args []string, parsed interface{}) error { } if err := json.Unmarshal(rawJSON, &parsed); err != nil { - return errors.Wrapf(err, "failed to parse json %s", rawJSON) + return errors.Wrapf(err, "failed to parse json %s: %v", rawJSON, rawJSON) } return nil @@ -139,16 +139,6 @@ func (jsonVM *jsonVM) toVM( vmErrors = append(vmErrors, vm.ErrNoExpiration) } - // lastComponent splits a url path and returns only the last part. This is - // used because some of the fields in jsonVM are defined using URLs like: - // "https://www.googleapis.com/compute/v1/projects/cockroach-shared/zones/us-east1-b/machineTypes/n1-standard-16" - // We want to strip this down to "n1-standard-16", so we only want the last - // component. - lastComponent := func(url string) string { - s := strings.Split(url, "/") - return s[len(s)-1] - } - // Extract network information var publicIP, privateIP, vpc string if len(jsonVM.NetworkInterfaces) == 0 { @@ -201,9 +191,13 @@ func (jsonVM *jsonVM) toVM( for _, detailedDisk := range disks { if detailedDisk.SelfLink == jsonVMDisk.Source { vol := vm.Volume{ - ProviderResourceID: detailedDisk.Name, + // NB: See TODO in toDescribeVolumeCommandResponse. We + // should be able to "just" use detailedDisk.Name here, + // but we're abusing that field elsewhere, and + // incorrectly. Using SelfLink is correct. 
+ ProviderResourceID: lastComponent(detailedDisk.SelfLink), ProviderVolumeType: detailedDisk.Type, - Zone: detailedDisk.Zone, + Zone: lastComponent(detailedDisk.Zone), Name: detailedDisk.Name, Labels: detailedDisk.Labels, Size: parseDiskSize(detailedDisk.SizeGB), @@ -295,13 +289,13 @@ type Provider struct { ServiceAccount string } -type snapshotCreateJson struct { +type snapshotJson struct { CreationSizeBytes string `json:"creationSizeBytes"` CreationTimestamp time.Time `json:"creationTimestamp"` Description string `json:"description"` DiskSizeGb string `json:"diskSizeGb"` DownloadBytes string `json:"downloadBytes"` - Id string `json:"id"` + ID string `json:"id"` Kind string `json:"kind"` LabelFingerprint string `json:"labelFingerprint"` Name string `json:"name"` @@ -314,27 +308,26 @@ type snapshotCreateJson struct { StorageLocations []string `json:"storageLocations"` } -func (p *Provider) SnapshotVolume( - l *logger.Logger, volume vm.Volume, name, description string, labels map[string]string, -) (string, error) { +func (p *Provider) CreateVolumeSnapshot( + l *logger.Logger, volume vm.Volume, vsco vm.VolumeSnapshotCreateOpts, +) (vm.VolumeSnapshot, error) { args := []string{ "compute", "snapshots", - "create", name, + "create", vsco.Name, "--source-disk", volume.ProviderResourceID, "--source-disk-zone", volume.Zone, - "--description", description, + "--description", vsco.Description, "--format", "json", } - var createJsonResponse snapshotCreateJson - err := runJSONCommand(args, &createJsonResponse) - if err != nil { - return "", err + var createJsonResponse snapshotJson + if err := runJSONCommand(args, &createJsonResponse); err != nil { + return vm.VolumeSnapshot{}, err } sb := strings.Builder{} - for k, v := range labels { + for k, v := range vsco.Labels { fmt.Fprintf(&sb, "%s=%s,", serializeLabel(k), serializeLabel(v)) } s := sb.String() @@ -342,16 +335,70 @@ func (p *Provider) SnapshotVolume( args = []string{ "compute", "snapshots", - "add-labels", name, + "add-labels", vsco.Name, "--labels", s[:len(s)-1], } cmd := exec.Command("gcloud", args...) 
- _, err = cmd.CombinedOutput() + if _, err := cmd.CombinedOutput(); err != nil { + return vm.VolumeSnapshot{}, err + } + return vm.VolumeSnapshot{ + ID: createJsonResponse.ID, + Name: createJsonResponse.Name, + }, nil +} - if err != nil { - return "", err +func (p *Provider) ListVolumeSnapshots( + l *logger.Logger, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + args := []string{ + "compute", + "snapshots", + "list", + "--format", "json(name,id)", } - return createJsonResponse.Name, nil + var filters []string + if vslo.Name != "" { + filters = append(filters, fmt.Sprintf("name:%s", vslo.Name)) + } + if !vslo.CreatedBefore.IsZero() { + filters = append(filters, fmt.Sprintf("creationTimestamp<'%s'", vslo.CreatedBefore.Format("2006-01-02"))) + } + for k, v := range vslo.Labels { + filters = append(filters, fmt.Sprintf("labels.%s=%s", k, v)) + } + if len(filters) > 0 { + args = append(args, "--filter", strings.Join(filters, " AND ")) + } + + var snapshotsJSONResponse []snapshotJson + if err := runJSONCommand(args, &snapshotsJSONResponse); err != nil { + return nil, err + } + + var snapshots []vm.VolumeSnapshot + for _, snapshotJson := range snapshotsJSONResponse { + snapshots = append(snapshots, vm.VolumeSnapshot{ + ID: snapshotJson.ID, + Name: snapshotJson.Name, + }) + } + return snapshots, nil +} + +func (p *Provider) DeleteVolumeSnapshot(l *logger.Logger, snapshot vm.VolumeSnapshot) error { + args := []string{ + "compute", + "snapshots", + "delete", + snapshot.Name, + } + + cmd := exec.Command("gcloud", args...) + if _, err := cmd.CombinedOutput(); err != nil { + return err + } + return nil } type describeVolumeCommandResponse struct { @@ -373,9 +420,9 @@ type describeVolumeCommandResponse struct { func (p *Provider) CreateVolume( l *logger.Logger, vco vm.VolumeCreateOpts, ) (vol vm.Volume, err error) { - // TODO(leon): SourceSnapshotID and IOPS, are not handled - if vco.SourceSnapshotID != "" || vco.IOPS != 0 { - err = errors.New("Creating a volume with SourceSnapshotID or IOPS is not supported at this time.") + // TODO(leon): IOPS is not handled. 
+ if vco.IOPS != 0 { + err = errors.New("Creating a volume with IOPS is not supported at this time.") return vol, err } args := []string{ @@ -386,6 +433,9 @@ func (p *Provider) CreateVolume( "--zone", vco.Zone, "--format", "json", } + if vco.SourceSnapshotID != "" { + args = append(args, "--source-snapshot", vco.SourceSnapshotID) + } if vco.Size == 0 { return vol, errors.New("Cannot create a volume of size 0") @@ -397,7 +447,7 @@ func (p *Provider) CreateVolume( if vco.Architecture != "" { if vco.Architecture == "ARM64" || vco.Architecture == "X86_64" { - args = append(args, "--architecture=", vco.Architecture) + args = append(args, "--architecture", vco.Architecture) } else { return vol, errors.Newf("Expected architecture to be one of ARM64, X86_64 got %s\n", vco.Architecture) } @@ -405,7 +455,7 @@ func (p *Provider) CreateVolume( switch vco.Type { case "local-ssd", "pd-balanced", "pd-extreme", "pd-ssd", "pd-standard": - args = append(args, "--type=", vco.Type) + args = append(args, "--type", vco.Type) case "": // use the default default: @@ -427,34 +477,139 @@ func (p *Provider) CreateVolume( if err != nil { return vol, err } - sb := strings.Builder{} - for k, v := range vco.Labels { - fmt.Fprintf(&sb, "%s=%s,", serializeLabel(k), serializeLabel(v)) - } - s := sb.String() - - args = []string{ - "compute", - "disks", - "add-labels", vco.Name, - "--labels", s[:len(s)-1], - "--zone", vco.Zone, - } - cmd := exec.Command("gcloud", args...) - _, err = cmd.CombinedOutput() - - if err != nil { - return vol, err + if len(vco.Labels) > 0 { + sb := strings.Builder{} + for k, v := range vco.Labels { + fmt.Fprintf(&sb, "%s=%s,", serializeLabel(k), serializeLabel(v)) + } + s := sb.String() + args = []string{ + "compute", + "disks", + "add-labels", vco.Name, + "--labels", s[:len(s)-1], + "--zone", vco.Zone, + } + cmd := exec.Command("gcloud", args...) + if _, err := cmd.CombinedOutput(); err != nil { + return vm.Volume{}, err + } } return vm.Volume{ ProviderResourceID: createdVolume.Name, + ProviderVolumeType: lastComponent(createdVolume.Type), + Zone: lastComponent(createdVolume.Zone), + Encrypted: false, // only used for aws Name: createdVolume.Name, - ProviderVolumeType: createdVolume.Type, - Zone: vco.Zone, + Labels: createdVolume.Labels, Size: size, - Labels: vco.Labels, - }, err + }, nil +} + +func (p *Provider) DeleteVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) error { + { // Detach disks. + args := []string{ + "compute", + "instances", + "detach-disk", vm.Name, + "--disk", volume.ProviderResourceID, + "--zone", volume.Zone, + } + cmd := exec.Command("gcloud", args...) + if _, err := cmd.CombinedOutput(); err != nil { + return err + } + } + { // Delete disks. + args := []string{ + "compute", + "disks", + "delete", + volume.ProviderResourceID, + "--zone", volume.Zone, + "--quiet", + } + cmd := exec.Command("gcloud", args...) + if _, err := cmd.CombinedOutput(); err != nil { + return err + } + } + return nil +} + +func (p *Provider) ListVolumes(l *logger.Logger, v *vm.VM) ([]vm.Volume, error) { + var attachedDisks []attachDiskCmdDisk + var describedVolumes []describeVolumeCommandResponse + + { + // We're running the equivalent of: + // gcloud compute instances describe irfansharif-snapshot-0001 \ + // --project cockroach-ephemeral --zone us-east1-b \ + // --format json(disks) + // + // We'll use this data to filter out boot disks. 
+ var commandResponse instanceDisksResponse + args := []string{ + "compute", + "instances", + "describe", + v.Name, + "--project", p.GetProject(), + "--zone", v.Zone, + "--format", "json(disks)", + } + if err := runJSONCommand(args, &commandResponse); err != nil { + return nil, err + } + attachedDisks = commandResponse.Disks + } + + { + // We're running the equivalent of + // gcloud compute disks list --project cockroach-ephemeral \ + // --filter "users:(irfansharif-snapshot-0001)" --format json + // + // This contains more per-disk metadata than the command above, but + // annoyingly does not contain whether the disk is a boot volume. + args := []string{ + "compute", + "disks", + "list", + "--project", p.GetProject(), + "--filter", fmt.Sprintf("users:(%s)", v.Name), + "--format", "json", + } + if err := runJSONCommand(args, &describedVolumes); err != nil { + return nil, err + } + } + + var volumes []vm.Volume + for idx := range attachedDisks { + attachedDisk := attachedDisks[idx] + if attachedDisk.Boot { + continue + } + describedVolume := describedVolumes[idx] + size, err := strconv.Atoi(describedVolume.SizeGB) + if err != nil { + return nil, err + } + volumes = append(volumes, vm.Volume{ + ProviderResourceID: describedVolume.Name, + ProviderVolumeType: lastComponent(describedVolume.Type), + Zone: lastComponent(describedVolume.Zone), + Encrypted: false, // only used for aws + Name: describedVolume.Name, + Labels: describedVolume.Labels, + Size: size, + }) + } + + // TODO(irfansharif): Update v.NonBootAttachedVolumes? It's awkward to have + // that field at all. + return volumes, nil } type instanceDisksResponse struct { @@ -476,8 +631,8 @@ type attachDiskCmdDisk struct { Type string `json:"type"` } -func (p *Provider) AttachVolumeToVM(l *logger.Logger, volume vm.Volume, vm *vm.VM) (string, error) { - // Volume attach +func (p *Provider) AttachVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) (string, error) { + // Volume attach. args := []string{ "compute", "instances", @@ -506,7 +661,7 @@ func (p *Provider) AttachVolumeToVM(l *logger.Logger, volume vm.Volume, vm *vm.V volume.ProviderResourceID, vm.ProviderID) } - // Volume auto delete + // Volume auto delete. args = []string{ "compute", "instances", @@ -1155,6 +1310,12 @@ func toDescribeVolumeCommandResponse( continue } res = append(res, describeVolumeCommandResponse{ + // TODO(irfansharif): Use of the device name here is wrong -- it's + // ends up being things like "persistent-disk-1" but in other, more + // correct uses, it's "irfansharif-snapshot-0001-1". In fact, this + // whole transformation from attachDiskCmdDisk to + // describeVolumeCommandResponse if funky. Use something like + // (Provider).ListVolumes instead. Name: d.DeviceName, SelfLink: d.Source, SizeGB: d.DiskSizeGB, @@ -1294,3 +1455,15 @@ func (p *Provider) ProjectActive(project string) bool { } return false } + +// lastComponent splits a url path and returns only the last part. This is +// used because some fields in GCE APIs are defined using URLs like: +// +// "https://www.googleapis.com/compute/v1/projects/cockroach-shared/zones/us-east1-b/machineTypes/n1-standard-16" +// +// We want to strip this down to "n1-standard-16", so we only want the last +// component. 
+func lastComponent(url string) string { + s := strings.Split(url, "/") + return s[len(s)-1] +} diff --git a/pkg/roachprod/vm/local/local.go b/pkg/roachprod/vm/local/local.go index 8eefab53fd2c..6bc3bd84090d 100644 --- a/pkg/roachprod/vm/local/local.go +++ b/pkg/roachprod/vm/local/local.go @@ -120,17 +120,35 @@ type Provider struct { storage VMStorage } -func (p *Provider) SnapshotVolume( - l *logger.Logger, volume vm.Volume, name, description string, labels map[string]string, -) (string, error) { - return "", nil +func (p *Provider) CreateVolumeSnapshot( + l *logger.Logger, volume vm.Volume, vsco vm.VolumeSnapshotCreateOpts, +) (vm.VolumeSnapshot, error) { + return vm.VolumeSnapshot{}, nil } func (p *Provider) CreateVolume(*logger.Logger, vm.VolumeCreateOpts) (vm.Volume, error) { return vm.Volume{}, nil } -func (p *Provider) AttachVolumeToVM(*logger.Logger, vm.Volume, *vm.VM) (string, error) { +func (p *Provider) DeleteVolume(l *logger.Logger, volume vm.Volume, vm *vm.VM) error { + return nil +} + +func (p *Provider) ListVolumes(l *logger.Logger, vm *vm.VM) ([]vm.Volume, error) { + return vm.NonBootAttachedVolumes, nil +} + +func (p *Provider) ListVolumeSnapshots( + l *logger.Logger, vslo vm.VolumeSnapshotListOpts, +) ([]vm.VolumeSnapshot, error) { + return nil, nil +} + +func (p *Provider) DeleteVolumeSnapshot(l *logger.Logger, snapshot vm.VolumeSnapshot) error { + return nil +} + +func (p *Provider) AttachVolume(*logger.Logger, vm.Volume, *vm.VM) (string, error) { return "", nil } diff --git a/pkg/roachprod/vm/vm.go b/pkg/roachprod/vm/vm.go index 3fc77563a87a..cec36b906d09 100644 --- a/pkg/roachprod/vm/vm.go +++ b/pkg/roachprod/vm/vm.go @@ -34,6 +34,9 @@ const ( TagLifetime = "lifetime" // TagRoachprod is roachprod tag const, value is true & false. TagRoachprod = "roachprod" + // TagUsage indicates where a certain resource is used. "roachtest" is used + // as the key for roachtest created resources. + TagUsage = "usage" ) // GetDefaultLabelMap returns a label map for a common set of labels. @@ -147,13 +150,44 @@ func (vm *VM) ZoneEntry() (string, error) { return fmt.Sprintf("%s 60 IN A %s\n", vm.Name, vm.PublicIP), nil } -func (vm *VM) AttachVolume(l *logger.Logger, v Volume) (deviceName string, err error) { +func (vm *VM) AttachVolume(l *logger.Logger, v Volume) (deviceName string, _ error) { vm.NonBootAttachedVolumes = append(vm.NonBootAttachedVolumes, v) - err = ForProvider(vm.Provider, func(provider Provider) error { - deviceName, err = provider.AttachVolumeToVM(l, v, vm) + if err := ForProvider(vm.Provider, func(provider Provider) error { + var err error + deviceName, err = provider.AttachVolume(l, v, vm) return err - }) - return deviceName, err + }); err != nil { + return "", err + } + return deviceName, nil +} + +const vmNameFormat = "user--" + +// ClusterName returns the cluster name a VM belongs to. +func (vm *VM) ClusterName() (string, error) { + if vm.IsLocal() { + return vm.LocalClusterName, nil + } + name := vm.Name + parts := strings.Split(name, "-") + if len(parts) < 3 { + return "", fmt.Errorf("expected VM name in the form %s, got %s", vmNameFormat, name) + } + return strings.Join(parts[:len(parts)-1], "-"), nil +} + +// UserName returns the username of a VM. 
+func (vm *VM) UserName() (string, error) { + if vm.IsLocal() { + return config.Local, nil + } + name := vm.Name + parts := strings.Split(name, "-") + if len(parts) < 3 { + return "", fmt.Errorf("expected VM name in the form %s, got %s", vmNameFormat, name) + } + return parts[0], nil } // List represents a list of VMs. @@ -254,6 +288,23 @@ type ProviderOpts interface { ConfigureClusterFlags(*pflag.FlagSet, MultipleProjectsOption) } +type VolumeSnapshot struct { + ID string + Name string +} + +type VolumeSnapshotCreateOpts struct { + Name string + Labels map[string]string + Description string +} + +type VolumeSnapshotListOpts struct { + Name string + Labels map[string]string + CreatedBefore time.Time +} + type Volume struct { ProviderResourceID string ProviderVolumeType string @@ -312,9 +363,24 @@ type Provider interface { // provider. ProjectActive(project string) bool + // Volume and volume snapshot related APIs. + + // CreateVolume creates a new volume using the given options. CreateVolume(l *logger.Logger, vco VolumeCreateOpts) (Volume, error) - AttachVolumeToVM(l *logger.Logger, volume Volume, vm *VM) (string, error) - SnapshotVolume(l *logger.Logger, volume Volume, name, description string, labels map[string]string) (string, error) + // ListVolumes lists all volumes already attached to the given VM. + ListVolumes(l *logger.Logger, vm *VM) ([]Volume, error) + // DeleteVolume detaches and deletes the given volume from the given VM. + DeleteVolume(l *logger.Logger, volume Volume, vm *VM) error + // AttachVolume attaches the given volume to the given VM. + AttachVolume(l *logger.Logger, volume Volume, vm *VM) (string, error) + // CreateVolumeSnapshot creates a snapshot of the given volume, using the + // given options. + CreateVolumeSnapshot(l *logger.Logger, volume Volume, vsco VolumeSnapshotCreateOpts) (VolumeSnapshot, error) + // ListVolumeSnapshots lists the individual volume snapshots that satisfy + // the search criteria. + ListVolumeSnapshots(l *logger.Logger, vslo VolumeSnapshotListOpts) ([]VolumeSnapshot, error) + // DeleteVolumeSnapshot permanently deletes the given snapshot. + DeleteVolumeSnapshot(l *logger.Logger, snapshot VolumeSnapshot) error } // DeleteCluster is an optional capability for a Provider which can