Skip to content

Commit

Permalink
changefeedccl: add webhook-chaos roachtest
Browse files Browse the repository at this point in the history
Add new webhook-chaos roachtest with order validation.

Epic: CRDB-38755
Part of: cockroachdb#124148

Release note: none
  • Loading branch information
asg0451 committed Jul 29, 2024
1 parent 26ce6ee commit 3921eca
Show file tree
Hide file tree
Showing 3 changed files with 399 additions and 102 deletions.
49 changes: 49 additions & 0 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2034,6 +2034,55 @@ func (c *clusterImpl) Get(
return errors.Wrap(roachprod.Get(ctx, l, c.MakeNodes(opts...), src, dest), "cluster.Get")
}

func (c *clusterImpl) SpawnE(ctx context.Context, options install.RunOptions, logger *logger.Logger, args ...string) (stdouts []io.Reader, errs chan error, err error) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
c.status(fmt.Sprintf("running %v on %v", args, options.Nodes))
defer c.status("")

errs = make(chan error, len(options.Nodes))

var (
stdoutReaders []io.Reader
stdoutWriters []io.WriteCloser
)

errWg := &sync.WaitGroup{}
errWg.Add(len(options.Nodes))

// Close errs once all commands have finished.
go func() {
errWg.Wait()
close(errs)
}()

for range options.Nodes {
stdoutReader, stdoutWriter := io.Pipe()
stdoutReaders = append(stdoutReaders, stdoutReader)
stdoutWriters = append(stdoutWriters, stdoutWriter)

go func(stdout io.WriteCloser) {
defer errWg.Done()
defer func() { _ = stdout.Close() }()
err := roachprod.Run(ctx, logger, c.MakeNodes(option.FromInstallNodes(options.Nodes)),
"", "", false, stdout, io.Discard, args, options)
if err != nil && ctx.Err() == nil {
// This should never block since we allocate enough space in errs for each one.
errs <- err
}
}(stdoutWriter)

// Close the pipe when the context is canceled. This will cause the process to fail with SIGPIPE.
go func(stdout io.Closer) {
<-ctx.Done()
_ = stdout.Close()
}(stdoutReader)
}

return stdoutReaders, errs, nil
}

// PutString into the specified file on the remote(s).
func (c *clusterImpl) PutString(
ctx context.Context, content, dest string, mode os.FileMode, nodes ...option.Option,
Expand Down
8 changes: 8 additions & 0 deletions pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package cluster
import (
"context"
gosql "database/sql"
"io"
"os"

"github.com/cockroachdb/cockroach/pkg/cmd/roachprod/grafana"
Expand Down Expand Up @@ -127,6 +128,13 @@ type Cluster interface {
// options.
RunE(ctx context.Context, options install.RunOptions, args ...string) error

// SpawnE spawns a command specified by `args` on the given nodes, specified
// via `RunOptions.Nodes`, and returns the stdout of the command executions
// and a channel for errors. Each stdout will be closed as its process
// completes, and the errs channel will be closed when all processes have
// completed.
SpawnE(ctx context.Context, options install.RunOptions, logger *logger.Logger, args ...string) (stdouts []io.Reader, errs chan error, err error)

// RunWithDetailsSingleNode is just like RunWithDetails but used when 1) operating
// on a single node AND 2) an error from roachprod itself would be treated the same way
// you treat an error from the command. This makes error checking easier / friendlier
Expand Down
Loading

0 comments on commit 3921eca

Please sign in to comment.