diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md index 1a6ee932dcb1..12b6111c7357 100644 --- a/docs/generated/logsinks.md +++ b/docs/generated/logsinks.md @@ -389,6 +389,7 @@ set to "NONE" to disable buffering. Example configuration: buffering: max-staleness: 20s flush-trigger-size: 25KB + max-buffer-size: 10MB sinks: file-groups: health: @@ -404,6 +405,6 @@ set to "NONE" to disable buffering. Example configuration: |--|--| | `max-staleness` | the maximum time a log message will sit in the buffer before a flush is triggered. | | `flush-trigger-size` | the number of bytes that will trigger the buffer to flush. | -| `max-in-flight` | the maximum number of buffered flushes before messages start being dropped. | +| `max-buffer-size` | the limit on the size of the messages that are buffered. If this limit is exceeded, messages are dropped. The limit is expected to be higher than FlushTriggerSize. A buffer is flushed as soon as FlushTriggerSize is reached, and a new buffer is created once the flushing is started. Only one flushing operation is active at a time. | diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 7e3b69075774..da316d6846b7 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -5,7 +5,7 @@ go_library( name = "log", srcs = [ "ambient_context.go", - "buffer_sink.go", + "buffered_sink.go", "channels.go", "clog.go", "doc.go", @@ -131,7 +131,7 @@ go_test( size = "small", srcs = [ "ambient_context_test.go", - "buffer_sink_test.go", + "buffered_sink_test.go", "clog_test.go", "file_log_gc_test.go", "file_names_test.go", diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go deleted file mode 100644 index 9fda295fb99c..000000000000 --- a/pkg/util/log/buffer_sink.go +++ /dev/null @@ -1,346 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package log - -import ( - "context" - "sync/atomic" - "time" - - "github.com/cockroachdb/cockroach/pkg/cli/exit" - "github.com/cockroachdb/errors" -) - -// bufferSink wraps a child logSink to add buffering and asynchronous behavior. -// The buffer is flushed at a configurable size threshold and/or max staleness. -// bufferSink's output method will not block on downstream I/O (unless required -// by the forceSync option). -// -// Incoming messages are accumulated in a "bundle" until the configured staleness -// or size threshold is met, at which point they are put in a queue to be flushed -// by the child sink. If the queue is full, current bundle is compacted rather -// sent, which currently drops the messages but retains their count for later -// reporting. -// TODO(knz): Actually report the count of dropped messages. -// See: https://github.com/cockroachdb/cockroach/issues/72453 -// -// Should an error occur in the child sink, it's forwarded to the provided -// errCallback (unless forceSync is requested, in which case the error is returned -// synchronously, as it would for any other sink). -type bufferSink struct { - // child is the wrapped logSink. - child logSink - - // messageCh sends messages from output(), which is called when a - // log entry is initially created, to accumulator(), which is running - // asynchronously to populate the buffer. - messageCh chan bufferSinkMessage - // flushCh sends bundles of messages from accumulator(), which is - // running asynchronously and collects incoming log entries, to - // flusher(), which is running asynchronously and pushes entries to - // the child sink. - flushCh chan bufferSinkBundle - // nInFlight internally tracks the population of flushCh to detect when it's full. 
- nInFlight int32 - - // maxStaleness is the duration after which a flush is triggered. - // 0 disables this trigger. - maxStaleness time.Duration - // triggerSize is the size in bytes of accumulated messages which trigger a flush. - // 0 disables this trigger. - triggerSize int - // maxInFlight is the maximum number of flushes to buffer before dropping messages. - maxInFlight int32 - // errCallback is called when the child sync has an error. - errCallback func(error) - - // inErrorState is used internally to temporarily disable the sink during error handling. - inErrorState bool - - // onMsgDrop is a hook that's called, if set, before dropping messages. - onMsgDrop func() -} - -const bufferSinkDefaultMaxInFlight = 4 - -func newBufferSink( - ctx context.Context, - child logSink, - maxStaleness time.Duration, - triggerSize int, - maxInFlight int32, - errCallback func(error), -) *bufferSink { - if maxInFlight <= 0 { - maxInFlight = bufferSinkDefaultMaxInFlight - } - - sink := &bufferSink{ - child: child, - messageCh: make(chan bufferSinkMessage), - flushCh: make(chan bufferSinkBundle, maxInFlight), - maxStaleness: maxStaleness, - triggerSize: triggerSize, - maxInFlight: maxInFlight, - errCallback: errCallback, - } - go sink.accumulator(ctx) - go sink.flusher(ctx) - return sink -} - -// accumulator accumulates messages and sends bundles of them to the flusher. -func (bs *bufferSink) accumulator(ctx context.Context) { - var ( - b bufferSinkBundle - // timer tracks the staleness of the oldest unflushed message. - // It can be thought of as having 3 states: - // - nil when there are no accumulated messages. - // - non-nil but unreadable when the oldest accumulated message is - // younger than maxStaleness. - // - non-nil and readable when the older accumulated message is older - // than maxStaleness and a flush should therefore be triggered. 
- timer <-chan time.Time // staleness timer - ) - reset := func() { - b = bufferSinkBundle{} - timer = nil - } - - for { - flush := false - - appendMessage := func(m bufferSinkMessage) { - b.messages = append(b.messages, m) - b.byteLen += len(m.b.Bytes()) + 1 // account for the final newline. - if m.flush || m.errorCh != nil || (bs.triggerSize > 0 && b.byteLen > bs.triggerSize) { - flush = true - // Assert that b.errorCh is not already set. It shouldn't be set - // because, if there was a previous message with errorCh set, that - // message must have had the forceSink flag set and thus acts as a barrier: - // no more messages are sent until the flush of that message completes. - // - // If b.errorCh were to be set, we wouldn't know what to do about it - // since we can't overwrite it in case m.errorCh is also set. - if b.errorCh != nil { - panic(errors.AssertionFailedf("unexpected errorCh already set")) - } - b.errorCh = m.errorCh - } else if timer == nil && bs.maxStaleness != 0 { - timer = time.After(bs.maxStaleness) - } - } - select { - case <-timer: - flush = true - case <-ctx.Done(): - // Finally, drain all remaining messages on messageCh, so messages don't - // get dropped. - for { - select { - case m := <-bs.messageCh: - appendMessage(m) - default: - b.done = true - } - if b.done { - break - } - } - flush = true - case m := <-bs.messageCh: - appendMessage(m) - } - - done := b.done - if flush { - // Drop if we have too many pending flushes. But don't drop a batch with - // the done flag set, since that's responsible for stopping the flusher. - // - // TODO(knz): This logic seems to contain a race condition (with - // the flusher). Also it's not clear why this is using a custom - // atomic counter? Why not using a buffered channel and check - // via `select` that the write is possible? 
- // See: https://github.com/cockroachdb/cockroach/issues/72460 - if done || atomic.LoadInt32(&bs.nInFlight) < bs.maxInFlight { - // bs.flushCh has a buffer of capacity maxInFlight, so this write is - // generally non-blocking. There is one case where it might block: if - // done is set, then the buffer might be full and we're sending anyway. - // In that case, it doesn't matter whether we block or not since we're - // about to return anyway. - bs.flushCh <- b - atomic.AddInt32(&bs.nInFlight, 1) - reset() - } else { - if bs.onMsgDrop != nil { - bs.onMsgDrop() - } - b.compact() - } - } - if done { - return - } - } -} - -// flusher concatenates bundled messages and sends them to the child sink. -// -// TODO(knz): How does this interact with the flusher logic in log_flush.go? -// See: https://github.com/cockroachdb/cockroach/issues/72458 -// -// TODO(knz): this code should be extended to detect server shutdowns: -// as currently implemented the flusher will only terminate after all -// the writes in the channel are completed. If the writes are slow, -// the goroutine may not terminate properly when server shutdown is -// requested. -// See: https://github.com/cockroachdb/cockroach/issues/72459 -func (bs *bufferSink) flusher(ctx context.Context) { - for b := range bs.flushCh { - if len(b.messages) > 0 { - // Append all the messages in the first buffer. - buf := b.messages[0].b - buf.Grow(b.byteLen - len(buf.Bytes())) - for i, m := range b.messages { - if i == 0 { - // First buffer skips putBuffer -- - // we're still using it and it's a weird size - // for reuse. - continue - } - buf.WriteByte('\n') - buf.Write(m.b.Bytes()) - putBuffer(m.b) - } - forceSync := b.errorCh != nil - // Send the accumulated messages to the child sink. - err := bs.child.output(buf.Bytes(), - sinkOutputOptions{extraFlush: true, forceSync: forceSync}) - if forceSync { - b.errorCh <- err - } else if err != nil && bs.errCallback != nil { - // Forward error to the callback, if provided. 
- // Temporarily disable this sink so it's skipped by - // any logging in the callback. - bs.inErrorState = true - bs.errCallback(err) - bs.inErrorState = false - } - } - // Decrease visible queue size at the end, - // so a long-running flush triggers a compaction - // instead of a blocked channel. - atomic.AddInt32(&bs.nInFlight, -1) - if b.done { - return - } - } -} - -// bufferSinkMessage holds an individual log message sent from output to accumulate. -type bufferSinkMessage struct { - b *buffer - // flush is set if the call explicitly requests to trigger a flush. - flush bool - // errorCh is set iff the message was emitted with the forceSync flag. - // This indicates that the caller is interested in knowing the error status - // of child sink writes. - // The caller will block expecting a (possibly nil) error to return synchronously. - errorCh chan<- error -} - -// bufferSinkBundle is the accumulated state; the unit sent from the accumulator to the flusher. -type bufferSinkBundle struct { - messages []bufferSinkMessage - // byteLen is the total length in bytes of the accumulated messages, - // plus enough for separators. - byteLen int - // droppedCount is the number of dropped messages due to buffer fullness. - // TODO(knz): This needs to get reported somehow, see - // https://github.com/cockroachdb/cockroach/issues/72453 - droppedCount int - // errorCh, if non-nil, expects to receive the (possibly nil) error - // after the flush completes. - errorCh chan<- error - // done indicates that this is the last bundle and the flusher - // should shutdown after sending. - done bool -} - -// compact compacts a bundle, and is called if the buffer is full. -// Currently, drops all messages and keeps track of the -// count. In the future, if there's visibility into message -// severities, some prioritization could happen to keep more -// important messages if there's room. Maybe the timestamp -// range too. 
-func (b *bufferSinkBundle) compact() { - if b.errorCh != nil { - b.errorCh <- errSyncMsgDropped - b.errorCh = nil - } - b.droppedCount += len(b.messages) - for _, m := range b.messages { - putBuffer(m.b) - } - b.messages = nil -} - -// active returns true if this sink is currently active. -func (bs *bufferSink) active() bool { - return !bs.inErrorState && bs.child.active() -} - -// attachHints attaches some hints about the location of the message -// to the stack message. -func (bs *bufferSink) attachHints(b []byte) []byte { - return bs.child.attachHints(b) -} - -// errSyncMsgDropped is returned by bufferSink.output() whenever a message sent -// with with the forceSync option is dropped. -var errSyncMsgDropped = errors.New("sync log message dropped") - -// output emits some formatted bytes to this sink. -// the sink is invited to perform an extra flush if indicated -// by the argument. This is set to true for e.g. Fatal -// entries. -// -// The parent logger's outputMu is held during this operation: log -// sinks must not recursively call into logging when implementing -// this method. -// -// If forceSync is set, the output() call blocks on the child sink flush and -// returns the child sink's error (which is otherwise handled via the -// bufferSink's errCallback). If the bufferSink drops this message instead of -// passing it to the child sink, errSyncMsgDropped is returned. -func (bs *bufferSink) output(b []byte, opts sinkOutputOptions) error { - // Make a copy to live in the async buffer. - // We can't take ownership of the slice we're passed -- - // it belongs to a buffer that's synchronously being returned - // to the pool for reuse. 
- buf := getBuffer() - if _, err := buf.Write(b); err != nil { - return err - } - if opts.forceSync { - errorCh := make(chan error) - bs.messageCh <- bufferSinkMessage{b: buf, flush: opts.extraFlush, errorCh: errorCh} - return <-errorCh - } - bs.messageCh <- bufferSinkMessage{b: buf, flush: opts.extraFlush, errorCh: nil} - return nil -} - -// exitCode returns the exit code to use if the logger decides -// to terminate because of an error in output(). -func (bs *bufferSink) exitCode() exit.Code { - return bs.child.exitCode() -} diff --git a/pkg/util/log/buffered_sink.go b/pkg/util/log/buffered_sink.go new file mode 100644 index 000000000000..ddbbca367ef8 --- /dev/null +++ b/pkg/util/log/buffered_sink.go @@ -0,0 +1,396 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// bufferedSink wraps a child sink to add buffering. Messages are accumulated +// and passed to the child sink in bulk when the buffer is flushed. The buffer +// is flushed periodically, and also when it reaches a configured size. Flushes +// can also be requested manually through the extraFlush and forceSync output() +// options. +// +// bufferedSink's output() method never blocks on the child (except when the +// forceSync option is used). Instead, old messages are dropped if the buffer is +// overflowing a configured limit. 
+// +// Should an error occur in the child sink, it's forwarded to the provided +// onAsyncFlushErr (unless forceSync is requested, in which case the error is +// returned synchronously, as it would for any other sink). +type bufferedSink struct { + // child is the wrapped logSink. + child logSink + // maxStaleness is the duration after which a flush is triggered. + // 0 disables this trigger. + maxStaleness time.Duration + // triggerSize is the size in bytes of accumulated messages which trigger a flush. + // 0 disables this trigger. + triggerSize uint64 + // crashOnAsyncFlushFailure, if set, causes the sink to terminate the process + // on an async flush failure. + // + // There's also sync flushes, which have the opportunity to deliver their + // errors to the caller, so those are not subject to this crash. + crashOnAsyncFlushFailure bool + + // flushC is a channel on which requests to flush the buffer are sent to the + // runFlusher goroutine. Each request to flush comes with a channel (can be nil) + // on which the result of the flush is to be communicated. + flushC chan struct{} + + mu struct { + syncutil.Mutex + // buf buffers the messages that have yet to be flushed. + buf msgBuf + // timer is set when a flushAsync() call is scheduled to happen in the + // future. + timer *time.Timer + } +} + +// newBufferedSink creates a bufferedSink that wraps child. +// +// Start() must be called on it before use. +// +// maxStaleness and triggerSize control the circumstances under which the sink +// automatically flushes its contents to the child sink. Zero values disable +// these flush triggers. If all triggers are disabled, the buffer is only ever +// flushed when a flush is explicitly requested through the extraFlush or +// forceSync options passed to output(). +// +// maxBufferSize, if not zero, limits the size of the buffer. When a new message +// is causing the buffer to overflow, old messages are dropped. 
The caller must +// ensure that maxBufferSize makes sense in relation to triggerSize: triggerSize +// should be lower (otherwise the buffer will never flush based on the size +// threshold), and there should be enough of a gap between the two to generally +// fit at least one message (otherwise the buffer might again never flush, since +// incoming messages would cause old messages to be dropped and the buffer's +// size might never fall in between triggerSize and maxSize). See the diagram +// below. +// +// |msg|msg|msg|msg|msg|msg|msg|msg|msg| +// └----------------------^--------------┘ +// triggerSize maxBufferSize +// └--------------┘ +// sized-based flush is triggered when size falls in this range +// +// maxBufferSize should also be set such that it makes sense in relationship +// with the flush latency: only one flush is ever in flight at a time, so the +// buffer should be sized to generally hold at least the amount of data that is +// expected to be produced during the time it takes one flush to complete. +func newBufferedSink( + child logSink, + maxStaleness time.Duration, + triggerSize uint64, + maxBufferSize uint64, + crashOnAsyncFlushErr bool, +) *bufferedSink { + if triggerSize != 0 && maxBufferSize != 0 { + // Validate triggerSize in relation to maxBufferSize. As explained above, we + // actually want some gap between these two, but the minimum acceptable gap + // is left to the caller (which does its own validation). + if triggerSize >= maxBufferSize { + panic(errors.AssertionFailedf("expected triggerSize (%d) < maxBufferSize (%d)", + triggerSize, maxBufferSize)) + } + } + sink := &bufferedSink{ + child: child, + // flushC is a buffered channel, so that an async flush triggered while + // another flush is in progress doesn't block. 
+ flushC: make(chan struct{}, 1), + triggerSize: triggerSize, + maxStaleness: maxStaleness, + crashOnAsyncFlushFailure: crashOnAsyncFlushErr, + } + sink.mu.buf.maxSizeBytes = maxBufferSize + return sink +} + +// Start starts an internal goroutine that will run until ctx is canceled. +func (bs *bufferedSink) Start(ctx context.Context) { + // Start the runFlusher goroutine. + go bs.runFlusher(ctx) +} + +// active returns true if this sink is currently active. +func (bs *bufferedSink) active() bool { + return bs.child.active() +} + +// attachHints attaches some hints about the location of the message +// to the stack message. +func (bs *bufferedSink) attachHints(b []byte) []byte { + return bs.child.attachHints(b) +} + +// output emits some formatted bytes to this sink. +// the sink is invited to perform an extra flush if indicated +// by the argument. This is set to true for e.g. Fatal +// entries. +// +// The parent logger's outputMu is held during this operation: log +// sinks must not recursively call into logging when implementing +// this method. +// +// If forceSync is set, the output() call blocks on the child sink flush and +// returns the child sink's error (which is otherwise handled via the +// bufferedSink's onAsyncFlushErr). If the bufferedSink drops this message instead of +// passing it to the child sink, errSyncMsgDropped is returned. +func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error { + // Make a copy to live in the async buffer. + // We can't take ownership of the slice we're passed -- + // it belongs to a buffer that's synchronously being returned + // to the pool for reuse. + msg := getBuffer() + _, _ = msg.Write(b) + + var errC chan error + if opts.forceSync { + // We'll ask to be notified on errC when the flush is complete. + errC = make(chan error) + } + + bs.mu.Lock() + // Append the message to the buffer. 
+ if err := bs.mu.buf.appendMsg(msg, errC); err != nil { + bs.mu.Unlock() + return err + } + + flush := opts.extraFlush || opts.forceSync || (bs.triggerSize > 0 && bs.mu.buf.size() >= bs.triggerSize) + if flush { + // Trigger a flush. The flush will take effect asynchronously (and can be + // arbitrarily delayed if there's another flush in progress). In the + // meantime, the current buffer continues accumulating messages until it + // hits its limit. + bs.flushAsyncLocked() + } else { + // Schedule a flush unless one is scheduled already. + if bs.mu.timer == nil && bs.maxStaleness > 0 { + bs.mu.timer = time.AfterFunc(bs.maxStaleness, func() { + bs.mu.Lock() + bs.flushAsyncLocked() + bs.mu.Unlock() + }) + } + } + bs.mu.Unlock() + + // If this is a synchronous flush, wait for its completion. + if errC != nil { + return <-errC + } + return nil +} + +// flushAsyncLocked signals the flusher goroutine to flush. +func (bs *bufferedSink) flushAsyncLocked() { + // Make a best-effort attempt to stop a scheduled future flush, if any. + // flushAsyncLocked might have been called by the timer, in which case the + // timer.Stop() call below will be a no-op. It's possible that + // flushAsyncLocked() is not called by the timer, and timer.Stop() still + // returns false, indicating that another flushAsyncLocked() call is imminent. + // That's also fine - we'll flush again, which will be a no-op if the buffer + // remains empty until then. + if bs.mu.timer != nil { + bs.mu.timer.Stop() + bs.mu.timer = nil + } + // Signal the runFlusher to flush, unless it's already been signaled. + select { + case bs.flushC <- struct{}{}: + default: + } +} + +// exitCode returns the exit code to use if the logger decides +// to terminate because of an error in output(). +func (bs *bufferedSink) exitCode() exit.Code { + return bs.child.exitCode() +} + +// runFlusher waits for flush signals in a loop and, when it gets one, flushes +// bs.msgBuf to the wrapped sink. 
The function returns when ctx is canceled. +// +// TODO(knz): How does this interact with the runFlusher logic in log_flush.go? +// See: https://github.com/cockroachdb/cockroach/issues/72458 +// +// TODO(knz): this code should be extended to detect server shutdowns: +// as currently implemented the runFlusher will only terminate after all +// the writes in the channel are completed. If the writes are slow, +// the goroutine may not terminate properly when server shutdown is +// requested. +// See: https://github.com/cockroachdb/cockroach/issues/72459 +func (bs *bufferedSink) runFlusher(ctx context.Context) { + buf := &bs.mu.buf + for { + done := false + select { + case <-bs.flushC: + case <-ctx.Done(): + // We'll return after flushing everything. + done = true + } + bs.mu.Lock() + msg, errC := buf.flush() + bs.mu.Unlock() + if msg == nil { + // Nothing to flush. + // NOTE: This can happen in the done case, or if we get two flushC signals + // in close succession: one from a manual flush and another from a + // scheduled flush that wasn't canceled in time. + if done { + return + } + continue + } + + err := bs.child.output(msg.Bytes(), sinkOutputOptions{extraFlush: true, forceSync: errC != nil}) + if errC != nil { + errC <- err + } else if err != nil { + Ops.Errorf(context.Background(), "logging error from %T: %v", bs.child, err) + if bs.crashOnAsyncFlushFailure { + logging.mu.Lock() + f := logging.mu.exitOverride.f + logging.mu.Unlock() + code := bs.exitCode() + if f != nil { + f(code, err) + } else { + exit.WithCode(code) + } + } + } + if done { + return + } + } +} + +// msgBuf accumulates messages (represented as buffers) and tracks their size. +// +// msgBuf is not thread-safe. It is protected by the bufferedSink's lock. +type msgBuf struct { + // maxSizeBytes is the size limit. Trying to appendMsg() a message that would + // cause the buffer to exceed this limit returns an error. 0 means no limit. 
+ maxSizeBytes uint64 + + // The messages that have been appended to the buffer. + messages []*buffer + // The sum of the sizes of messages. + sizeBytes uint64 + // errC, if set, specifies that, when the buffer is flushed, the result of the + // flush (success or error) should be signaled on this channel. + errC chan<- error +} + +// size returns the size of b's contents, in bytes. +func (b *msgBuf) size() uint64 { + // We account for the newline after each message. + return b.sizeBytes + uint64(len(b.messages)) +} + +var errMsgTooLarge = errors.New("message dropped because it is too large") + +// appendMsg appends msg to the buffer. If errC is not nil, then this channel +// will be signaled when the buffer is flushed. +func (b *msgBuf) appendMsg(msg *buffer, errC chan<- error) error { + msgLen := uint64(msg.Len()) + + // Make room for the new message, potentially by dropping the oldest messages + // in the buffer. + if b.maxSizeBytes > 0 { + if msgLen > b.maxSizeBytes { + // This message will never fit. + return errMsgTooLarge + } + + // The +1 accounts for a trailing newline. + for b.size()+msgLen+1 > b.maxSizeBytes { + b.dropFirstMsg() + } + } + + b.messages = append(b.messages, msg) + b.sizeBytes += msgLen + + // Assert that b.errC is not already set. It shouldn't be set + // because, if there was a previous message with errC set, that + // message must have had the forceSync flag set and thus acts as a barrier: + // no more messages are sent until the flush of that message completes. + // + // If b.errorCh were to be set, we wouldn't know what to do about it + // since we can't overwrite it in case m.errorCh is also set. + if b.errC != nil { + panic(errors.AssertionFailedf("unexpected errC already set")) + } + b.errC = errC + return nil +} + +// flush resets b, returning its contents in concatenated form. If b is empty, a +// nil buffer is returned. 
+func (b *msgBuf) flush() (*buffer, chan<- error) { + msg := b.concatMessages() + b.messages = nil + b.sizeBytes = 0 + errC := b.errC + b.errC = nil + return msg, errC +} + +// concatMessages copies over the contents of all the buffers to the first one, +// which is returned. +// +// All buffers but the first one are released to the pool. +func (b *msgBuf) concatMessages() *buffer { + if len(b.messages) == 0 { + return nil + } + var totalSize int + for _, msg := range b.messages { + totalSize += msg.Len() + 1 // leave space for newLine + } + // Append all the messages in the first buffer. + buf := b.messages[0] + buf.Grow(totalSize - buf.Len()) + for i, b := range b.messages { + if i == 0 { + // First buffer skips putBuffer -- + // we're still using it and it's a weird size + // for reuse. + continue + } + buf.WriteByte('\n') + buf.Write(b.Bytes()) + // Make b available for reuse. + putBuffer(b) + } + return buf +} + +func (b *msgBuf) dropFirstMsg() { + // TODO(knz): This needs to get reported somehow, see + // https://github.com/cockroachdb/cockroach/issues/72453 + firstMsg := b.messages[0] + b.messages = b.messages[1:] + b.sizeBytes -= uint64(firstMsg.Len()) + putBuffer(firstMsg) +} diff --git a/pkg/util/log/buffer_sink_test.go b/pkg/util/log/buffered_sink_test.go similarity index 52% rename from pkg/util/log/buffer_sink_test.go rename to pkg/util/log/buffered_sink_test.go index f1d2f9434e1e..eef5de213ffe 100644 --- a/pkg/util/log/buffer_sink_test.go +++ b/pkg/util/log/buffered_sink_test.go @@ -11,7 +11,9 @@ package log import ( + "bytes" "context" + "errors" "fmt" "strings" "sync" @@ -19,18 +21,24 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) -func getMockBufferSync( - t *testing.T, maxStaleness time.Duration, sizeTrigger int, errCallback func(error), -) (sink *bufferSink, mock *MockLogSink, cleanup func()) 
{ +const noMaxStaleness = time.Duration(0) +const noSizeTrigger = 0 +const noMaxBufferSize = 0 + +func getMockBufferedSync( + t *testing.T, maxStaleness time.Duration, sizeTrigger uint64, maxBufferSize uint64, +) (sink *bufferedSink, mock *MockLogSink, cleanup func()) { ctx, cancel := context.WithCancel(context.Background()) ctrl := gomock.NewController(t) mock = NewMockLogSink(ctrl) - sink = newBufferSink(ctx, mock, maxStaleness, sizeTrigger, 2 /* maxInFlight */, errCallback) + sink = newBufferedSink(mock, maxStaleness, sizeTrigger, maxBufferSize, false /* crashOnAsyncFlushErr */) + sink.Start(ctx) cleanup = func() { cancel() ctrl.Finish() @@ -48,7 +56,7 @@ func addArgs(f func()) func([]byte, sinkOutputOptions) { func TestBufferOneLine(t *testing.T) { defer leaktest.AfterTest(t)() - sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + sink, mock, cleanup := getMockBufferedSync(t, noMaxStaleness, noSizeTrigger, noMaxBufferSize) defer cleanup() var wg sync.WaitGroup @@ -63,27 +71,36 @@ func TestBufferOneLine(t *testing.T) { require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true})) } -func TestBufferManyLinesOneFlush(t *testing.T) { +func TestBufferSinkBuffers(t *testing.T) { defer leaktest.AfterTest(t)() - sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + sink, mock, cleanup := getMockBufferedSync(t, noMaxStaleness, noSizeTrigger, noMaxBufferSize) defer cleanup() - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() + flushC := make(chan struct{}) message := []byte("test") mock.EXPECT(). output(gomock.Eq([]byte("test\ntest")), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). - Do(addArgs(wg.Done)) + Do(addArgs(func() { close(flushC) })) + // Send one message; it should be buffered. require.NoError(t, sink.output(message, sinkOutputOptions{})) + // Sleep a little bit to convince ourselves that no flush is happening. 
The + // mock would yell if it did happen. + time.Sleep(50 * time.Millisecond) + // Send another message and ask for a flush. require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true})) + select { + case <-flushC: + // Good, we got our flush. + case <-time.After(10 * time.Second): + t.Fatalf("expected flush didn't happen") + } } func TestBufferMaxStaleness(t *testing.T) { defer leaktest.AfterTest(t)() - sink, mock, cleanup := getMockBufferSync(t, time.Second /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + sink, mock, cleanup := getMockBufferedSync(t, time.Second /* maxStaleness*/, noSizeTrigger, noMaxBufferSize) defer cleanup() var wg sync.WaitGroup @@ -100,7 +117,7 @@ func TestBufferMaxStaleness(t *testing.T) { func TestBufferSizeTrigger(t *testing.T) { defer leaktest.AfterTest(t)() - sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 2 /* sizeTrigger */, nil /* errCallback*/) + sink, mock, cleanup := getMockBufferedSync(t, noMaxStaleness, 2 /* sizeTrigger */, noMaxBufferSize) defer cleanup() var wg sync.WaitGroup @@ -117,56 +134,75 @@ func TestBufferSizeTrigger(t *testing.T) { func TestBufferSizeTriggerMultipleFlush(t *testing.T) { defer leaktest.AfterTest(t)() - sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 8 /* sizeTrigger */, nil /* errCallback*/) + sink, mock, cleanup := getMockBufferedSync(t, noMaxStaleness, 8 /* sizeTrigger */, noMaxBufferSize) defer cleanup() - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() + flush1C := make(chan struct{}) + flush2C := make(chan struct{}) gomock.InOrder( mock.EXPECT(). output(gomock.Eq([]byte("test1\ntest2")), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). - Do(addArgs(wg.Done)), + Do(addArgs(func() { close(flush1C) })), mock.EXPECT(). output(gomock.Eq([]byte("test3")), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). 
- Do(addArgs(wg.Done)), + Do(addArgs(func() { close(flush2C) })), ) require.NoError(t, sink.output([]byte("test1"), sinkOutputOptions{})) require.NoError(t, sink.output([]byte("test2"), sinkOutputOptions{})) + select { + case <-flush1C: + case <-time.After(10 * time.Second): + t.Fatal("first flush didn't happen") + } require.NoError(t, sink.output([]byte("test3"), sinkOutputOptions{extraFlush: true})) + select { + case <-flush2C: + case <-time.After(10 * time.Second): + t.Fatal("second flush didn't happen") + } } -type testError struct{} - -func (testError) Error() string { - return "Test Error" -} - -func TestBufferErrCallback(t *testing.T) { +func TestBufferedSinkCrashOnAsyncFlushErr(t *testing.T) { defer leaktest.AfterTest(t)() - testCh := make(chan struct{}) - errCallback := func(error) { - close(testCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mock := NewMockLogSink(ctrl) + bufferMaxSize := uint64(20) + triggerSize := uint64(10) + // Configure a sink to crash on flush errors. + sink := newBufferedSink(mock, noMaxStaleness, triggerSize, bufferMaxSize, true /* crashOnAsyncFlushErr */) + sink.Start(ctx) + + crashC := make(chan struct{}) + SetExitFunc(false /* hideStack */, func(code exit.Code) { + close(crashC) + }) + defer ResetExitFunc() + + // Inject an error in flushes. + mock.EXPECT().output(gomock.Any(), gomock.Any()).Return(errors.New("boom")) + mock.EXPECT().exitCode().Return(exit.LoggingNetCollectorUnavailable()) + // Force a flush. + require.NoError(t, sink.output([]byte("test"), sinkOutputOptions{extraFlush: true})) + // Check that we crashed. + select { + case <-crashC: + // Good; we would have crashed in production. 
+ case <-time.After(10 * time.Second): + t.Fatalf("expected crash didn't happen") } - - sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, errCallback) - defer cleanup() - - message := []byte("test") - err := testError{} - mock.EXPECT(). - output(gomock.Eq(message), gomock.Any()).Return(err) - require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true})) - - <-testCh } -func TestBufferForceSync(t *testing.T) { +// Test that a call to output() with the forceSync option doesn't return until +// the flush is done. +func TestBufferedSinkForceSync(t *testing.T) { defer leaktest.AfterTest(t)() - sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + sink, mock, cleanup := getMockBufferedSync(t, noMaxStaleness, noSizeTrigger, noMaxBufferSize) defer cleanup() ch := make(chan struct{}) @@ -180,7 +216,7 @@ func TestBufferForceSync(t *testing.T) { go func() { // Wait a second, verify that the call to output() is still blocked, // then close the channel to unblock it. - <-time.After(time.Second) + <-time.After(50 * time.Millisecond) if atomic.LoadInt32(&marker) != 0 { t.Error("sink.output returned while child sync should be blocking") } @@ -192,74 +228,110 @@ func TestBufferForceSync(t *testing.T) { atomic.StoreInt32(&marker, 1) } -// Test that a forceSync message that is dropped by the bufferSink does not lead -// to the respective output() call deadlocking. -func TestForceSyncDropNoDeadlock(t *testing.T) { +// Test that messages are buffered while a flush is in-flight. 
+func TestBufferedSinkBlockedFlush(t *testing.T) { defer leaktest.AfterTest(t)() ctx, cancel := context.WithCancel(context.Background()) defer cancel() ctrl := gomock.NewController(t) defer ctrl.Finish() mock := NewMockLogSink(ctrl) - sink := newBufferSink(ctx, mock, 0 /*maxStaleness*/, 1 /*triggerSize*/, 1 /* maxInFlight */, nil /*errCallback*/) + bufferMaxSize := uint64(20) + triggerSize := uint64(10) + sink := newBufferedSink(mock, noMaxStaleness, triggerSize, bufferMaxSize, false /* crashOnAsyncFlushErr */) + sink.Start(ctx) - // firstFlushC will be signaled when the bufferSink flushes for the first + // firstFlushSem will be signaled when the bufferedSink flushes for the first // time. That flush will be blocked until the channel is written to again. firstFlushSem := make(chan struct{}) - // We'll write a message. This message will trigger a flush based on the byte - // limit. We'll block that flush and write a second message. - m1 := []byte("some message") + // We'll write a large message which will trigger a flush based on the + // triggerSize limit. We'll block that flush and then write more messages. + largeMsg := bytes.Repeat([]byte("a"), int(triggerSize)) mock.EXPECT(). output(gomock.Any(), gomock.Any()). Do(func([]byte, sinkOutputOptions) { firstFlushSem <- struct{}{} <-firstFlushSem }) - require.NoError(t, sink.output(m1, sinkOutputOptions{})) + require.NoError(t, sink.output(largeMsg, sinkOutputOptions{})) select { case <-firstFlushSem: case <-time.After(10 * time.Second): t.Fatal("expected flush didn't happen") } - // Unblock the flush when the test is done. - defer func() { - firstFlushSem <- struct{}{} - }() - // With the flush blocked, we now send a second message with the forceSync - // option. This message is expected to be dropped because of the maxInFlight - // limit. We install an onCompact hook to intercept the message drop so we can - // synchronize with it. 
- compactCh := make(chan struct{}, 1) - sink.onMsgDrop = func() { - select { - case compactCh <- struct{}{}: - default: - } + // With the flush blocked, we now send more messages. These messages will run + // into the bufferMaxSize limit, and so the oldest will be dropped. + + // First, we arm a channel for a second flush. We don't expect this to fire + // yet. + secondFlush := make(chan []byte) + mock.EXPECT(). + output(gomock.Any(), gomock.Any()). + Do(func(logs []byte, _ sinkOutputOptions) { + secondFlush <- logs + }) + + // We're going to send a sequence of messages. They'll overflow the buffer, + // and we'll expect only the last few to be eventually flushed. + for i := 0; i < 10; i++ { + s := fmt.Sprintf("a%d", i) + require.NoError(t, sink.output([]byte(s), sinkOutputOptions{})) + } + for i := 0; i < 10; i++ { + s := fmt.Sprintf("b%d", i) + require.NoError(t, sink.output([]byte(s), sinkOutputOptions{})) } - m2 := []byte("a sync message") - logCh := make(chan error) - go func() { - logCh <- sink.output(m2, sinkOutputOptions{forceSync: true}) - }() - // Wait to be notified that the message was dropped. - <-compactCh select { - case err := <-logCh: - require.ErrorIs(t, err, errSyncMsgDropped) + case <-secondFlush: + t.Fatalf("unexpected second flush while first flush is in-flight") + case <-time.After(10 * time.Millisecond): + // Good; it appears that a second flush does not happen while the first is in-flight. + } + + // Now unblock the original flush, which in turn will allow a second flush to happen. + firstFlushSem <- struct{}{} + + // Check that the second flush happens, and delivers the tail of the messages. + select { case <-time.After(10 * time.Second): - t.Fatal("sink.Output call never returned. Deadlock?") + t.Fatal("expected 2nd flush didn't happen") + case out := <-secondFlush: + require.Equal(t, []byte(`b4 +b5 +b6 +b7 +b8 +b9`), out) } } +// Test that multiple messages with the forceSync option work. 
+func TestBufferedSinkSyncFlush(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mock := NewMockLogSink(ctrl) + sink := newBufferedSink(mock, noMaxStaleness, noSizeTrigger, noMaxBufferSize, false /* crashOnAsyncFlushErr */) + sink.Start(ctx) + + mock.EXPECT().output(gomock.Eq([]byte("a")), gomock.Any()) + mock.EXPECT().output(gomock.Eq([]byte("b")), gomock.Any()) + require.NoError(t, sink.output([]byte("a"), sinkOutputOptions{forceSync: true})) + require.NoError(t, sink.output([]byte("b"), sinkOutputOptions{forceSync: true})) +} + func TestBufferCtxDoneFlushesRemainingMsgs(t *testing.T) { defer leaktest.AfterTest(t)() ctx, cancel := context.WithCancel(context.Background()) ctrl := gomock.NewController(t) mock := NewMockLogSink(ctrl) - sink := newBufferSink(ctx, mock, 0 /*maxStaleness*/, 0 /*sizeTrigger*/, 2 /* maxInFlight */, nil /*errCallback*/) + sink := newBufferedSink(mock, noMaxStaleness, noSizeTrigger, noMaxBufferSize, false /* crashOnAsyncFlushErr */) + sink.Start(ctx) defer ctrl.Finish() var wg sync.WaitGroup diff --git a/pkg/util/log/file_api.go b/pkg/util/log/file_api.go index 71a0a8116ebd..23198fd3f471 100644 --- a/pkg/util/log/file_api.go +++ b/pkg/util/log/file_api.go @@ -250,7 +250,7 @@ func GetLogReader(filename string) (io.ReadCloser, error) { sb, ok := fs.mu.file.(*syncBuffer) if ok && baseFileName == filepath.Base(sb.file.Name()) { // If the file being read is also the file being written to, then we - // want mutual exclusion between the reader and the flusher. + // want mutual exclusion between the reader and the runFlusher. lr := &lockedReader{} lr.mu.RWMutex = &fs.mu.RWMutex lr.mu.wrappedFile = file @@ -371,7 +371,7 @@ var _ io.ReadCloser = (*lockedReader)(nil) // lockedReader locks accesses to a wrapped io.ReadCloser, // using a RWMutex shared with another component. 
// We use this when reading log files (using the GetLogReader API) -// that are concurrently being written to by the log flusher, +// that are concurrently being written to by the log runFlusher, // to ensure that read operations cannot observe partial flushes. type lockedReader struct { mu struct { diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 4a850530f510..2a0c899cbca0 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -16,7 +16,6 @@ import ( "io/fs" "math" - "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/util/log/channel" "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/cockroach/pkg/util/log/logflags" @@ -289,7 +288,7 @@ func ApplyConfig(config logconfig.Config) (resFn func(), err error) { if err != nil { return nil, err } - attachBufferWrapper(secLoggersCtx, fileSinkInfo, fc.CommonSinkConfig) + attachBufferWrapper(secLoggersCtx, fileSinkInfo, fc.CommonSinkConfig.Buffering) attachSinkInfo(fileSinkInfo, &fc.Channels) // Start the GC process. 
This ensures that old capture files get @@ -306,7 +305,7 @@ func ApplyConfig(config logconfig.Config) (resFn func(), err error) { if err != nil { return nil, err } - attachBufferWrapper(secLoggersCtx, fluentSinkInfo, fc.CommonSinkConfig) + attachBufferWrapper(secLoggersCtx, fluentSinkInfo, fc.CommonSinkConfig.Buffering) attachSinkInfo(fluentSinkInfo, &fc.Channels) } @@ -319,7 +318,7 @@ func ApplyConfig(config logconfig.Config) (resFn func(), err error) { if err != nil { return nil, err } - attachBufferWrapper(secLoggersCtx, httpSinkInfo, fc.CommonSinkConfig) + attachBufferWrapper(secLoggersCtx, httpSinkInfo, fc.CommonSinkConfig.Buffering) attachSinkInfo(httpSinkInfo, &fc.Channels) } @@ -401,38 +400,24 @@ func (l *sinkInfo) applyFilters(chs logconfig.ChannelFilters) { } } -func attachBufferWrapper(ctx context.Context, s *sinkInfo, c logconfig.CommonSinkConfig) { - b := c.Buffering - if b.IsNone() { +// attachBufferWrapper modifies s, wrapping its sink in a bufferedSink unless +// bufConfig.IsNone(). +// +// ctx needs to be canceled to stop the bufferedSink internal goroutines. +func attachBufferWrapper( + ctx context.Context, s *sinkInfo, bufConfig logconfig.CommonBufferSinkConfigWrapper, +) { + if bufConfig.IsNone() { return } - - errCallback := func(err error) { - // TODO(knz): explain which sink is encountering the error in the - // error message. - // See: https://github.com/cockroachdb/cockroach/issues/72461 - Ops.Errorf(context.Background(), "logging error: %v", err) - } - if s.criticality { - // TODO(knz): explain which sink is encountering the error in the - // error message. 
- // See: https://github.com/cockroachdb/cockroach/issues/72461 - errCallback = func(err error) { - Ops.Errorf(context.Background(), "logging error: %v", err) - - logging.mu.Lock() - f := logging.mu.exitOverride.f - logging.mu.Unlock() - - code := s.sink.exitCode() - if f != nil { - f(code, err) - } else { - exit.WithCode(code) - } - } - } - s.sink = newBufferSink(ctx, s.sink, *b.MaxStaleness, int(*b.FlushTriggerSize), int32(*b.MaxInFlight), errCallback) + bs := newBufferedSink( + s.sink, + *bufConfig.MaxStaleness, + uint64(*bufConfig.FlushTriggerSize), + uint64(*bufConfig.MaxBufferSize), + s.criticality /* crashOnAsyncFlushErr */) + bs.Start(ctx) + s.sink = bs } // applyConfig applies a common sink configuration to a sinkInfo. diff --git a/pkg/util/log/logconfig/BUILD.bazel b/pkg/util/log/logconfig/BUILD.bazel index 48374a7312a6..7bc63bcefb3d 100644 --- a/pkg/util/log/logconfig/BUILD.bazel +++ b/pkg/util/log/logconfig/BUILD.bazel @@ -25,6 +25,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/build", + "//pkg/util/humanizeutil", "//pkg/util/log/logpb", "@com_github_cockroachdb_errors//:errors", "@com_github_dustin_go_humanize//:go-humanize", diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index eb9da6df568a..0804fd793204 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -150,6 +150,7 @@ type CaptureFd2Config struct { // buffering: // max-staleness: 20s // flush-trigger-size: 25KB +// max-buffer-size: 10MB // sinks: // file-groups: // health: @@ -162,15 +163,18 @@ type CaptureFd2Config struct { type CommonBufferSinkConfig struct { // MaxStaleness is the maximum time a log message will sit in the buffer // before a flush is triggered. - MaxStaleness *time.Duration `yaml:"max-staleness,omitempty"` + MaxStaleness *time.Duration `yaml:"max-staleness"` // FlushTriggerSize is the number of bytes that will trigger the buffer // to flush. 
- FlushTriggerSize *ByteSize `yaml:"flush-trigger-size,omitempty"` - - // MaxInFlight is the maximum number of buffered flushes before messages - // start being dropped. - MaxInFlight *int `yaml:"max-in-flight,omitempty"` + FlushTriggerSize *ByteSize `yaml:"flush-trigger-size"` + + // MaxBufferSize is the limit on the size of the messages that are buffered. + // If this limit is exceeded, messages are dropped. The limit is expected to + // be higher than FlushTriggerSize. A buffer is flushed as soon as + // FlushTriggerSize is reached, and a new buffer is created once the flushing + // is started. Only one flushing operation is active at a time. + MaxBufferSize *ByteSize `yaml:"max-buffer-size"` } // CommonBufferSinkConfigWrapper is a BufferSinkConfig with a special value represented in YAML by @@ -1059,6 +1063,7 @@ func (w *CommonBufferSinkConfigWrapper) UnmarshalYAML(fn func(interface{}) error w.CommonBufferSinkConfig = CommonBufferSinkConfig{ MaxStaleness: &d, FlushTriggerSize: &s, + MaxBufferSize: &s, } return nil } @@ -1071,7 +1076,8 @@ func (w *CommonBufferSinkConfigWrapper) UnmarshalYAML(fn func(interface{}) error // After default propagation, buffering is disabled iff IsNone(). 
func (w CommonBufferSinkConfigWrapper) IsNone() bool { return (w.MaxStaleness != nil && *w.MaxStaleness == 0) && - (w.FlushTriggerSize != nil && *w.FlushTriggerSize == 0) + (w.FlushTriggerSize != nil && *w.FlushTriggerSize == 0) && + (w.MaxBufferSize != nil && *w.MaxBufferSize == 0) } // HTTPSinkMethod is a string restricted to "POST" and "GET" diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 1bf2573753ac..b5a2b6f818ca 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -445,7 +445,7 @@ fluent-defaults: buffering: max-staleness: 15s flush-trigger-size: 10KiB - max-in-flight: 4 + max-buffer-size: 2MiB sinks: fluent-servers: a: @@ -462,7 +462,7 @@ sinks: address: c channels: HEALTH buffering: - max-in-flight: 16 + max-buffer-size: 3MiB d: address: d channels: SESSIONS @@ -486,7 +486,7 @@ sinks: buffering: max-staleness: 10s flush-trigger-size: 10KiB - max-in-flight: 4 + max-buffer-size: 2.0MiB b: channels: {INFO: [OPS]} net: tcp @@ -499,7 +499,7 @@ sinks: buffering: max-staleness: 15s flush-trigger-size: 5.0KiB - max-in-flight: 4 + max-buffer-size: 2.0MiB c: channels: {INFO: [HEALTH]} net: tcp @@ -512,7 +512,7 @@ sinks: buffering: max-staleness: 15s flush-trigger-size: 10KiB - max-in-flight: 16 + max-buffer-size: 3.0MiB d: channels: {INFO: [SESSIONS]} net: tcp diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index bccc35b830b0..d00f8df8ee77 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" ) @@ -38,7 +39,6 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { bt, bf := true, false zeroDuration := time.Duration(0) zeroByteSize := ByteSize(0) - zeroInt := 
int(0) baseCommonSinkConfig := CommonSinkConfig{ Filter: logpb.Severity_INFO, @@ -46,11 +46,17 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Redactable: &bt, Redact: &bf, Criticality: &bf, + // Buffering is configured to "NONE". This is different from a zero value + // which buffers infinitely. + // + // TODO(andrei,alexb): Enable buffering by default for some sinks once the + // shutdown of the bufferedSink is improved. Note that baseFileDefaults + // below does not inherit the buffering settings from here. Buffering: CommonBufferSinkConfigWrapper{ CommonBufferSinkConfig: CommonBufferSinkConfig{ MaxStaleness: &zeroDuration, FlushTriggerSize: &zeroByteSize, - MaxInFlight: &zeroInt, + MaxBufferSize: &zeroByteSize, }, }, } @@ -63,6 +69,16 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { CommonSinkConfig: CommonSinkConfig{ Format: func() *string { s := DefaultFileFormat; return &s }(), Criticality: &bt, + // Buffering is configured to "NONE". We're setting this to something so + // that the defaults are not propagated from baseCommonSinkConfig because + // buffering is not supported on file sinks at the moment (#72452). 
+ Buffering: CommonBufferSinkConfigWrapper{ + CommonBufferSinkConfig: CommonBufferSinkConfig{ + MaxStaleness: &zeroDuration, + FlushTriggerSize: &zeroByteSize, + MaxBufferSize: &zeroByteSize, + }, + }, }, } baseFluentDefaults := FluentDefaults{ @@ -100,7 +116,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } else { fc.prefix = prefix } - if err := c.validateFileSinkConfig(fc, defaultLogDir); err != nil { + if err := c.validateFileSinkConfig(fc); err != nil { fmt.Fprintf(&errBuf, "file group %q: %v\n", prefix, err) } } @@ -141,6 +157,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { c.Sinks.Stderr.Criticality = &bt } c.Sinks.Stderr.Auditable = nil + if err := c.ValidateCommonSinkConfig(c.Sinks.Stderr.CommonSinkConfig); err != nil { + fmt.Fprintf(&errBuf, "stderr sink: %v\n", err) + } // Propagate the sink-wide default filter to all channels that don't // have a filter yet. @@ -244,7 +263,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } else { // "default" did not exist yet. Create it. fc = c.newFileSinkConfig("default") - if err := c.validateFileSinkConfig(fc, defaultLogDir); err != nil { + if err := c.validateFileSinkConfig(fc); err != nil { fmt.Fprintln(&errBuf, err) } } @@ -322,7 +341,7 @@ func (c *Config) newFileSinkConfig(groupName string) *FileSinkConfig { return fc } -func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *string) error { +func (c *Config) validateFileSinkConfig(fc *FileSinkConfig) error { propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) if !fc.Buffering.IsNone() { // We cannot use unimplemented.WithIssue() here because of a @@ -359,6 +378,29 @@ func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *strin } fc.Auditable = nil + return c.ValidateCommonSinkConfig(fc.CommonSinkConfig) +} + +// ValidateCommonSinkConfig validates a CommonSinkConfig. 
+func (c *Config) ValidateCommonSinkConfig(conf CommonSinkConfig) error { + b := conf.Buffering + if b.IsNone() { + return nil + } + + const minSlackBytes = 1 << 20 // 1MiB + + if b.FlushTriggerSize != nil && b.MaxBufferSize != nil { + if *b.FlushTriggerSize > *b.MaxBufferSize-minSlackBytes { + // See comments on newBufferedSink. + return errors.Newf( + "not enough slack between flush-trigger-size (%s) and max-buffer-size (%s); "+ + "flush-trigger-size needs to be <= max-buffer-size - %s (= %s) to ensure that "+ + "a large message does not cause the buffer to overflow without triggering a flush", + b.FlushTriggerSize, b.MaxBufferSize, humanizeutil.IBytes(minSlackBytes), *b.MaxBufferSize-minSlackBytes, + ) + } + } return nil } @@ -386,7 +428,7 @@ func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error { } fc.Auditable = nil - return nil + return c.ValidateCommonSinkConfig(fc.CommonSinkConfig) } func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error { @@ -394,7 +436,7 @@ func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error { if hsc.Address == nil || len(*hsc.Address) == 0 { return errors.New("address cannot be empty") } - return nil + return c.ValidateCommonSinkConfig(hsc.CommonSinkConfig) } func normalizeDir(dir **string) error { diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index f265e8ee3aaa..9ef1455f3679 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -72,4 +72,4 @@ var _ logSink = (*stderrSink)(nil) var _ logSink = (*fileSink)(nil) var _ logSink = (*fluentSink)(nil) var _ logSink = (*httpSink)(nil) -var _ logSink = (*bufferSink)(nil) +var _ logSink = (*bufferedSink)(nil)