Skip to content

Commit

Permalink
Merge #76997 #77461
Browse files Browse the repository at this point in the history
76997: changefeedccl: Augment kafka error messages. r=miretskiy a=miretskiy

Expand kafka error message to include message key as well
as message length for the rejected message.

Release Notes: None
Release Justification: Low impact; usability improvement.

77461: dev: don't throw away so much log output in `dev doctor` r=dt a=rickystewart

We keep the `stderr` for commands and have `doctor` log it along with
the `stdout`. Also reword some of the error output to make it clearer --
for example, the `stamp` test can fail for reasons besides stamping not
being enabled in your `.bazelrc.user`, so the error message should make
that clear.

Closes #77355.

Release justification: Non-production code changes
Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
3 people committed Mar 8, 2022
3 parents 2321403 + e2cbb86 + d042e42 commit 9deb5c6
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 58 deletions.
2 changes: 1 addition & 1 deletion dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=19
DEV_VERSION=20

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ func (s *kafkaSink) workerLoop() {
ackMsg = m
case err := <-s.producer.Errors():
ackMsg, ackError = err.Msg, err.Err
if ackError != nil {
ackError = errors.Wrapf(ackError,
"while sending message with key=%s, size=%d",
err.Msg.Key, err.Msg.Key.Length()+err.Msg.Value.Length())
}
}

