From 6afe25cc462ac4418f5461ee853665d6552a7e63 Mon Sep 17 00:00:00 2001 From: Herko Lategan Date: Fri, 6 Dec 2024 12:45:47 +0000 Subject: [PATCH] roachprod: refactor Monitor Previously, Monitor was only able to monitor processes already started. It could not detect a new processes if it was started after monitor started monitoring. This change uses the new monitor scripts `monitor_local.sh` & `monitor_remote.sh` which produces a frame of processes when any cockroach process changes occur including when a new cockroach process is started. The logic for detecting changes in processes has been moved to the "client" side. The scripts are now less complicated and only have the responsibility of sending through a frame of processes. A frame consists of the cluster label, process id, and processes status of all cockroach processes. In addition, Monitor has been moved into its own source file for better separation of logic. The `ignore empty nodes` functionality has also been removed, as it is an unreliable way of detecting "empty nodes", and does not serve any real purpose anymore. Informs: #118214 Epic: None --- pkg/cmd/roachprod/flags.go | 5 - pkg/roachprod/install/BUILD.bazel | 3 + pkg/roachprod/install/cluster_synced.go | 429 ------------------------ pkg/roachprod/install/monitor.go | 374 +++++++++++++++++++++ 4 files changed, 377 insertions(+), 434 deletions(-) create mode 100644 pkg/roachprod/install/monitor.go diff --git a/pkg/cmd/roachprod/flags.go b/pkg/cmd/roachprod/flags.go index 94fd671ab67a..067b23b95edb 100644 --- a/pkg/cmd/roachprod/flags.go +++ b/pkg/cmd/roachprod/flags.go @@ -311,11 +311,6 @@ func initFlags() { logsCmd.Flags().StringVar(&logsProgramFilter, "logs-program", "^cockroach$", "regular expression of the name of program in log files to search") - monitorCmd.Flags().BoolVar(&monitorOpts.IgnoreEmptyNodes, - "ignore-empty-nodes", false, - "Automatically detect the (subset of the given) nodes which to monitor "+ - "based on the presence of a nontrivial data directory.") - monitorCmd.Flags().BoolVar(&monitorOpts.OneShot, "oneshot", false, "Report the status of all targeted nodes once, then exit. The exit "+ diff --git a/pkg/roachprod/install/BUILD.bazel b/pkg/roachprod/install/BUILD.bazel index fb3a06edec1f..60c59fda0a2f 100644 --- a/pkg/roachprod/install/BUILD.bazel +++ b/pkg/roachprod/install/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "expander.go", "install.go", "iterm2.go", + "monitor.go", "nodes.go", "run_options.go", "services.go", @@ -21,6 +22,8 @@ go_library( "scripts/download.sh", "scripts/start.sh", "scripts/open_ports.sh", + "scripts/monitor_remote.sh", + "scripts/monitor_local.sh", ], importpath = "github.com/cockroachdb/cockroach/pkg/roachprod/install", visibility = ["//visibility:public"], diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index d1770ea56e22..6676b6745271 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -6,7 +6,6 @@ package install import ( - "bufio" "bytes" "context" "fmt" @@ -17,11 +16,9 @@ import ( "os/exec" "os/signal" "path/filepath" - "strconv" "strings" "sync" "syscall" - "text/template" "time" "github.com/cockroachdb/cockroach/pkg/roachprod/cloud" @@ -679,432 +676,6 @@ fi return statuses, nil } -// MonitorProcessSkipped represents a cockroach process whose status -// was not checked. -type MonitorProcessSkipped struct { - VirtualClusterName string - SQLInstance int -} - -// MonitorProcessRunning represents the cockroach process running on a -// node. -type MonitorProcessRunning struct { - VirtualClusterName string - SQLInstance int - PID string -} - -// MonitorProcessDead represents the cockroach process dying on a node. -type MonitorProcessDead struct { - VirtualClusterName string - SQLInstance int - ExitCode string -} - -type MonitorError struct { - Err error -} - -// MonitorNoCockroachProcessesError is the error returned when the -// monitor is called on a node that is not running a `cockroach` -// process by the time the monitor runs. -var MonitorNoCockroachProcessesError = errors.New("no cockroach processes running") - -// NodeMonitorInfo is a message describing a cockroach process' status. -type NodeMonitorInfo struct { - // The index of the node (in a SyncedCluster) at which the message originated. - Node Node - // Event describes what happened to the node; it is one of - // MonitorProcessSkipped (no store directory was found); - // MonitorProcessRunning, sent when cockroach is running on a node; - // MonitorProcessDead, when the cockroach process stops running on a - // node; or MonitorError, typically indicate networking issues or - // nodes that have (physically) shut down. - Event interface{} -} - -func (nmi NodeMonitorInfo) String() string { - var status string - - virtualClusterDesc := func(name string, instance int) string { - if name == SystemInterfaceName { - return "system interface" - } - - return fmt.Sprintf("virtual cluster %q, instance %d", name, instance) - } - - switch event := nmi.Event.(type) { - case MonitorProcessRunning: - status = fmt.Sprintf("cockroach process for %s is running (PID: %s)", - virtualClusterDesc(event.VirtualClusterName, event.SQLInstance), event.PID, - ) - case MonitorProcessSkipped: - status = fmt.Sprintf("%s was skipped", virtualClusterDesc(event.VirtualClusterName, event.SQLInstance)) - case MonitorProcessDead: - status = fmt.Sprintf("cockroach process for %s died (exit code %s)", - virtualClusterDesc(event.VirtualClusterName, event.SQLInstance), event.ExitCode, - ) - case MonitorError: - status = fmt.Sprintf("error: %s", event.Err.Error()) - } - - return fmt.Sprintf("n%d: %s", nmi.Node, status) -} - -// MonitorOpts is used to pass the options needed by Monitor. -type MonitorOpts struct { - OneShot bool // Report the status of all targeted nodes once, then exit. - IgnoreEmptyNodes bool // Only monitor nodes with a nontrivial data directory. -} - -// Monitor writes NodeMonitorInfo for the cluster nodes to the returned channel. -// Infos sent to the channel always have the Node the event refers to, and the -// event itself. See documentation for NodeMonitorInfo for possible event types. -// -// If OneShot is true, infos are retrieved only once for each node and the -// channel is subsequently closed; otherwise the process continues indefinitely -// (emitting new information as the status of the cockroach process changes). -// -// If IgnoreEmptyNodes is true, tenants on which no CockroachDB data is found -// (in {store-dir}) will not be probed and single event, MonitorTenantSkipped, -// will be emitted for each tenant. -// -// Note that the monitor will only send events for tenants that exist -// at the time this function is called. In other words, this function -// will not emit events for tenants started *after* a call to -// Monitor(). -func (c *SyncedCluster) Monitor( - l *logger.Logger, ctx context.Context, opts MonitorOpts, -) chan NodeMonitorInfo { - ch := make(chan NodeMonitorInfo) - nodes := c.TargetNodes() - var wg sync.WaitGroup - monitorCtx, cancel := context.WithCancel(ctx) - - // sendEvent sends the NodeMonitorInfo passed through the channel - // that is listened to by the caller. Bails if the context is - // canceled. - sendEvent := func(info NodeMonitorInfo) { - // if the monitor's context is already canceled, do not attempt to - // send the error down the channel, as it is most likely *caused* - // by the cancelation itself. - if monitorCtx.Err() != nil { - return - } - - select { - case ch <- info: - // We were able to send the info through the channel. - case <-monitorCtx.Done(): - // Don't block trying to send the info. - } - } - - const ( - separator = "|" - skippedMsg = "skipped" - runningMsg = "running" - deadMsg = "dead" - ) - - wg.Add(len(nodes)) - for i := range nodes { - go func(i int) { - defer wg.Done() - node := nodes[i] - - // We first find out all cockroach processes that are currently - // running in this node. - cockroachProcessesCmd := fmt.Sprintf(`ps axeww -o command | `+ - `grep -E '%s' | `+ // processes started by roachprod - `grep -E -o 'ROACHPROD_VIRTUAL_CLUSTER=[^ ]*' | `+ // ROACHPROD_VIRTUAL_CLUSTER indicates this is a cockroach process - `cut -d= -f2`, // grab the virtual cluster label - c.roachprodEnvRegex(node), - ) - - result, err := c.runCmdOnSingleNode( - ctx, l, node, cockroachProcessesCmd, defaultCmdOpts("list-processes"), - ) - if err := errors.CombineErrors(err, result.Err); err != nil { - err := errors.Wrap(err, "failed to list cockroach processes") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - - type virtualClusterInfo struct { - Name string - Instance int - } - - // Make the collection of virtual clusters a set to handle the - // unlikely but possible case that, in `local` runs, we'll find - // two processes associated with the same virtual cluster - // label. This can happen if we invoke the command above while the - // parent cockroach process already created the child, - // background process, but has not terminated yet. - vcs := map[virtualClusterInfo]struct{}{} - vcLines := strings.TrimSuffix(result.CombinedOut, "\n") - if vcLines == "" { - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{MonitorNoCockroachProcessesError}}) - return - } - for _, label := range strings.Split(vcLines, "\n") { - name, instance, err := VirtualClusterInfoFromLabel(label) - if err != nil { - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - vcs[virtualClusterInfo{name, instance}] = struct{}{} - } - - data := struct { - OneShot bool - Node Node - IgnoreEmpty bool - Store string - Local bool - Separator string - SkippedMsg string - RunningMsg string - DeadMsg string - Processes []virtualClusterInfo - }{ - OneShot: opts.OneShot, - Node: node, - IgnoreEmpty: opts.IgnoreEmptyNodes, - Store: c.NodeDir(node, 1 /* storeIndex */), - Local: c.IsLocal(), - Separator: separator, - SkippedMsg: skippedMsg, - RunningMsg: runningMsg, - DeadMsg: deadMsg, - Processes: maps.Keys(vcs), - } - - storeFor := func(name string, instance int) string { - return c.InstanceStoreDir(node, name, instance) - } - - localPIDFile := func(name string, instance int) string { - return filepath.Join(c.LogDir(node, name, instance), "cockroach.pid") - } - - // NB.: we parse the output of every line this script - // prints. Every call to `echo` must match the parsing logic - // down below in order to produce structured results to the - // caller. - snippet := ` -dead_parent() { - ! ps -p "$1" >/dev/null || ps -o ucomm -p "$1" | grep -q defunct -} -{{ range .Processes }} -monitor_process_{{$.Node}}_{{.Name}}_{{.Instance}}() { - {{ if $.IgnoreEmpty }} - if ! ls {{storeFor .Name .Instance}}/marker.* 1> /dev/null 2>&1; then - echo "{{.Name}}{{$.Separator}}{{.Instance}}{{$.Separator}}{{$.SkippedMsg}}" - return 0 - fi - {{- end}} - # Init with -1 so that when cockroach is initially dead, we print - # a dead event for it. - lastpid=-1 - while :; do - # if parent process terminated, quit as well. - if dead_parent "$1"; then - return 0 - fi - {{ if $.Local }} - pidFile=$(cat "{{pidFile .Name .Instance}}") - # Make sure the process is still running - pid=$(test -n "${pidFile}" && ps -p "${pidFile}" >/dev/null && echo "${pidFile}") - pid=${pid:-0} # default to 0 - status="unknown" - {{- else }} - # When CRDB is not running, this is zero. - pid=$(systemctl show "{{virtualClusterLabel .Name .Instance}}" --property MainPID --value) - status=$(systemctl show "{{virtualClusterLabel .Name .Instance}}" --property ExecMainStatus --value) - {{- end }} - if [[ "${lastpid}" == -1 && "${pid}" != 0 ]]; then - # On the first iteration through the loop, if the process is running, - # don't register a PID change (which would trigger an erroneous dead - # event). - lastpid=0 - fi - # Output a dead event whenever the PID changes from a nonzero value to - # any other value. In particular, we emit a dead event when the node stops - # (lastpid is nonzero, pid is zero), but not when the process then starts - # again (lastpid is zero, pid is nonzero). - if [ "${pid}" != "${lastpid}" ]; then - if [ "${lastpid}" != 0 ]; then - if [ "${pid}" != 0 ]; then - # If the PID changed but neither is zero, then the status refers to - # the new incarnation. We lost the actual exit status of the old PID. - status="unknown" - fi - echo "{{.Name}}{{$.Separator}}{{.Instance}}{{$.Separator}}{{$.DeadMsg}}{{$.Separator}}${status}" - fi - if [ "${pid}" != 0 ]; then - echo "{{.Name}}{{$.Separator}}{{.Instance}}{{$.Separator}}{{$.RunningMsg}}{{$.Separator}}${pid}" - fi - lastpid=${pid} - fi - {{ if $.OneShot }} - return 0 - {{- end }} - sleep 1 - if [ "${pid}" != 0 ]; then - while kill -0 "${pid}" && ! dead_parent "$1"; do - sleep 1 - done - fi - done -} -{{ end }} - -# monitor every cockroach process in parallel. -{{ range .Processes }} -monitor_process_{{$.Node}}_{{.Name}}_{{.Instance}} $$ & -{{ end }} - -wait -` - - t := template.Must(template.New("script").Funcs(template.FuncMap{ - "storeFor": storeFor, - "pidFile": localPIDFile, - "virtualClusterLabel": VirtualClusterLabel, - }).Parse(snippet)) - var buf bytes.Buffer - if err := t.Execute(&buf, data); err != nil { - err := errors.Wrap(err, "failed to execute template") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - - // This is the exception to funneling all SSH traffic through `c.runCmdOnSingleNode` - sess := c.newSession(l, node, buf.String(), withDebugDisabled()) - defer sess.Close() - - p, err := sess.StdoutPipe() - if err != nil { - err := errors.Wrap(err, "failed to read stdout pipe") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - // Request a PTY so that the script will receive a SIGPIPE when the - // session is closed. - if err := sess.RequestPty(); err != nil { - err := errors.Wrap(err, "failed to request PTY") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - - var readerWg sync.WaitGroup - readerWg.Add(1) - go func(p io.Reader) { - defer readerWg.Done() - r := bufio.NewReader(p) - for { - line, _, err := r.ReadLine() - if err == io.EOF { - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - if err != nil { - err := errors.Wrap(err, "error reading from session") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - - parts := strings.Split(string(line), separator) - ensureNumParts := func(n int) { - if len(parts) < n { - panic(fmt.Errorf("invalid output from monitor: %q", line)) - } - } - // Every event is expected to have at least 3 parts. If - // that's not the case, panic explicitly below. Otherwise, - // we'd get a slice out of bounds error and the error - // message would not include the actual problematic line, - // which would make understanding the failure more - // difficult. - ensureNumParts(3) // name, instance, event - - // Virtual cluster name and instance are the first fields of - // every event type. - name, instanceStr := parts[0], parts[1] - instance, _ := strconv.Atoi(instanceStr) - switch parts[2] { - case skippedMsg: - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorProcessSkipped{ - VirtualClusterName: name, SQLInstance: instance, - }}) - case runningMsg: - ensureNumParts(4) - pid := parts[3] - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorProcessRunning{ - VirtualClusterName: name, SQLInstance: instance, PID: pid, - }}) - case deadMsg: - ensureNumParts(4) - exitCode := parts[3] - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorProcessDead{ - VirtualClusterName: name, SQLInstance: instance, ExitCode: exitCode, - }}) - default: - err := fmt.Errorf("internal error: unrecognized output from monitor: %q", line) - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - } - } - }(p) - - if err := sess.Start(); err != nil { - err := errors.Wrap(err, "failed to start session") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - - // Watch for context cancellation, which can happen if the test - // fails, or if the monitor loop exits. - go func() { - <-monitorCtx.Done() - sess.Close() - if pc, ok := p.(io.ReadCloser); ok { - _ = pc.Close() - } - }() - - readerWg.Wait() - // We must call `sess.Wait()` only after finishing reading from the stdout - // pipe. Otherwise it can be closed under us, causing the reader to loop - // infinitely receiving a non-`io.EOF` error. - if err := sess.Wait(); err != nil { - // If we got an error waiting for the session but the context - // is already canceled, do not send an error through the - // channel; context cancelation happens at the user's request - // or when the test finishes. In either case, the monitor - // should quiesce. Reporting the error is confusing and can be - // spammy in the case of multiple monitors. - if monitorCtx.Err() != nil { - return - } - - err := errors.Wrap(err, "failed to wait for session") - sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}}) - return - } - }(i) - } - go func() { - wg.Wait() - cancel() - close(ch) - }() - - return ch -} - // RunResultDetails holds details of the result of commands executed by Run(). type RunResultDetails struct { Node Node diff --git a/pkg/roachprod/install/monitor.go b/pkg/roachprod/install/monitor.go new file mode 100644 index 000000000000..7b5d11c3cbae --- /dev/null +++ b/pkg/roachprod/install/monitor.go @@ -0,0 +1,374 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package install + +import ( + "bufio" + "bytes" + "context" + _ "embed" + "fmt" + "io" + "strconv" + "strings" + "sync" + "text/template" + + "github.com/alessio/shellescape" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/errors" +) + +//go:embed scripts/monitor_remote.sh +var monitorRemoteScript string + +//go:embed scripts/monitor_local.sh +var monitorLocalScript string + +// MonitorProcessSkipped represents a cockroach process whose status +// was not checked. +type MonitorProcessSkipped struct { + VirtualClusterName string + SQLInstance int +} + +// MonitorProcessRunning represents the cockroach process running on a +// node. +type MonitorProcessRunning struct { + VirtualClusterName string + SQLInstance int + PID string +} + +// MonitorProcessDead represents the cockroach process dying on a node. +type MonitorProcessDead struct { + VirtualClusterName string + SQLInstance int + ExitCode string +} + +type MonitorError struct { + Err error +} + +// MonitorNoCockroachProcessesError is the error returned when the +// monitor is called on a node that is not running a `cockroach` +// process by the time the monitor runs. +var MonitorNoCockroachProcessesError = errors.New("no cockroach processes running") + +// NodeMonitorInfo is a message describing a cockroach process' status. +type NodeMonitorInfo struct { + // The index of the node (in a SyncedCluster) at which the message originated. + Node Node + // Event describes what happened to the node; it is one of + // MonitorProcessSkipped (no store directory was found); + // MonitorProcessRunning, sent when cockroach is running on a node; + // MonitorProcessDead, when the cockroach process stops running on a + // node; or MonitorError, typically indicate networking issues or + // nodes that have (physically) shut down. + Event interface{} +} + +func (nmi NodeMonitorInfo) String() string { + var status string + + virtualClusterDesc := func(name string, instance int) string { + if name == SystemInterfaceName { + return "system interface" + } + + return fmt.Sprintf("virtual cluster %q, instance %d", name, instance) + } + + switch event := nmi.Event.(type) { + case MonitorProcessRunning: + status = fmt.Sprintf("cockroach process for %s is running (PID: %s)", + virtualClusterDesc(event.VirtualClusterName, event.SQLInstance), event.PID, + ) + case MonitorProcessSkipped: + status = fmt.Sprintf("%s was skipped", virtualClusterDesc(event.VirtualClusterName, event.SQLInstance)) + case MonitorProcessDead: + status = fmt.Sprintf("cockroach process for %s died (exit code %s)", + virtualClusterDesc(event.VirtualClusterName, event.SQLInstance), event.ExitCode, + ) + case MonitorError: + status = fmt.Sprintf("error: %s", event.Err.Error()) + } + + return fmt.Sprintf("n%d: %s", nmi.Node, status) +} + +// MonitorOpts is used to pass the options needed by Monitor. +type MonitorOpts struct { + OneShot bool // Report the status of all targeted nodes once, then exit. +} + +type monitorProcess struct { + processID int + lastProcessID int + status string +} + +type monitorNode struct { + cluster *SyncedCluster + node Node + processes map[string]*monitorProcess + sendEvent func(info NodeMonitorInfo) + opts MonitorOpts +} + +func (m *monitorNode) processMonitorOutput(lines []string) { + if len(lines)%2 != 0 { + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{errors.New("bad frame from script")}}) + return + } + + // Reset all process IDs to 0. Each process ID will be updated from the + // monitor script output frame. If a process ID is not updated, it is + // considered dead. + for _, process := range m.processes { + process.lastProcessID = process.processID + process.processID = 0 + } + + // Update process IDs from the monitor script output frame. + for i := 0; i < len(lines); i += 2 { + vcLine := strings.Split(lines[i], "=") + if len(vcLine) != 2 { + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{errors.New("failed to parse vcs line")}}) + continue + } + processID, err := strconv.Atoi(vcLine[1]) + if err != nil { + parseErr := errors.Wrap(err, "failed to parse process ID") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{parseErr}}) + continue + } + statusLine := strings.Split(lines[i+1], "=") + if len(statusLine) != 2 { + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{errors.New("failed to parse status line")}}) + continue + } + process := m.processes[vcLine[0]] + if process == nil { + process = &monitorProcess{} + m.processes[vcLine[0]] = process + } + process.processID = processID + process.status = statusLine[1] + } + + // Check for dead processes by comparing previous process IDs. + for vcLabel, process := range m.processes { + name, instance, err := VirtualClusterInfoFromLabel(vcLabel) + pid := fmt.Sprintf("%d", process.processID) + if err != nil { + parseError := errors.Wrap(err, "failed to parse virtual cluster label") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{parseError}}) + continue + } + // Output a dead event whenever the processID changes from a nonzero value + // to any other value. In particular, we emit a dead event when the node + // stops (lastProcessID is nonzero, processID is zero), but not when the + // process then starts again (lastProcessID is zero, processID is nonzero). + if process.lastProcessID != process.processID { + status := process.status + if process.lastProcessID != 0 { + if process.processID != 0 { + status = "unknown" + } + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorProcessDead{ + VirtualClusterName: name, SQLInstance: instance, ExitCode: status, + }}) + } + if process.processID != 0 { + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorProcessRunning{ + VirtualClusterName: name, SQLInstance: instance, PID: pid, + }}) + } + } + } +} + +func (m *monitorNode) monitorNode(ctx context.Context, l *logger.Logger) { + params := struct { + OneShot bool + RoachprodEnvRegex string + }{ + OneShot: m.opts.OneShot, + RoachprodEnvRegex: m.cluster.roachprodEnvRegex(m.node), + } + monitorScript := monitorRemoteScript + if m.cluster.IsLocal() { + monitorScript = monitorLocalScript + } + t := template.Must(template.New("monitor"). + Funcs(template.FuncMap{"shesc": func(i interface{}) string { + return shellescape.Quote(fmt.Sprint(i)) + }}). + Delims("#{", "#}"). + Parse(monitorScript)) + + var buf bytes.Buffer + if err := t.Execute(&buf, params); err != nil { + templateErr := errors.Wrap(err, "failed to execute template") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{templateErr}}) + return + } + + // This is the exception to funneling all SSH traffic through `c.runCmdOnSingleNode` + sess := m.cluster.newSession(l, m.node, buf.String(), withDebugDisabled()) + defer sess.Close() + + p, err := sess.StdoutPipe() + if err != nil { + pipeErr := errors.Wrap(err, "failed to read stdout pipe") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{pipeErr}}) + return + } + // Request a PTY so that the script will receive a SIGPIPE when the + // session is closed. + if err = sess.RequestPty(); err != nil { + ptyErr := errors.Wrap(err, "failed to request PTY") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{ptyErr}}) + return + } + + var readerWg sync.WaitGroup + readerWg.Add(1) + go func(p io.Reader) { + defer readerWg.Done() + r := bufio.NewReader(p) + lines := make([]string, 0) + for { + lineData, _, err := r.ReadLine() + line := strings.TrimSpace(string(lineData)) + if err == io.EOF { + // Only report EOF errors if we are not in a one-shot mode. + if !m.opts.OneShot { + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{err}}) + } + return + } + if err != nil { + err := errors.Wrap(err, "error reading from session") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{err}}) + return + } + // An empty line indicates the end of a frame. + if len(line) == 0 { + m.processMonitorOutput(lines) + lines = make([]string, 0) + continue + } + lines = append(lines, line) + } + }(p) + if err := sess.Start(); err != nil { + err := errors.Wrap(err, "failed to start session") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{err}}) + return + } + + // Watch for context cancellation, which can happen if the test + // fails, or if the monitor loop exits. + go func() { + <-ctx.Done() + sess.Close() + if pc, ok := p.(io.ReadCloser); ok { + _ = pc.Close() + } + }() + + readerWg.Wait() + // We must call `sess.Wait()` only after finishing reading from the stdout + // pipe. Otherwise, it can be closed under us, causing the reader to loop + // infinitely receiving a non-`io.EOF` error. + if err := sess.Wait(); err != nil { + // If we got an error waiting for the session but the context + // is already canceled, do not send an error through the + // channel; context cancelation happens at the user's request + // or when the test finishes. In either case, the monitor + // should quiesce. Reporting the error is confusing and can be + // noisy in the case of multiple monitors. + if ctx.Err() != nil { + return + } + + err := errors.Wrap(err, "failed to wait for session") + m.sendEvent(NodeMonitorInfo{Node: m.node, Event: MonitorError{err}}) + return + } +} + +// Monitor writes NodeMonitorInfo for the cluster nodes to the returned channel. +// Infos sent to the channel always have the Node the event refers to, and the +// event itself. See documentation for NodeMonitorInfo for possible event types. +// +// If OneShot is true, infos are retrieved only once for each node and the +// channel is subsequently closed; otherwise the process continues indefinitely +// (emitting new information as the status of the cockroach process changes). +// +// If IgnoreEmptyNodes is true, tenants on which no CockroachDB data is found +// (in {store-dir}) will not be probed and single event, MonitorTenantSkipped, +// will be emitted for each tenant. +// +// Note that the monitor will only send events for tenants that exist +// at the time this function is called. In other words, this function +// will not emit events for tenants started *after* a call to +// Monitor(). +func (c *SyncedCluster) Monitor( + l *logger.Logger, ctx context.Context, opts MonitorOpts, +) chan NodeMonitorInfo { + ch := make(chan NodeMonitorInfo) + nodes := c.TargetNodes() + var wg sync.WaitGroup + monitorCtx, cancel := context.WithCancel(ctx) + + // sendEvent sends the NodeMonitorInfo passed through the channel + // that is listened to by the caller. Bails if the context is + // canceled. + sendEvent := func(info NodeMonitorInfo) { + // if the monitor's context is already canceled, do not attempt to + // send the error down the channel, as it is most likely *caused* + // by the cancelation itself. + if monitorCtx.Err() != nil { + return + } + + select { + case ch <- info: + // We were able to send the info through the channel. + case <-monitorCtx.Done(): + // Don't block trying to send the info. + } + } + + // Start monitoring each node. + wg.Add(len(nodes)) + for _, node := range nodes { + go func(node Node) { + defer wg.Done() + m := &monitorNode{ + cluster: c, + node: node, + sendEvent: sendEvent, + processes: make(map[string]*monitorProcess), + opts: opts, + } + m.monitorNode(monitorCtx, l) + }(node) + } + + // Wait for all monitoring goroutines to finish. + go func() { + wg.Wait() + cancel() + close(ch) + }() + + return ch +}