diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 8e2808b7c16b..cd4e43ce6c32 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -328,6 +328,7 @@ go_test( "nodelocal_test.go", "prefixer_test.go", "sqlfmt_test.go", + "start_linux_test.go", "start_test.go", "statement_bundle_test.go", "statement_diag_test.go", diff --git a/pkg/cli/start.go b/pkg/cli/start.go index b2bf11d71c0d..d1e9353c7945 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -465,6 +465,11 @@ func runStartInternal( return clierror.NewError(err, exit.FatalError()) } + // Set a MakeProcessUnavailableFunc that will close all sockets. This guards + // against a persistent disk stall that prevents the process from exiting or + // making progress. + log.SetMakeProcessUnavailableFunc(closeAllSockets) + // Set up a cancellable context for the entire start command. // The context will be canceled at the end. ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/cli/start_linux_test.go b/pkg/cli/start_linux_test.go new file mode 100644 index 000000000000..8541de78f836 --- /dev/null +++ b/pkg/cli/start_linux_test.go @@ -0,0 +1,49 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build linux +// +build linux + +package cli + +import ( + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestFindOpenSocketFDs(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + time.Sleep(500 * time.Millisecond) + rw.WriteHeader(http.StatusOK) + })) + defer s.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, _ = s.Client().Get(s.URL) + }() + defer wg.Wait() + + fds, _ := findOpenSocketFDs() + require.Greater(t, len(fds), 0) + s.CloseClientConnections() +} diff --git a/pkg/cli/start_unix.go b/pkg/cli/start_unix.go index 4c067bf8e16e..afb3d6431cf2 100644 --- a/pkg/cli/start_unix.go +++ b/pkg/cli/start_unix.go @@ -18,6 +18,8 @@ import ( "os" "os/exec" "os/signal" + "path/filepath" + "strconv" "strings" "github.com/cockroachdb/cockroach/pkg/cli/cliflags" @@ -164,3 +166,92 @@ func disableOtherPermissionBits() { mask |= 00007 _ = unix.Umask(mask) } + +// closeAllSockets is used in the event of a disk stall, in which case we want +// to terminate the process but may not be able to. A process stalled in disk +// I/O is in uninterruptible sleep within the kernel and cannot be terminated. +// If we can't terminate the process, the next best thing is to quarantine it by +// closing all sockets so that it appears dead to other nodes. +// +// See log.SetMakeProcessUnavailableFunc. +func closeAllSockets() { + // Close all sockets twice. A LISTEN socket may open a new socket after we + // list all FDs. If that's the case, the socket will be closed by the second + // call. + // + // TODO(jackson,#96342): This doesn't prevent the retry mechanisms from + // opening new outgoing connections. Consider marking the rpc.Context as + // poisoned to prevent new outgoing connections. + + _ = closeAllSocketsOnce() + _ = closeAllSocketsOnce() + + // It's unclear what to do with errors. We try to close all sockets in an + // emergency where we can't exit the process but want to quarantine it by + // removing all communication with the outside world. If we fail to close + // all sockets, panicking is unlikely to be able to terminate the process. + // We do nothing so that if the log sink is NOT stalled, we'll write the + // disk stall log entry. +} + +func closeAllSocketsOnce() error { + fds, err := findOpenSocketFDs() + // NB: Intentionally ignore `err`. findOpenSocketFDs may return a non-empty + // slice of FDs with a non-nil error. We want to close the descriptors we + // were able to identify regardless of any error. + for _, fd := range fds { + // Ignore errors so that if we can't close all sockets, we close as many + // as we can. When finished, return a combined error. + fdErr := unix.Shutdown(fd, unix.SHUT_RDWR) + err = errors.CombineErrors(err, fdErr) + } + return err +} + +func findOpenSocketFDs() ([]int, error) { + f, err := os.Open("/dev/fd") + if err != nil { + return nil, err + } + defer f.Close() + dirnames, err := f.Readdirnames(-1) + if err != nil { + return nil, err + } + var fds []int + for _, name := range dirnames { + // From the Linux /proc/[pid]/fd man page: + // + // For file descriptors for pipes and sockets, the entries + // will be symbolic links whose content is the file type with + // the inode. A readlink(2) call on this file returns a + // string in the format: + // + // type:[inode] + // + // For example, socket:[2248868] will be a socket and its + // inode is 2248868. For sockets, that inode can be used to + // find more information in one of the files under + // /proc/net/. + // + // We `readlink` each directory entry, and check that the destination + // has the `socket:` prefix. + dst, readLinkErr := os.Readlink(filepath.Join("/dev/fd", name)) + if readLinkErr != nil { + // Stumble forward. + err = errors.CombineErrors(err, readLinkErr) + continue + } + if !strings.HasPrefix(dst, "socket:") { + continue + } + fd, atoiErr := strconv.Atoi(name) + if atoiErr != nil { + // Stumble forward. + err = errors.CombineErrors(err, atoiErr) + continue + } + fds = append(fds, fd) + } + return fds, err +} diff --git a/pkg/cli/start_windows.go b/pkg/cli/start_windows.go index 7e57e973ecfd..85d73216fdeb 100644 --- a/pkg/cli/start_windows.go +++ b/pkg/cli/start_windows.go @@ -45,3 +45,8 @@ func maybeRerunBackground() (bool, error) { func disableOtherPermissionBits() { // No-op on windows, which does not support umask. } + +func closeAllSockets() { + // No-op on windows. + // TODO(jackson): Is there something else we can do on Windows? +} diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 7f3c47eeb1b2..96f6bae3b4b5 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1095,6 +1095,14 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventLis // pebble.EventListener on why it's important for this method to return // quickly. if fatalOnExceeded { + // The write stall may prevent the process from exiting. If + // the process won't exit, we can at least terminate all our + // RPC connections first. + // + // See pkg/cli.runStart for where this function is hooked + // up. + log.MakeProcessUnavailable() + log.Fatalf(ctx, "file write stall detected: %s", info) } else { go log.Errorf(ctx, "file write stall detected: %s", info) diff --git a/pkg/util/log/exit_override.go b/pkg/util/log/exit_override.go index a1c969456f13..9d9df43b4c31 100644 --- a/pkg/util/log/exit_override.go +++ b/pkg/util/log/exit_override.go @@ -17,8 +17,35 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/channel" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/log/severity" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) +var makeProcessUnavailableFunc struct { + syncutil.Mutex + fn func() +} + +// MakeProcessUnavailable invokes the emergency stop function set through +// SetMakeProcessUnavailableFunc, if any. MakeProcessUnavailable is a hack to +// close network connections in the event of a disk stall that may prevent the +// process from exiting. +func MakeProcessUnavailable() { + makeProcessUnavailableFunc.Lock() + fn := makeProcessUnavailableFunc.fn + makeProcessUnavailableFunc.Unlock() + if fn != nil { + fn() + } +} + +// SetMakeProcessUnavailableFunc sets a function that will be called when +// MakeProcessUnavailable is called. +func SetMakeProcessUnavailableFunc(fn func()) { + makeProcessUnavailableFunc.Lock() + makeProcessUnavailableFunc.fn = fn + makeProcessUnavailableFunc.Unlock() +} + // SetExitFunc allows setting a function that will be called to exit // the process when a Fatal message is generated. The supplied bool, // if true, suppresses the stack trace, which is useful for test diff --git a/pkg/util/log/file.go b/pkg/util/log/file.go index 2fbcd13d59ab..52f0ed218c64 100644 --- a/pkg/util/log/file.go +++ b/pkg/util/log/file.go @@ -255,6 +255,13 @@ func (l *fileSink) flushAndMaybeSyncLocked(doSync bool) { // recursive back-and-forth between the copy of FATAL events to // OPS and disk slowness detection here. (See the implementation // of logfDepth for details.) + + // The write stall may prevent the process from exiting. If the process + // won't exit, we can at least terminate all our RPC connections first. + // + // See pkg/cli.runStart for where this function is hooked up. + MakeProcessUnavailable() + Ops.Shoutf(context.Background(), severity.FATAL, "disk stall detected: unable to sync log files within %s", maxSyncDuration, )