Skip to content

Commit

Permalink
cli: close listeners and all open connections on disk stall
Browse files Browse the repository at this point in the history
Disk stalls prevent a node from making progress. Any ranges for which the
stalled node is leaseholder may also be prevented from making progress while
the stalled node remains online but incapacitated. CockroachDB nodes detect
stalls within their stores through timing all write filesystem operations.
Previously, when a stall was detected, Cockroach would simply fatal the
process. However, a process blocked on disk IO cannot be terminated. The
process would enter the zombie state, but would be unable to be reaped.

This commit adds a new step to disk stall handling, closing all open sockets.

Epic: None
Release note (bug fix): Fix a bug where a node with a disk stall would continue
to accept new connections and preserve existing connections until the disk
stall abated.
  • Loading branch information
jbowens committed Feb 1, 2023
1 parent d6a32ed commit 5024b24
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ go_test(
"nodelocal_test.go",
"prefixer_test.go",
"sqlfmt_test.go",
"start_linux_test.go",
"start_test.go",
"statement_bundle_test.go",
"statement_diag_test.go",
Expand Down
5 changes: 5 additions & 0 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,11 @@ func runStartInternal(
return clierror.NewError(err, exit.FatalError())
}

// Set a MakeProcessUnavailableFunc that will close all sockets. This guards
// against a persistent disk stall that prevents the process from exiting or
// making progress.
log.SetMakeProcessUnavailableFunc(closeAllSockets)

// Set up a cancellable context for the entire start command.
// The context will be canceled at the end.
ctx, cancel := context.WithCancel(context.Background())
Expand Down
49 changes: 49 additions & 0 deletions pkg/cli/start_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

//go:build linux
// +build linux

package cli

import (
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestFindOpenSocketFDs(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(500 * time.Millisecond)
rw.WriteHeader(http.StatusOK)
}))
defer s.Close()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, _ = s.Client().Get(s.URL)
}()
defer wg.Wait()

fds, _ := findOpenSocketFDs()
require.Greater(t, len(fds), 0)
s.CloseClientConnections()
}
91 changes: 91 additions & 0 deletions pkg/cli/start_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"os"
"os/exec"
"os/signal"
"path/filepath"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/cli/cliflags"
Expand Down Expand Up @@ -164,3 +166,92 @@ func disableOtherPermissionBits() {
mask |= 00007
_ = unix.Umask(mask)
}

// closeAllSockets is used in the event of a disk stall, in which case we want
// to terminate the process but may not be able to. A process stalled in disk
// I/O is in uninterruptible sleep within the kernel and cannot be terminated.
// If we can't terminate the process, the next best thing is to quarantine it by
// closing all sockets so that it appears dead to other nodes.
//
// See log.SetMakeProcessUnavailableFunc.
func closeAllSockets() {
// Close all sockets twice. A LISTEN socket may open a new socket after we
// list all FDs. If that's the case, the socket will be closed by the second
// call.
//
// TODO(jackson,#96342): This doesn't prevent the retry mechanisms from
// opening new outgoing connections. Consider marking the rpc.Context as
// poisoned to prevent new outgoing connections.

_ = closeAllSocketsOnce()
_ = closeAllSocketsOnce()

// It's unclear what to do with errors. We try to close all sockets in an
// emergency where we can't exit the process but want to quarantine it by
// removing all communication with the outside world. If we fail to close
// all sockets, panicking is unlikely to be able to terminate the process.
// We do nothing so that if the log sink is NOT stalled, we'll write the
// disk stall log entry.
}

func closeAllSocketsOnce() error {
fds, err := findOpenSocketFDs()
// NB: Intentionally ignore `err`. findOpenSocketFDs may return a non-empty
// slice of FDs with a non-nil error. We want to close the descriptors we
// were able to identify regardless of any error.
for _, fd := range fds {
// Ignore errors so that if we can't close all sockets, we close as many
// as we can. When finished, return a combined error.
fdErr := unix.Shutdown(fd, unix.SHUT_RDWR)
err = errors.CombineErrors(err, fdErr)
}
return err
}

