Merge #35585 #36967
35585: roachtest: rationalize cluster destruction r=andreimatei a=andreimatei

Cluster destruction can be initiated from two places: tests finishing
and a signal being received. Some synchronization is required for
handling races on this destruction. This patch makes this
synchronization nicer. Before this patch, the case where a signal was
received and then a test finished was handled correctly, but the
reverse, I think, was not.

This patch also kills a global containing a list of all the clusters and
encapsulates it in a nicer clusterRegistry.

Release note: None
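
The coordination described above — destruction can start either because a test finished or because a signal arrived, and whichever request comes second must block until the first one completes — reduces to a mutex-guarded, create-once channel. Below is a minimal standalone sketch of that pattern; the destroyer/destroy names are illustrative, not the roachtest API, and the real change is in the diff further down.

package main

import (
	"fmt"
	"sync"
	"time"
)

// destroyer coordinates concurrent destruction requests: the first caller
// does the work, every later caller blocks on the same channel until that
// work is done.
type destroyer struct {
	mu        sync.Mutex
	destroyed chan struct{}
}

// destroy is safe to call from multiple goroutines (say, a finishing test
// and a signal handler); the cleanup body runs exactly once.
func (d *destroyer) destroy() {
	d.mu.Lock()
	inFlight := d.destroyed
	if inFlight == nil {
		d.destroyed = make(chan struct{})
	}
	d.mu.Unlock()

	if inFlight != nil {
		// Somebody else is already destroying; wait for them to finish.
		<-inFlight
		return
	}

	// Stand-in for the actual cleanup (`roachprod destroy`).
	time.Sleep(10 * time.Millisecond)
	close(d.destroyed)
}

func main() {
	d := &destroyer{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.destroy()
		}()
	}
	wg.Wait()
	fmt.Println("destroyed once; both callers returned")
}

This mirrors what doDestroy does in the diff: the first caller creates and later closes the channel, while every other caller just waits on it.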

36967: exec: add cancellation check to operators r=yuzefovich a=yuzefovich

We introduce a CancelChecker operator that wraps all other operators
and performs a cancellation check on every batch. Additionally, the
sorter and hash joiner perform checks of their own, since they can
spend a lot of time in certain phases of execution.

Release note: None
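
As a rough, self-contained illustration of the idea — not the actual exec.Operator interface or the CancelChecker implementation — a wrapper operator that checks the context once per batch could look like this:

package main

import (
	"context"
	"fmt"
)

// batchOperator is a stand-in for a vectorized operator that produces one
// batch per Next call; a nil batch means the input is exhausted.
// (Illustrative interface, not the real exec.Operator API.)
type batchOperator interface {
	Next(ctx context.Context) ([]int, error)
}

// cancelChecker wraps another operator and checks for query cancellation
// once per batch, so the whole operator tree notices ctx cancellation
// without each operator checking on every row.
type cancelChecker struct {
	input batchOperator
}

func (c *cancelChecker) Next(ctx context.Context) ([]int, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // the query was canceled between batches
	}
	return c.input.Next(ctx)
}

// constOp emits a fixed number of single-element batches.
type constOp struct{ remaining int }

func (o *constOp) Next(_ context.Context) ([]int, error) {
	if o.remaining == 0 {
		return nil, nil
	}
	o.remaining--
	return []int{o.remaining}, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	op := &cancelChecker{input: &constOp{remaining: 3}}

	b, err := op.Next(ctx)
	fmt.Println(b, err) // [2] <nil>

	cancel()
	b, err = op.Next(ctx)
	fmt.Println(b, err) // [] context canceled
}

Checking once per batch rather than once per row keeps the overhead negligible while still bounding how long a canceled query keeps consuming CPU; the extra per-phase checks in the sorter and hash joiner cover the stretches where no batches are emitted for a long time.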

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Apr 23, 2019
3 parents 1dfcb2d + 7c1ac1c + 6eed329 commit 7aa4020
Showing 12 changed files with 355 additions and 111 deletions.
230 changes: 162 additions & 68 deletions pkg/cmd/roachtest/cluster.go
@@ -189,55 +189,77 @@ func initBinaries() {
}
}

var clusters = map[*cluster]struct{}{}
var clustersMu syncutil.Mutex
var interrupted int32

func destroyAllClusters() {
type clusterRegistry struct {
mu struct {
syncutil.Mutex
clusters map[string]*cluster
}
}

func newClusterRegistry() *clusterRegistry {
cr := &clusterRegistry{}
cr.mu.clusters = make(map[string]*cluster)
return cr
}

func (r *clusterRegistry) registerCluster(c *cluster) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.clusters[c.name] != nil {
return fmt.Errorf("cluster named %q already exists in registry", c.name)
}
r.mu.clusters[c.name] = c
return nil
}

func (r *clusterRegistry) unregisterCluster(c *cluster) bool {
r.mu.Lock()
_, exists := r.mu.clusters[c.name]
if exists {
delete(r.mu.clusters, c.name)
}
r.mu.Unlock()
return exists
}

// destroyAllClusters destroys all the clusters. It responds to context
// cancelation.
func (r *clusterRegistry) destroyAllClusters(ctx context.Context) {
// No new clusters can be created.
atomic.StoreInt32(&interrupted, 1)

// Fire off a goroutine to destroy all of the clusters.
done := make(chan struct{})
go func() {
defer close(done)

var clusters []*cluster
r.mu.Lock()
for _, c := range r.mu.clusters {
clusters = append(clusters, c)
}
r.mu.Unlock()

var wg sync.WaitGroup
clustersMu.Lock()
wg.Add(len(clusters))
for c := range clusters {
for _, c := range clusters {
go func(c *cluster) {
defer wg.Done()
c.destroy(context.Background())
// We don't close the logger here since the cluster may still be in use
// by a test, and so the logger might still be needed.
c.Destroy(ctx, dontCloseLogger)
}(c)
}
clusters = map[*cluster]struct{}{}
clustersMu.Unlock()

wg.Wait()
}()

// Wait up to 5 min for clusters to be destroyed. This can take a while and
// we don't want to rush it.
select {
case <-done:
case <-time.After(5 * time.Minute):
}
}

func registerCluster(c *cluster) {
clustersMu.Lock()
clusters[c] = struct{}{}
clustersMu.Unlock()
}

func unregisterCluster(c *cluster) bool {
clustersMu.Lock()
_, exists := clusters[c]
if exists {
delete(clusters, c)
case <-ctx.Done():
}
clustersMu.Unlock()
return exists
}

func execCmd(ctx context.Context, l *logger, args ...string) error {
@@ -636,24 +658,54 @@ type cluster struct {
nodes int
status func(...interface{})
t testI
// r is the registry tracking this cluster. Destroying the cluster will
// unregister it.
r *clusterRegistry
// l is the logger used to log various cluster operations.
// DEPRECATED for use outside of cluster methods: Use a test's t.l instead.
// This is generally set to the current test's logger.
l *logger
// destroyed is used to coordinate between different goroutines that want to
// destroy a cluster. It is nil when the cluster should not be destroyed (i.e.
// when Destroy() should not be called).
destroyed chan struct{}
l *logger
expiration time.Time
// owned is set if this instance is responsible for `roachprod destroy`ing the
// cluster. It is set when a new cluster is created, but not when one is
// cloned or when we attach to an existing roachprod cluster.
// If not set, Destroy() only wipes the cluster.
owned bool
// encryptDefault is true if the cluster should default to having encryption
// at rest enabled. The default only applies if encryption is not explicitly
// enabled or disabled by options passed to Start.
encryptDefault bool

// destroyState is nil when the cluster should not be destroyed (i.e. when
// Destroy should not be called) - for example it is set to nil for results of
// c.clone().
//
// NB: destroyState is a pointer to allow for copying of this struct in
// cluster.clone().
destroyState *destroyState
}

type destroyState struct {
// owned is set if this instance is responsible for `roachprod destroy`ing the
// cluster. It is set when a new cluster is created, but not when we attach to
// an existing roachprod cluster.
// If not set, Destroy() only wipes the cluster.
owned bool

mu struct {
syncutil.Mutex
loggerClosed bool
// destroyed is used to coordinate between different goroutines that want to
// destroy a cluster. It is set once the destroy process starts. It is
// closed when the destruction is complete.
destroyed chan struct{}
}
}

// closeLogger closes c.l. It can be called multiple times.
func (c *cluster) closeLogger() {
c.destroyState.mu.Lock()
defer c.destroyState.mu.Unlock()
if c.destroyState.mu.loggerClosed {
return
}
c.destroyState.mu.loggerClosed = true
c.l.close()
}

type clusterConfig struct {
@@ -680,7 +732,9 @@ type clusterConfig struct {
// to figure out how to make that work with `roachprod create`. Perhaps one
// invocation of `roachprod create` per unique node-spec. Are there guarantees
// we're making here about the mapping of nodeSpecs to node IDs?
func newCluster(ctx context.Context, l *logger, cfg clusterConfig) (*cluster, error) {
func newCluster(
ctx context.Context, l *logger, cfg clusterConfig, r *clusterRegistry,
) (*cluster, error) {
if atomic.LoadInt32(&interrupted) == 1 {
return nil, fmt.Errorf("newCluster interrupted")
}
@@ -707,12 +761,16 @@ func newCluster(ctx context.Context, l *logger, cfg clusterConfig) (*cluster, er
nodes: cfg.nodes.NodeCount,
status: func(...interface{}) {},
l: l,
destroyed: make(chan struct{}),
expiration: cfg.nodes.expiration(),
owned: true,
encryptDefault: encrypt.asBool(),
r: r,
destroyState: &destroyState{
owned: true,
},
}
if err := r.registerCluster(c); err != nil {
return nil, err
}
registerCluster(c)

sargs := []string{roachprod, "create", c.name, "-n", fmt.Sprint(c.nodes)}
sargs = append(sargs, cfg.nodes.args()...)
@@ -744,20 +802,25 @@ type attachOpt struct {
//
// NOTE: setTest() needs to be called before a test can use this cluster.
func attachToExistingCluster(
ctx context.Context, name string, l *logger, nodes clusterSpec, opt attachOpt,
ctx context.Context, name string, l *logger, nodes clusterSpec, opt attachOpt, r *clusterRegistry,
) (*cluster, error) {
c := &cluster{
name: name,
nodes: nodes.NodeCount,
status: func(...interface{}) {},
l: l,
destroyed: make(chan struct{}),
expiration: nodes.expiration(),
// If we're attaching to an existing cluster, we're not going to destroy it.
owned: false,
name: name,
nodes: nodes.NodeCount,
status: func(...interface{}) {},
l: l,
expiration: nodes.expiration(),
encryptDefault: encrypt.asBool(),
destroyState: &destroyState{
// If we're attaching to an existing cluster, we're not going to destroy it.
owned: false,
},
r: r,
}

if err := r.registerCluster(c); err != nil {
return nil, err
}
registerCluster(c)

if !opt.skipValidation {
if err := c.validate(ctx, nodes, l); err != nil {
@@ -851,9 +914,9 @@ func (c *cluster) validate(ctx context.Context, nodes clusterSpec, l *logger) er
func (c *cluster) clone() *cluster {
cpy := *c
// This cloned cluster is not taking ownership. The parent retains it.
cpy.owned = false
cpy.destroyState = nil

cpy.encryptDefault = encrypt.asBool()
cpy.destroyed = nil
return &cpy
}

@@ -1042,31 +1105,56 @@ func (c *cluster) FetchCores(ctx context.Context) error {
})
}

func (c *cluster) Destroy(ctx context.Context) {
type closeLoggerOpt bool

const (
closeLogger closeLoggerOpt = true
dontCloseLogger = false
)

// Destroy calls `roachprod destroy` or `roachprod wipe` on the cluster.
// If called while another Destroy() or doDestroy() is in progress, the call
// blocks until that first call finishes.
func (c *cluster) Destroy(ctx context.Context, lo closeLoggerOpt) {
if c.destroyState == nil {
c.l.Errorf("Destroy() called on cluster copy")
return
}

if c.nodes == 0 {
// A node count of zero can happen during unit tests and implies there is nothing to do.
return
}

// Only destroy the cluster if it exists in the cluster registry. The cluster
// may not exist if the test was interrupted and the teardown machinery is
// destroying all clusters. (See destroyAllClusters).
if exists := unregisterCluster(c); exists {
c.destroy(ctx)
ch := c.doDestroy(ctx)
<-ch
// NB: Closing the logger without waiting on c.destroyState.destroyed above
// would be bad because we might cause the ongoing `roachprod destroy` to fail
// by closing its stdout/stderr.
if lo == closeLogger {
c.closeLogger()
}
// If the test was interrupted, another goroutine is destroying the cluster
// and we need to wait for that to finish before closing the
// logger. Otherwise, the destruction can get interrupted due to closing the
// stdout/stderr of the roachprod command.
<-c.destroyed
c.l.close()
}

func (c *cluster) destroy(ctx context.Context) {
defer close(c.destroyed)
// doDestroy calls `roachprod destroy` or `roachprod wipe` on the cluster. It
// returns a chan that will be closed when the destruction is complete. If there's
// no other doDestroy() in flight, the call is synchronous and the channel is
// closed upon return.
func (c *cluster) doDestroy(ctx context.Context) <-chan struct{} {
var inFlight <-chan struct{}
c.destroyState.mu.Lock()
if c.destroyState.mu.destroyed == nil {
c.destroyState.mu.destroyed = make(chan struct{})
} else {
inFlight = c.destroyState.mu.destroyed
}
c.destroyState.mu.Unlock()
if inFlight != nil {
return inFlight
}

if clusterWipe {
if c.owned {
if c.destroyState.owned {
c.status("destroying cluster")
if err := execCmd(ctx, c.l, roachprod, "destroy", c.name); err != nil {
c.l.Errorf("%s", err)
@@ -1080,6 +1168,12 @@ func (c *cluster) destroy(ctx context.Context) {
} else {
c.l.Printf("skipping cluster wipe\n")
}
c.r.unregisterCluster(c)
c.destroyState.mu.Lock()
ch := c.destroyState.mu.destroyed
close(ch)
c.destroyState.mu.Unlock()
return ch
}

// Run a command with output redirected to the logs instead of to os.Stdout