Merge #108928
108928: pkg/util/log: alter bufferedSink to handle writes during sync flush r=knz,Santamaura a=abarganier

Previously, if the bufferedSink had a synchronous flush scheduled, and an additional write (via the `output()` function) was sent to the bufferedSink, the bufferedSink would panic.

Investigation and analysis of the code showed the panic to be unnecessary. We can handle this scenario gracefully by buffering the message so that it is included in the upcoming flush.

If an additional forceSync output() call reaches the bufferedSink while a synchronous flush is already scheduled, we cannot make that call synchronous. Instead, we buffer the message so that it goes out with the imminent flush, and return without blocking.

Because of this, we change the name of the forceSync option to tryForceSync, to indicate that it's best-effort and not an ironclad guarantee.

Release note: none

Fixes: #106345

NB: A follow-up PR will reintroduce the flush trigger into the crash reporter / process shutdown procedure (similar to #101562, which was reverted). This PR covers only the bufferedSink changes themselves, to keep discussion focused.

Co-authored-by: Alex Barganier <[email protected]>
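
The best-effort behavior described in the commit message can be illustrated with a minimal, self-contained sketch. This is not the bufferedSink implementation: `sketchSink`, its fields, and its `flush` method are hypothetical names invented for illustration, and the buffered channel is a simplification so the sketch runs without a separate flusher goroutine. It only shows the core idea: one synchronous-flush slot at a time, and a second tryForceSync caller is buffered rather than blocked.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchSink is a hypothetical, heavily simplified stand-in for bufferedSink.
type sketchSink struct {
	mu      sync.Mutex
	pending []string   // messages waiting for the next flush
	errC    chan error // non-nil while a synchronous flush is scheduled
}

// output buffers msg. If tryForceSync is set and no synchronous flush is
// scheduled yet, it claims the single sync slot and returns the channel the
// caller should wait on. Otherwise it returns nil: the message is only
// buffered and will go out with the next flush.
func (s *sketchSink) output(msg string, tryForceSync bool) chan error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending = append(s.pending, msg)
	if !tryForceSync || s.errC != nil {
		// Never overwrite an existing errC: its waiter would block forever.
		return nil
	}
	s.errC = make(chan error, 1)
	return s.errC
}

// flush drains the buffer and notifies the waiter, if there is one.
func (s *sketchSink) flush() []string {
	s.mu.Lock()
	msgs, errC := s.pending, s.errC
	s.pending, s.errC = nil, nil
	s.mu.Unlock()
	if errC != nil {
		errC <- nil // report the flush result (always nil in this sketch)
	}
	return msgs
}

func main() {
	s := &sketchSink{}
	first := s.output("a", true)  // claims the sync slot
	second := s.output("b", true) // slot already taken: buffered only
	fmt.Println(first != nil, second != nil)
	msgs := s.flush() // both messages leave in the same flush
	fmt.Println(msgs, <-first)
}
```

In the sketch, the second caller gets no channel back, which mirrors why the option was renamed: the caller asked for a synchronous flush but only received a promise that its message rides along with the one already scheduled.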
craig[bot] and abarganier committed Aug 24, 2023
2 parents e2990fe + 96acea6 commit 598b463
Showing 4 changed files with 229 additions and 73 deletions.
152 changes: 96 additions & 56 deletions pkg/util/log/buffered_sink.go
@@ -12,6 +12,7 @@ package log

 import (
 	"context"
+	"fmt"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/cli/exit"
@@ -23,16 +24,19 @@ import (
 // bufferedSink wraps a child sink to add buffering. Messages are accumulated
 // and passed to the child sink in bulk when the buffer is flushed. The buffer
 // is flushed periodically, and also when it reaches a configured size. Flushes
-// can also be requested manually through the extraFlush and forceSync output()
+// can also be requested manually through the extraFlush and tryForceSync output()
 // options.
 //
 // bufferedSink's output() method never blocks on the child (except when the
-// forceSync option is used). Instead, old messages are dropped if the buffer is
+// tryForceSync option is used). Instead, old messages are dropped if the buffer is
 // overflowing a configured limit.
 //
 // Should an error occur in the child sink, it's forwarded to the provided
-// onAsyncFlushErr (unless forceSync is requested, in which case the error is
+// onAsyncFlushErr (unless tryForceSync is requested, in which case the error is
 // returned synchronously, as it would for any other sink).
+//
+// Note that tryForceSync is best-effort, but there are scenarios where it can't
+// be honored. See output() documentation for additional details.
 type bufferedSink struct {
 	// child is the wrapped logSink.
 	child logSink
@@ -104,7 +108,7 @@ func newBufferFmtConfig(bufferFmt *logconfig.BufferFormat) *bufferFmtConfig {
 // automatically flushes its contents to the child sink. Zero values disable
 // these flush triggers. If all triggers are disabled, the buffer is only ever
 // flushed when a flush is explicitly requested through the extraFlush or
-// forceSync options passed to output().
+// tryForceSync options passed to output().
 //
 // maxBufferSize, if not zero, limits the size of the buffer. When a new message
 // is causing the buffer to overflow, old messages are dropped. The caller must
@@ -193,10 +197,17 @@ func (bs *bufferedSink) attachHints(b []byte) []byte {
 // sinks must not recursively call into logging when implementing
 // this method.
 //
-// If forceSync is set, the output() call blocks on the child sink flush and
-// returns the child sink's error (which is otherwise handled via the
-// bufferedSink's onAsyncFlushErr). If the bufferedSink drops this message instead of
-// passing it to the child sink, errSyncMsgDropped is returned.
+// If tryForceSync is set, the output() call attempts to block on the
+// child sink flush and returns the child sink's error (which is otherwise
+// handled via the bufferedSink's onAsyncFlushErr). However, if a previous
+// tryForceSync is already scheduled on flushC, we do not block. In this case,
+// the msg provided to this output() call will be included in the already
+// scheduled tryForceSync's flush, which is imminent. This is an edge case
+// that should rarely happen, but it's possible.
+//
+// If the bufferedSink drops this message instead of passing it to the child
+// sink, errSyncMsgDropped is returned. This is generally due to buffer size
+// limitations.
 func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
 	// Make a copy to live in the async buffer.
 	// We can't take ownership of the slice we're passed --
@@ -206,36 +217,70 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
 	_, _ = msg.Write(b)

 	var errC chan error
-	if opts.forceSync {
-		// We'll ask to be notified on errC when the flush is complete.
-		errC = make(chan error)
-	}

-	bs.mu.Lock()
-	// Append the message to the buffer.
-	if err := bs.mu.buf.appendMsg(msg, errC); err != nil {
-		bs.mu.Unlock()
-		return err
-	}
+	err := func() error {
+		bs.mu.Lock()
+		defer bs.mu.Unlock()
+		// Append the message to the buffer.
+		err := bs.mu.buf.appendMsg(msg)
+		if err != nil {
+			// Release the msg buffer, since our append failed.
+			putBuffer(msg)
+			return err
+		}

-	flush := opts.extraFlush || opts.forceSync || (bs.triggerSize > 0 && bs.mu.buf.size() >= bs.triggerSize)
-	if flush {
-		// Trigger a flush. The flush will take effect asynchronously (and can be
-		// arbitrarily delayed if there's another flush in progress). In the
-		// meantime, the current buffer continues accumulating messages until it
-		// hits its limit.
-		bs.flushAsyncLocked()
-	} else {
-		// Schedule a flush unless one is scheduled already.
-		if bs.mu.timer == nil && bs.maxStaleness > 0 {
-			bs.mu.timer = time.AfterFunc(bs.maxStaleness, func() {
-				bs.mu.Lock()
-				bs.flushAsyncLocked()
-				bs.mu.Unlock()
-			})
+		// If the errC on the buffer is already set, then a synchronous
+		// flush must already be scheduled & waiting on flushC to be executed.
+		// We only support scheduling one single synchronous flush at a time
+		// in the bufferedSink, triggered via the tryForceSync option.
+		//
+		// Since b.errC is already set by the currently scheduled synchronous flush,
+		// we can't honor the tryForceSync option for this output() call. However,
+		// this output() call's msg will be picked up in the imminent flush anyway,
+		// since it's already been buffered.
+		//
+		// WARNING: Attempting to use the buffer's errC when a tryForceSync flush
+		// is already scheduled, such as overwriting the reference here with a new
+		// errC, will cause the goroutine already waiting on errC to deadlock!
+		// Don't do this!
+		syncFlushAlreadyScheduled := bs.mu.buf.errC != nil
+		if !syncFlushAlreadyScheduled && opts.tryForceSync {
+			// We'll ask to be notified on errC when the flush is complete.
+			errC = make(chan error)
+			bs.mu.buf.errC = errC
+		}
+		if syncFlushAlreadyScheduled && opts.tryForceSync {
+			fmt.Printf(
+				"tryForceSync called on %T while one already scheduled. Msg will be included in imminent flush instead.\n",
+				bs.child)
+		}
+
+		// If a synchronous flush is already scheduled, then a flush is imminent, so don't bother
+		// scheduling another. Our msg will be included in the upcoming flush.
+		flush := !syncFlushAlreadyScheduled &&
+			(opts.extraFlush || opts.tryForceSync || (bs.triggerSize > 0 && bs.mu.buf.size() >= bs.triggerSize))
+		if flush {
+			// Trigger a flush. The flush will take effect asynchronously (and can be
+			// arbitrarily delayed if there's another flush in progress). In the
+			// meantime, the current buffer continues accumulating messages until it
+			// hits its limit.
+			bs.flushAsyncLocked()
+		} else {
+			// Schedule a flush for the future based on maxStaleness, unless
+			// one is scheduled already.
+			if bs.mu.timer == nil && bs.maxStaleness > 0 {
+				bs.mu.timer = time.AfterFunc(bs.maxStaleness, func() {
+					bs.mu.Lock()
+					defer bs.mu.Unlock()
+					bs.flushAsyncLocked()
+				})
+			}
 		}
+		return nil
+	}()
+	if err != nil {
+		return err
 	}
-	bs.mu.Unlock()

 	// If this is a synchronous flush, wait for its completion.
 	if errC != nil {
@@ -285,9 +330,11 @@ func (bs *bufferedSink) runFlusher(stopC <-chan struct{}) {
 			// We'll return after flushing everything.
 			done = true
 		}
-		bs.mu.Lock()
-		msg, errC := buf.flush(bs.format.prefix, bs.format.suffix, bs.format.delimiter)
-		bs.mu.Unlock()
+		msg, errC := func() (*buffer, chan<- error) {
+			bs.mu.Lock()
+			defer bs.mu.Unlock()
+			return buf.flush(bs.format.prefix, bs.format.suffix, bs.format.delimiter)
+		}()
 		if msg == nil {
 			// Nothing to flush.
 			// NOTE: This can happen in the done case, or if we get two flushC signals
@@ -299,15 +346,17 @@ func (bs *bufferedSink) runFlusher(stopC <-chan struct{}) {
 			continue
 		}

-		err := bs.child.output(msg.Bytes(), sinkOutputOptions{extraFlush: true, forceSync: errC != nil})
+		err := bs.child.output(msg.Bytes(), sinkOutputOptions{extraFlush: true, tryForceSync: errC != nil})
 		if errC != nil {
 			errC <- err
 		} else if err != nil {
 			Ops.Errorf(context.Background(), "logging error from %T: %v", bs.child, err)
 			if bs.crashOnAsyncFlushFailure {
-				logging.mu.Lock()
-				f := logging.mu.exitOverride.f
-				logging.mu.Unlock()
+				f := func() func(exit.Code, error) {
+					logging.mu.Lock()
+					defer logging.mu.Unlock()
+					return logging.mu.exitOverride.f
+				}()
 				code := bs.exitCode()
 				if f != nil {
 					f(code, err)
@@ -347,9 +396,12 @@ func (b *msgBuf) size() uint64 {

 var errMsgTooLarge = errors.New("message dropped because it is too large")

-// appendMsg appends msg to the buffer. If errC is not nil, then this channel
-// will be signaled when the buffer is flushed.
-func (b *msgBuf) appendMsg(msg *buffer, errC chan<- error) error {
+// appendMsg appends msg to the buffer. If msg can't fit in the buffer,
+// errMsgTooLarge is returned.
+//
+// If the buffer is full, then we drop older messages in the buffer
+// until we have space for the new message.
+func (b *msgBuf) appendMsg(msg *buffer) error {
 	msgLen := uint64(msg.Len())

 	// Make room for the new message, potentially by dropping the oldest messages
@@ -368,18 +420,6 @@ func (b *msgBuf) appendMsg(msg *buffer, errC chan<- error) error {

 	b.messages = append(b.messages, msg)
 	b.sizeBytes += msgLen
-
-	// Assert that b.errC is not already set. It shouldn't be set
-	// because, if there was a previous message with errC set, that
-	// message must have had the forceSync flag set and thus acts as a barrier:
-	// no more messages are sent until the flush of that message completes.
-	//
-	// If b.errorCh were to be set, we wouldn't know what to do about it
-	// since we can't overwrite it in case m.errorCh is also set.
-	if b.errC != nil {
-		panic(errors.AssertionFailedf("unexpected errC already set"))
-	}
-	b.errC = errC
 	return nil
 }
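
The drop-oldest policy described in the new appendMsg comment can be sketched in isolation. The body of the real function is elided in this diff, so the code below is an assumption about the general technique, not a copy of msgBuf: `boundedBuf`, `maxBytes`, and `errTooLarge` are hypothetical names, and messages are plain strings rather than pooled buffers.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge is a hypothetical counterpart of errMsgTooLarge.
var errTooLarge = errors.New("message dropped because it is too large")

// boundedBuf is a hypothetical, simplified byte-bounded message buffer.
type boundedBuf struct {
	maxBytes int      // capacity in bytes; 0 means unlimited (assumption)
	size     int      // total bytes currently buffered
	msgs     []string // buffered messages, oldest first
}

// appendMsg adds msg, dropping the oldest messages until it fits.
// A message larger than the whole buffer is rejected outright.
func (b *boundedBuf) appendMsg(msg string) error {
	n := len(msg)
	if b.maxBytes > 0 {
		if n > b.maxBytes {
			return errTooLarge
		}
		// n <= maxBytes, so this loop ends at the latest when the
		// buffer is empty and size is zero.
		for b.size+n > b.maxBytes {
			b.size -= len(b.msgs[0])
			b.msgs = b.msgs[1:]
		}
	}
	b.msgs = append(b.msgs, msg)
	b.size += n
	return nil
}

func main() {
	b := &boundedBuf{maxBytes: 5}
	_ = b.appendMsg("abc")
	_ = b.appendMsg("de")
	_ = b.appendMsg("fg") // evicts "abc" to make room
	fmt.Println(b.msgs, b.size)
	fmt.Println(b.appendMsg("toolong"))
}
```

Rejecting an oversized message before evicting anything keeps the sketch from emptying the buffer for a message that could never fit.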