func findOpenSocketFDs() ([]int, error) {
f, err := os.Open("/dev/fd")
if err != nil {
return nil, err
}
defer f.Close()
dirnames, err := f.Readdirnames(-1)
if err != nil {
return nil, err
}
var fds []int
for _, name := range dirnames {
// From the Linux /proc/[pid]/fd man page:
//
// For file descriptors for pipes and sockets, the entries
// will be symbolic links whose content is the file type with
// the inode. A readlink(2) call on this file returns a
// string in the format:
//
// type:[inode]
//
// For example, socket:[2248868] will be a socket and its
// inode is 2248868. For sockets, that inode can be used to
// find more information in one of the files under
// /proc/net/.
//
// We `readlink` each directory entry, and check that the destination
// has the `socket:` prefix.
dst, readLinkErr := os.Readlink(filepath.Join("/dev/fd", name))
if readLinkErr != nil {
// Stumble forward.
err = errors.CombineErrors(err, readLinkErr)
continue
}
if !strings.HasPrefix(dst, "socket:") {
continue
}
fd, atoiErr := strconv.Atoi(name)
if atoiErr != nil {
// Stumble forward.
err = errors.CombineErrors(err, atoiErr)
continue
}
fds = append(fds, fd)
}
return fds, err
}
5 changes: 5 additions & 0 deletions pkg/cli/start_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ func maybeRerunBackground() (bool, error) {
func disableOtherPermissionBits() {
// No-op on windows, which does not support umask.
}

func closeAllSockets() {
// No-op on windows.
// TODO(jackson): Is there something else we can do on Windows?
}
8 changes: 8 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,14 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventLis
// pebble.EventListener on why it's important for this method to return
// quickly.
if fatalOnExceeded {
// The write stall may prevent the process from exiting. If
// the process won't exit, we can at least terminate all our
// RPC connections first.
//
// See pkg/cli.runStart for where this function is hooked
// up.
log.MakeProcessUnavailable()

log.Fatalf(ctx, "file write stall detected: %s", info)
} else {
go log.Errorf(ctx, "file write stall detected: %s", info)
Expand Down
27 changes: 27 additions & 0 deletions pkg/util/log/exit_override.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,35 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

var makeProcessUnavailableFunc struct {
syncutil.Mutex
fn func()
}

// MakeProcessUnavailable invokes the emergency stop function set through
// SetMakeProcessUnavailableFunc, if any. MakeProcessUnavailable is a hack to
// close network connections in the event of a disk stall that may prevent the
// process from exiting.
func MakeProcessUnavailable() {
makeProcessUnavailableFunc.Lock()
fn := makeProcessUnavailableFunc.fn
makeProcessUnavailableFunc.Unlock()
if fn != nil {
fn()
}
}

// SetMakeProcessUnavailableFunc sets a function that will be called when
// MakeProcessUnavailable is called.
func SetMakeProcessUnavailableFunc(fn func()) {
makeProcessUnavailableFunc.Lock()
makeProcessUnavailableFunc.fn = fn
makeProcessUnavailableFunc.Unlock()
}

// SetExitFunc allows setting a function that will be called to exit
// the process when a Fatal message is generated. The supplied bool,
// if true, suppresses the stack trace, which is useful for test
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ func (l *fileSink) flushAndMaybeSyncLocked(doSync bool) {
// recursive back-and-forth between the copy of FATAL events to
// OPS and disk slowness detection here. (See the implementation
// of logfDepth for details.)

// The write stall may prevent the process from exiting. If the process
// won't exit, we can at least terminate all our RPC connections first.
//
// See pkg/cli.runStart for where this function is hooked up.
MakeProcessUnavailable()

Ops.Shoutf(context.Background(), severity.FATAL,
"disk stall detected: unable to sync log files within %s", maxSyncDuration,
)
Expand Down

0 comments on commit 5024b24

Please sign in to comment.