if m, ok := ackMsg.Metadata.(messageMetadata); ok {
Expand Down
14 changes: 12 additions & 2 deletions pkg/cmd/dev/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,14 @@ func (d *dev) cache(cmd *cobra.Command, _ []string) error {
if err != nil {
return err
}
_, err = d.checkPresenceInBazelRc(bazelRcLine)
return err
errStr, err := d.checkPresenceInBazelRc(bazelRcLine)
if err != nil {
return err
}
if errStr != "" {
return fmt.Errorf("%s", errStr)
}
return nil
}

func bazelRemoteCacheDir() (string, error) {
Expand Down Expand Up @@ -130,6 +136,10 @@ func (d *dev) setUpCache(ctx context.Context) (string, error) {

log.Printf("Configuring cache...\n")

err := d.exec.CommandContextInheritingStdStreams(ctx, "bazel", "build", bazelRemoteTarget)
if err != nil {
return "", err
}
bazelRemoteLoc, err := d.exec.CommandContextSilent(ctx, "bazel", "run", bazelRemoteTarget, "--run_under=//build/bazelutil/whereis")
if err != nil {
return "", err
Expand Down
114 changes: 66 additions & 48 deletions pkg/cmd/dev/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
osexec "os/exec"
"path/filepath"
"runtime"
"strconv"
Expand Down Expand Up @@ -79,7 +79,7 @@ func printStdoutAndErr(stdoutStr string, err error) {
if len(stdoutStr) > 0 {
log.Printf("stdout: %s", stdoutStr)
}
var cmderr *osexec.ExitError
var cmderr *exec.ExitError
if errors.As(err, &cmderr) {
stderrStr := strings.TrimSpace(string(cmderr.Stderr))
if len(stderrStr) > 0 {
Expand All @@ -105,9 +105,13 @@ func makeDoctorCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Co

func (d *dev) doctor(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()
success := true
failures := []string{}
noCache := mustGetFlagBool(cmd, noCacheFlag)
noCacheEnv := d.os.Getenv("DEV_NO_REMOTE_CACHE")
workspace, err := d.getWorkspace(ctx)
if err != nil {
return err
}
if noCacheEnv != "" {
noCache = true
}
Expand All @@ -117,16 +121,15 @@ func (d *dev) doctor(cmd *cobra.Command, _ []string) error {
if runtime.GOOS == "darwin" {
stdout, err := d.exec.CommandContextSilent(ctx, "/usr/bin/xcodebuild", "-version")
if err != nil {
success = false
log.Printf("Failed to run `/usr/bin/xcodebuild -version`.")
log.Println("Failed to run `/usr/bin/xcodebuild -version`.")
stdoutStr := strings.TrimSpace(string(stdout))
printStdoutAndErr(stdoutStr, err)
log.Println(`You must have a full installation of XCode to build with Bazel.
failures = append(failures, `You must have a full installation of XCode to build with Bazel.
A command-line tools instance does not suffice.
Please perform the following steps:
1. Install XCode from the App Store.
2. Launch Xcode.app at least once to perform one-time initialization of developer tools.
3. Run ` + "`xcode-select -switch /Applications/Xcode.app/`.")
3. Run `+"`xcode-select -switch /Applications/Xcode.app/`.")
}
}

Expand All @@ -136,25 +139,24 @@ Please perform the following steps:
stdout, err := d.exec.CommandContextSilent(ctx, "cmake", "--version")
stdoutStr := strings.TrimSpace(string(stdout))
if err != nil {
log.Println("Failed to run `cmake --version`.")
printStdoutAndErr(stdoutStr, err)
success = false
failures = append(failures, "Failed to run `cmake --version`; do you have it installed?")
} else {
versionFields := strings.Split(strings.TrimPrefix(stdoutStr, "cmake version "), ".")
if len(versionFields) < 3 {
log.Printf("malformed cmake version: %q\n", stdoutStr)
success = false
failures = append(failures, fmt.Sprintf("malformed cmake version: %q", stdoutStr))
} else {
major, majorErr := strconv.Atoi(versionFields[0])
minor, minorErr := strconv.Atoi(versionFields[1])
if majorErr != nil || minorErr != nil {
log.Printf("malformed cmake version: %q\n", stdoutStr)
success = false
failures = append(failures, fmt.Sprintf("malformed cmake version: %q", stdoutStr))
} else if major < cmakeRequiredMajor || minor < cmakeRequiredMinor {
log.Printf("cmake is too old, upgrade to 3.20.x+\n")
msg := "cmake is too old, upgrade to 3.20.x+"
if runtime.GOOS == "linux" {
log.Printf("\t If this is a gceworker you can use ./build/bootstrap/bootstrap-debian.sh to update all tools\n")
msg = msg + "\n\t If this is a gceworker you can use ./build/bootstrap/bootstrap-debian.sh to update all tools"
}
success = false
failures = append(failures, msg)
}
}
}
Expand All @@ -179,41 +181,43 @@ Please perform the following steps:
}

// Check whether the build is properly configured to use stamping.
passedStampTest := true
if _, err := d.exec.CommandContextSilent(ctx, "bazel", "build", "//build/bazelutil:test_stamping"); err != nil {
passedStampTest = false
d.log.Println("doctor: running stamp test")
failedStampTestMsg := ""
stdout, err := d.exec.CommandContextSilent(ctx, "bazel", "build", "//build/bazelutil:test_stamping")
if err != nil {
failedStampTestMsg = "Failed to run `bazel build //build/bazelutil:test_stamping`"
log.Println(failedStampTestMsg)
printStdoutAndErr(string(stdout), err)
} else {
bazelBin, err := d.getBazelBin(ctx)
if err != nil {
return err
}
fileContents, err := d.os.ReadFile(
filepath.Join(bazelBin, "build", "bazelutil", "test_stamping.txt"))
testStampingTxt := filepath.Join(bazelBin, "build", "bazelutil", "test_stamping.txt")
fileContents, err := d.os.ReadFile(testStampingTxt)
if err != nil {
return err
}
if !strings.Contains(fileContents, "STABLE_BUILD_GIT_BUILD_TYPE") {
passedStampTest = false
failedStampTestMsg = fmt.Sprintf("Could not find STABLE_BUILD_GIT_TYPE in %s\n", testStampingTxt)
}
}
workspace, err := d.getWorkspace(ctx)
if err != nil {
return err
}
if !passedStampTest {
success = false
log.Printf(`Your machine is not configured to "stamp" your built executables.
Please add one of the following to your %s/.bazelrc.user:`, workspace)
if failedStampTestMsg != "" {
failedStampTestMsg = failedStampTestMsg + fmt.Sprintf(`
This may be because your Bazel is not configured to "stamp" built executables.
Make sure one of the following lines is in the file %s/.bazelrc.user:
`, workspace)
if runtime.GOOS == "darwin" && runtime.GOARCH == "amd64" {
log.Printf(" build --config=devdarwinx86_64")
failedStampTestMsg = failedStampTestMsg + " build --config=devdarwinx86_64"
} else if runtime.GOOS == "linux" && runtime.GOARCH == "amd64" {
log.Printf(" build --config=dev")
log.Printf(" OR ")
log.Printf(" build --config=crosslinux")
log.Printf("The former will use your host toolchain, while the latter will use the cross-compiler that we use in CI.")
failedStampTestMsg = failedStampTestMsg + " build --config=dev\n"
failedStampTestMsg = failedStampTestMsg + " OR \n"
failedStampTestMsg = failedStampTestMsg + " build --config=crosslinux\n"
failedStampTestMsg = failedStampTestMsg + "The former will use your host toolchain, while the latter will use the cross-compiler that we use in CI."
} else {
log.Printf(" build --config=dev")
failedStampTestMsg = failedStampTestMsg + " build --config=dev"
}
failures = append(failures, failedStampTestMsg)
}

if !noCache {
Expand All @@ -222,13 +226,20 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace)
if err != nil {
return err
}
success, err = d.checkPresenceInBazelRc(bazelRcLine)
msg, err := d.checkPresenceInBazelRc(bazelRcLine)
if err != nil {
return err
}
if msg != "" {
failures = append(failures, msg)
}
}

if !success {
if len(failures) > 0 {
log.Printf("doctor: encountered %d errors", len(failures))
for _, failure := range failures {
log.Println(failure)
}
return errors.New("please address the errors described above and try again")
}

Expand All @@ -239,30 +250,37 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace)
return nil
}

func (d *dev) checkPresenceInBazelRc(expectedBazelRcLine string) (found bool, _ error) {
// checkPresenceInBazelRc checks whether the given line is in ~/.bazelrc.
// If it is, this function returns an empty string and a nil error.
// If it isn't, this function returns a non-empty human-readable string describing
// what the user should do to solve the issue and a nil error.
// In other failure cases the function returns an empty string and a non-nil error.
func (d *dev) checkPresenceInBazelRc(expectedBazelRcLine string) (string, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return false, err
return "", err
}
defer func() {
if !found {
log.Printf("Please add the string `%s` to your ~/.bazelrc:\n", expectedBazelRcLine)
log.Printf(" echo \"%s\" >> ~/.bazelrc", expectedBazelRcLine)
}
}()
errString := fmt.Sprintf("Please add the string `%s` to your ~/.bazelrc:\n", expectedBazelRcLine)
errString = errString + fmt.Sprintf(" echo \"%s\" >> ~/.bazelrc", expectedBazelRcLine)

bazelRcContents, err := d.os.ReadFile(filepath.Join(homeDir, ".bazelrc"))
if err != nil {
return false, err
// The file may not exist; that's OK, but the line definitely is
// not in the file.
return errString, nil //nolint:returnerrcheck
}
found := false
for _, line := range strings.Split(bazelRcContents, "\n") {
if !strings.Contains(line, expectedBazelRcLine) {
continue
}
if strings.HasPrefix(strings.TrimSpace(line), "#") {
continue
}
return true, nil
found = true
}
if found {
return "", nil
}
return false, nil
return errString, nil
}
29 changes: 22 additions & 7 deletions pkg/cmd/dev/io/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ import (
"github.com/irfansharif/recorder"
)

// ExitError is an error type similar to os.ExitError. The stderr for
// failed commands is captured as `Stderr`.
type ExitError struct {
Stderr []byte
Inner error
}

func (e *ExitError) Unwrap() error {
return e.Inner
}

func (e *ExitError) Error() string {
return e.Inner.Error()
}

// Exec is a convenience wrapper around the stdlib os/exec package. It lets us:
//
// (a) mock all instances where we shell out, for tests, and
Expand Down Expand Up @@ -223,13 +238,13 @@ func (e *Exec) commandContextImpl(

output, err := e.Next(command, func(outTrace, errTrace io.Writer) (string, error) {
cmd := exec.CommandContext(ctx, name, args...)
var buffer bytes.Buffer
var stdoutBuffer, stderrBuffer bytes.Buffer
if silent {
cmd.Stdout = io.MultiWriter(&buffer, outTrace)
cmd.Stderr = errTrace
cmd.Stdout = io.MultiWriter(&stdoutBuffer, outTrace)
cmd.Stderr = io.MultiWriter(&stderrBuffer, errTrace)
} else {
cmd.Stdout = io.MultiWriter(e.stdout, &buffer, outTrace)
cmd.Stderr = io.MultiWriter(e.stderr, errTrace)
cmd.Stdout = io.MultiWriter(e.stdout, &stdoutBuffer, outTrace)
cmd.Stderr = io.MultiWriter(e.stderr, &stderrBuffer, errTrace)
}
if stdin != nil {
cmd.Stdin = stdin
Expand All @@ -240,9 +255,9 @@ func (e *Exec) commandContextImpl(
return "", err
}
if err := cmd.Wait(); err != nil {
return "", err
return "", &ExitError{Inner: err, Stderr: stderrBuffer.Bytes()}
}
return buffer.String(), nil
return stdoutBuffer.String(), nil
})

if err != nil {
Expand Down

0 comments on commit 9deb5c6

Please sign in to comment.