roachtest: rationalize cluster destruction #35585

Merged 2 commits on Apr 23, 2019

230 changes: 162 additions & 68 deletions pkg/cmd/roachtest/cluster.go
@@ -189,55 +189,77 @@ func initBinaries() {
}
}

var clusters = map[*cluster]struct{}{}
var clustersMu syncutil.Mutex
var interrupted int32

func destroyAllClusters() {
type clusterRegistry struct {
mu struct {
syncutil.Mutex
clusters map[string]*cluster
}
}

func newClusterRegistry() *clusterRegistry {
cr := &clusterRegistry{}
cr.mu.clusters = make(map[string]*cluster)
return cr
}

func (r *clusterRegistry) registerCluster(c *cluster) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.clusters[c.name] != nil {
return fmt.Errorf("cluster named %q already exists in registry", c.name)
}
r.mu.clusters[c.name] = c
return nil
}

func (r *clusterRegistry) unregisterCluster(c *cluster) bool {
r.mu.Lock()
_, exists := r.mu.clusters[c.name]
if exists {
delete(r.mu.clusters, c.name)
}
r.mu.Unlock()
return exists
}

// destroyAllClusters destroys all the clusters. It responds to context
// cancelation.
func (r *clusterRegistry) destroyAllClusters(ctx context.Context) {
// No new clusters can be created.
atomic.StoreInt32(&interrupted, 1)

// Fire off a goroutine to destroy all of the clusters.
done := make(chan struct{})
go func() {
defer close(done)

var clusters []*cluster
r.mu.Lock()
for _, c := range r.mu.clusters {
clusters = append(clusters, c)
}
r.mu.Unlock()

var wg sync.WaitGroup
clustersMu.Lock()
wg.Add(len(clusters))
for c := range clusters {
for _, c := range clusters {
go func(c *cluster) {
defer wg.Done()
c.destroy(context.Background())
// We don't close the logger here since the cluster may still be in use
// by a test, and so the logger might still be needed.
c.Destroy(ctx, dontCloseLogger)
}(c)
}
clusters = map[*cluster]struct{}{}
clustersMu.Unlock()

wg.Wait()
}()

// Wait up to 5 min for clusters to be destroyed. This can take a while and
// we don't want to rush it.
select {
case <-done:
case <-time.After(5 * time.Minute):
}
}

func registerCluster(c *cluster) {
clustersMu.Lock()
clusters[c] = struct{}{}
clustersMu.Unlock()
}

func unregisterCluster(c *cluster) bool {
clustersMu.Lock()
_, exists := clusters[c]
if exists {
delete(clusters, c)
case <-ctx.Done():
}
clustersMu.Unlock()
return exists
}

func execCmd(ctx context.Context, l *logger, args ...string) error {
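
The hunk above replaces the package-level `clusters` map with a `clusterRegistry` whose `destroyAllClusters` fans out destruction in parallel and bounds the wait. Below is a minimal, self-contained sketch of that pattern using only the standard library; `fakeCluster`, `registry`, and the names in `main` are hypothetical stand-ins (the real code uses `syncutil.Mutex` and shells out to `roachprod`).

```go
// Sketch of the registry pattern introduced above (hypothetical names).
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type fakeCluster struct{ name string }

func (c *fakeCluster) destroy(ctx context.Context) {
	fmt.Printf("destroying %s\n", c.name)
}

type registry struct {
	mu       sync.Mutex
	clusters map[string]*fakeCluster
}

func newRegistry() *registry {
	return &registry{clusters: make(map[string]*fakeCluster)}
}

func (r *registry) register(c *fakeCluster) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.clusters[c.name]; ok {
		return fmt.Errorf("cluster named %q already exists in registry", c.name)
	}
	r.clusters[c.name] = c
	return nil
}

// destroyAll mirrors destroyAllClusters: snapshot the map, destroy every
// cluster in parallel, and stop waiting after a timeout or cancelation.
func (r *registry) destroyAll(ctx context.Context) {
	r.mu.Lock()
	var cs []*fakeCluster
	for _, c := range r.clusters {
		cs = append(cs, c)
	}
	r.mu.Unlock()

	done := make(chan struct{})
	go func() {
		defer close(done)
		var wg sync.WaitGroup
		wg.Add(len(cs))
		for _, c := range cs {
			go func(c *fakeCluster) {
				defer wg.Done()
				c.destroy(ctx)
			}(c)
		}
		wg.Wait()
	}()

	select {
	case <-done:
	case <-time.After(5 * time.Minute):
	case <-ctx.Done():
	}
}

func main() {
	r := newRegistry()
	_ = r.register(&fakeCluster{name: "teamcity-1234"})
	r.destroyAll(context.Background())
}
```
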
@@ -636,24 +658,54 @@ type cluster struct {
nodes int
status func(...interface{})
t testI
// r is the registry tracking this cluster. Destroying the cluster will
// unregister it.
r *clusterRegistry
// l is the logger used to log various cluster operations.
// DEPRECATED for use outside of cluster methods: Use a test's t.l instead.
// This is generally set to the current test's logger.
l *logger
// destroyed is used to coordinate between different goroutines that want to
// destroy a cluster. It is nil when the cluster should not be destroyed (i.e.
// when Destroy() should not be called).
destroyed chan struct{}
l *logger
expiration time.Time
// owned is set if this instance is responsible for `roachprod destroy`ing the
// cluster. It is set when a new cluster is created, but not when one is
// cloned or when we attach to an existing roachprod cluster.
// If not set, Destroy() only wipes the cluster.
owned bool
// encryptDefault is true if the cluster should default to having encryption
// at rest enabled. The default only applies if encryption is not explicitly
// enabled or disabled by options passed to Start.
encryptDefault bool

// destroyState is nil when the cluster should not be destroyed (i.e. when
// Destroy should not be called) - for example it is set to nil for results of
// c.clone().
//
// NB: destroyState is a pointer to allow for copying of this struct in
// cluster.clone().
destroyState *destroyState
}

type destroyState struct {
// owned is set if this instance is responsible for `roachprod destroy`ing the
// cluster. It is set when a new cluster is created, but not when we attach to
// an existing roachprod cluster.
// If not set, Destroy() only wipes the cluster.
owned bool

mu struct {
syncutil.Mutex
loggerClosed bool
// destroyed is used to coordinate between different goroutines that want to
// destroy a cluster. It is set once the destroy process starts. It is
// closed when the destruction is complete.
destroyed chan struct{}
}
}

// closeLogger closes c.l. It can be called multiple times.
func (c *cluster) closeLogger() {
c.destroyState.mu.Lock()
defer c.destroyState.mu.Unlock()
if c.destroyState.mu.loggerClosed {
return
}
c.destroyState.mu.loggerClosed = true
c.l.close()
}

type clusterConfig struct {
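
The `closeLogger` method above makes closing idempotent so that both `Destroy` and the test teardown can attempt it safely. The following is a simplified, standalone sketch of that guard; for brevity the flag lives on the logger itself here, whereas in the diff it sits under `destroyState.mu`, and all names are hypothetical.

```go
// Sketch of an idempotent close guarded by a mutex-protected flag.
package main

import (
	"fmt"
	"sync"
)

type logger struct {
	mu     sync.Mutex
	closed bool
}

func (l *logger) close() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.closed {
		return // already closed; repeated calls are no-ops
	}
	l.closed = true
	fmt.Println("logger closed")
}

func main() {
	l := &logger{}
	l.close()
	l.close() // safe: the second call does nothing
}
```
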
@@ -680,7 +732,9 @@ type clusterConfig struct {
// to figure out how to make that work with `roachprod create`. Perhaps one
// invocation of `roachprod create` per unique node-spec. Are there guarantees
// we're making here about the mapping of nodeSpecs to node IDs?
func newCluster(ctx context.Context, l *logger, cfg clusterConfig) (*cluster, error) {
func newCluster(
ctx context.Context, l *logger, cfg clusterConfig, r *clusterRegistry,
) (*cluster, error) {
if atomic.LoadInt32(&interrupted) == 1 {
return nil, fmt.Errorf("newCluster interrupted")
}
@@ -707,12 +761,16 @@ func newCluster(ctx context.Context, l *logger, cfg clusterConfig) (*cluster, er
nodes: cfg.nodes.NodeCount,
status: func(...interface{}) {},
l: l,
destroyed: make(chan struct{}),
expiration: cfg.nodes.expiration(),
owned: true,
encryptDefault: encrypt.asBool(),
r: r,
destroyState: &destroyState{
owned: true,
},
}
if err := r.registerCluster(c); err != nil {
return nil, err
}
registerCluster(c)

sargs := []string{roachprod, "create", c.name, "-n", fmt.Sprint(c.nodes)}
sargs = append(sargs, cfg.nodes.args()...)
@@ -744,20 +802,25 @@ type attachOpt struct {
//
// NOTE: setTest() needs to be called before a test can use this cluster.
func attachToExistingCluster(
ctx context.Context, name string, l *logger, nodes clusterSpec, opt attachOpt,
ctx context.Context, name string, l *logger, nodes clusterSpec, opt attachOpt, r *clusterRegistry,
) (*cluster, error) {
c := &cluster{
name: name,
nodes: nodes.NodeCount,
status: func(...interface{}) {},
l: l,
destroyed: make(chan struct{}),
expiration: nodes.expiration(),
// If we're attaching to an existing cluster, we're not going to destroy it.
owned: false,
name: name,
nodes: nodes.NodeCount,
status: func(...interface{}) {},
l: l,
expiration: nodes.expiration(),
encryptDefault: encrypt.asBool(),
destroyState: &destroyState{
// If we're attaching to an existing cluster, we're not going to destroy it.
owned: false,
},
r: r,
}

if err := r.registerCluster(c); err != nil {
return nil, err
}
registerCluster(c)

if !opt.skipValidation {
if err := c.validate(ctx, nodes, l); err != nil {
@@ -851,9 +914,9 @@ func (c *cluster) validate(ctx context.Context, nodes clusterSpec, l *logger) er
func (c *cluster) clone() *cluster {
cpy := *c
// This cloned cluster is not taking ownership. The parent retains it.
cpy.owned = false
cpy.destroyState = nil

cpy.encryptDefault = encrypt.asBool()
cpy.destroyed = nil
return &cpy
}

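The clone hunk above clears `destroyState` so that only the parent remains responsible for teardown. A tiny illustrative sketch of that idea (not the roachtest code; all names hypothetical): a copy with a nil destroy state turns `Destroy` into a logged no-op, exactly as the nil check at the top of the real `Destroy` does.

```go
// Sketch: a cloned value loses the destroyState pointer, so it cannot
// destroy the underlying cluster; the parent keeps ownership.
package main

import "fmt"

type destroyState struct{ owned bool }

type cluster struct {
	name         string
	destroyState *destroyState
}

func (c *cluster) clone() *cluster {
	cpy := *c
	cpy.destroyState = nil // the clone never destroys the underlying cluster
	return &cpy
}

func (c *cluster) Destroy() {
	if c.destroyState == nil {
		fmt.Printf("Destroy() called on cluster copy %s; ignoring\n", c.name)
		return
	}
	fmt.Printf("destroying %s\n", c.name)
}

func main() {
	parent := &cluster{name: "local", destroyState: &destroyState{owned: true}}
	child := parent.clone()
	child.Destroy()  // no-op: copies do not own the cluster
	parent.Destroy() // actually destroys
}
```
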
@@ -1042,31 +1105,56 @@ func (c *cluster) FetchCores(ctx context.Context) error {
})
}

func (c *cluster) Destroy(ctx context.Context) {
type closeLoggerOpt bool

const (
closeLogger closeLoggerOpt = true
dontCloseLogger = false
)

// Destroy calls `roachprod destroy` or `roachprod wipe` on the cluster.
// If called while another Destroy() or doDestroy() is in progress, the call
// blocks until that first call finishes.
func (c *cluster) Destroy(ctx context.Context, lo closeLoggerOpt) {
if c.destroyState == nil {
c.l.Errorf("Destroy() called on cluster copy")
return
}

if c.nodes == 0 {
// No nodes can happen during unit tests and implies nothing to do.
return
}

// Only destroy the cluster if it exists in the cluster registry. The cluster
// may not exist if the test was interrupted and the teardown machinery is
// destroying all clusters. (See destroyAllClusters).
if exists := unregisterCluster(c); exists {
c.destroy(ctx)
ch := c.doDestroy(ctx)
<-ch
// NB: Closing the logger without waiting on c.destroyState.destroyed above
// would be bad because we might cause the ongoing `roachprod destroy` to fail
// by closing its stdout/stderr.
if lo == closeLogger {
c.closeLogger()
}
// If the test was interrupted, another goroutine is destroying the cluster
// and we need to wait for that to finish before closing the
// logger. Otherwise, the destruction can get interrupted due to closing the
// stdout/stderr of the roachprod command.
<-c.destroyed
c.l.close()
}

func (c *cluster) destroy(ctx context.Context) {
defer close(c.destroyed)
// doDestroy calls `roachprod destroy` or `roachprod wipe` on the cluster. It
// returns a chan that will be closed when the destruction is complete. If there's
// no other doDestroy() in flight, the call is synchronous and the channel is
// closed upon return.
func (c *cluster) doDestroy(ctx context.Context) <-chan struct{} {
var inFlight <-chan struct{}
c.destroyState.mu.Lock()
if c.destroyState.mu.destroyed == nil {
c.destroyState.mu.destroyed = make(chan struct{})
} else {
inFlight = c.destroyState.mu.destroyed
}
c.destroyState.mu.Unlock()
if inFlight != nil {
return inFlight
}

if clusterWipe {
if c.owned {
if c.destroyState.owned {
c.status("destroying cluster")
if err := execCmd(ctx, c.l, roachprod, "destroy", c.name); err != nil {
c.l.Errorf("%s", err)
@@ -1080,6 +1168,12 @@ func (c *cluster) destroy(ctx context.Context) {
} else {
c.l.Printf("skipping cluster wipe\n")
}
c.r.unregisterCluster(c)
c.destroyState.mu.Lock()
ch := c.destroyState.mu.destroyed
close(ch)
c.destroyState.mu.Unlock()
return ch
}

// Run a command with output redirected to the logs instead of to os.Stdout
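
`doDestroy` above uses a single-flight channel: the first caller allocates `destroyed`, performs the teardown, and closes the channel; any concurrent caller just receives the existing channel and waits on it, which is also what lets `Destroy` delay closing the logger until the in-flight `roachprod destroy` has finished. A standalone sketch of that pattern, with hypothetical names and a print standing in for the real roachprod call:

```go
// Sketch of the single-flight destroy pattern: one goroutine does the
// teardown, every other caller blocks on the same channel.
package main

import (
	"fmt"
	"sync"
)

type destroyer struct {
	mu        sync.Mutex
	destroyed chan struct{}
}

func (d *destroyer) doDestroy() <-chan struct{} {
	d.mu.Lock()
	if d.destroyed != nil {
		// Another destroy is already in flight; hand back its channel.
		ch := d.destroyed
		d.mu.Unlock()
		return ch
	}
	d.destroyed = make(chan struct{})
	d.mu.Unlock()

	// Perform the actual teardown exactly once.
	fmt.Println("tearing down cluster")

	d.mu.Lock()
	ch := d.destroyed
	close(ch)
	d.mu.Unlock()
	return ch
}

func main() {
	d := &destroyer{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-d.doDestroy() // all callers block until the single teardown finishes
		}()
	}
	wg.Wait()
}
```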