From c5b02077cafc34cdb4b83b09951ebd14f647d072 Mon Sep 17 00:00:00 2001 From: Jay Rauchenstein Date: Tue, 27 Jul 2021 09:59:26 -0700 Subject: [PATCH 01/19] util/log: add buffer sink decorator Previously, only the file sink had buffering, and in that case it is built into the sink. It's important to add buffering to network sinks for various reasons -- reducing network chatter, and making the networking call itself asynchronous so the log call returns with very low latency. This change adds a buffering decorator so that buffering can be added to any log sink with little or no development effort, and allowing buffering to be configured in a uniform way. This functionality is not yet enabled by default and does not yet replace the existing functionality for file sinks, though both these things are intended in the future. Release note (cli change): Add buffering to log sinks. This can be configured with the new "buffering" field on any log sink provided via the "--log" or "--log-config-file" flags. Release justification: This change is safe because it is a no-op without a configuration change specifically enabling it. 
--- build/bazelutil/check.sh | 1 + docs/generated/logsinks.md | 37 +++ pkg/cli/log_flags_test.go | 27 ++- pkg/cli/testdata/logflags | 3 +- pkg/util/log/BUILD.bazel | 17 ++ pkg/util/log/buffer_sink.go | 275 +++++++++++++++++++++++ pkg/util/log/buffer_sink_test.go | 229 +++++++++++++++++++ pkg/util/log/clog.go | 7 +- pkg/util/log/exit_override.go | 2 +- pkg/util/log/file.go | 8 +- pkg/util/log/flags.go | 32 +++ pkg/util/log/fluent_client.go | 11 +- pkg/util/log/http_sink.go | 12 +- pkg/util/log/intercept.go | 6 +- pkg/util/log/logconfig/config.go | 79 +++++++ pkg/util/log/logconfig/gen.go | 26 ++- pkg/util/log/logconfig/testdata/validate | 72 ++++++ pkg/util/log/logconfig/validate.go | 16 +- pkg/util/log/logconfig/validate_test.go | 6 + pkg/util/log/mock_generated.go | 91 ++++++++ pkg/util/log/sinks.go | 18 +- pkg/util/log/stderr_sink.go | 7 +- 22 files changed, 925 insertions(+), 57 deletions(-) create mode 100644 pkg/util/log/buffer_sink.go create mode 100644 pkg/util/log/buffer_sink_test.go create mode 100644 pkg/util/log/mock_generated.go diff --git a/build/bazelutil/check.sh b/build/bazelutil/check.sh index 324f84df5485..df3ecfb2cc5f 100755 --- a/build/bazelutil/check.sh +++ b/build/bazelutil/check.sh @@ -38,6 +38,7 @@ pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto channe pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto log_channels.go log_channels_generated.go pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto logging.md ../../../docs/generated/logging.md pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto severity.go severity/severity_generated.go +pkg/util/log/sinks.go://go:generate mockgen -package=log -source=sinks.go -destination=mock_generated.go -mock_names=logSink=MockLogSink logSink pkg/util/timeutil/zoneinfo.go://go:generate go run gen/main.go " diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md index d25d91a7ee0b..1a6ee932dcb1 100644 --- 
a/docs/generated/logsinks.md +++ b/docs/generated/logsinks.md @@ -86,6 +86,7 @@ Configuration options shared across all sink types: | `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. | | `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. | | `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. | +| `buffering` | configures buffering for this log sink, or NONE to explicitly disable. See the [common buffering configuration](#buffering-config) section for details. | @@ -168,6 +169,7 @@ Configuration options shared across all sink types: | `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. | | `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. | | `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. | +| `buffering` | configures buffering for this log sink, or NONE to explicitly disable. See the [common buffering configuration](#buffering-config) section for details. | @@ -233,6 +235,7 @@ Configuration options shared across all sink types: | `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. | | `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. | | `auditable` | translated to tweaks to the other settings for this sink during validation. 
For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. | +| `buffering` | configures buffering for this log sink, or NONE to explicitly disable. See the [common buffering configuration](#buffering-config) section for details. | @@ -291,6 +294,7 @@ Configuration options shared across all sink types: | `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. | | `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. | | `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. | +| `buffering` | configures buffering for this log sink, or NONE to explicitly disable. See the [common buffering configuration](#buffering-config) section for details. | @@ -370,3 +374,36 @@ Likewise: channels: {INFO: all except ops,health} etc. + + + + + +## Common buffering configuration + +Buffering may be configured with the following fields. It may also be explicitly +set to "NONE" to disable buffering. Example configuration: + + file-defaults: + dir: logs + buffering: + max-staleness: 20s + flush-trigger-size: 25KB + sinks: + file-groups: + health: + channels: HEALTH + buffering: + max-staleness: 5s # Override max-staleness for this sink. + ops: + channels: OPS + buffering: NONE # Disable buffering for this sink. + + +| Field | Description | +|--|--| +| `max-staleness` | the maximum time a log message will sit in the buffer before a flush is triggered. | +| `flush-trigger-size` | the number of bytes that will trigger the buffer to flush. | +| `max-in-flight` | the maximum number of buffered flushes before messages start being dropped. 
| + + diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go index 896472abe422..858fae8cd837 100644 --- a/pkg/cli/log_flags_test.go +++ b/pkg/cli/log_flags_test.go @@ -39,7 +39,8 @@ func TestSetupLogging(t *testing.T) { `filter: INFO, ` + `format: json-fluent-compact, ` + `redactable: true, ` + - `exit-on-error: false}` + `exit-on-error: false, ` + + `buffering: NONE}` const defaultHTTPConfig = `http-defaults: {` + `method: POST, ` + `unsafe-tls: false, ` + @@ -48,7 +49,8 @@ func TestSetupLogging(t *testing.T) { `filter: INFO, ` + `format: json-compact, ` + `redactable: true, ` + - `exit-on-error: false}` + `exit-on-error: false, ` + + `buffering: NONE}` stdFileDefaultsRe := regexp.MustCompile( `file-defaults: \{` + `dir: (?P[^,]+), ` + @@ -57,7 +59,8 @@ func TestSetupLogging(t *testing.T) { `buffered-writes: true, ` + `filter: INFO, ` + `format: crdb-v2, ` + - `redactable: true\}`) + `redactable: true, ` + + `buffering: NONE\}`) fileDefaultsNoMaxSizeRe := regexp.MustCompile( `file-defaults: \{` + `dir: (?P[^,]+), ` + @@ -65,13 +68,15 @@ func TestSetupLogging(t *testing.T) { `buffered-writes: true, ` + `filter: INFO, ` + `format: crdb-v2, ` + - `redactable: true\}`) + `redactable: true, ` + + `buffering: NONE\}`) const fileDefaultsNoDir = `file-defaults: {` + `file-permissions: "0644", ` + `buffered-writes: true, ` + `filter: INFO, ` + `format: crdb-v2, ` + - `redactable: true}` + `redactable: true, ` + + `buffering: NONE}` const defaultLogDir = `PWD/cockroach-data/logs` stdCaptureFd2Re := regexp.MustCompile( `capture-stray-errors: \{` + @@ -85,7 +90,8 @@ func TestSetupLogging(t *testing.T) { `buffered-writes: (?P[^,]+), ` + `filter: INFO, ` + `format: (?P[^,]+), ` + - `redactable: true\}`) + `redactable: true, ` + + `buffering: NONE\}`) telemetryFileCfgRe := regexp.MustCompile( `\{channels: \{INFO: \[TELEMETRY\]\}, ` + `dir: (?P[^,]+), ` + @@ -95,18 +101,21 @@ func TestSetupLogging(t *testing.T) { `buffered-writes: true, ` + `filter: INFO, ` + 
`format: crdb-v2, ` + - `redactable: true\}`) + `redactable: true, ` + + `buffering: NONE\}`) stderrCfgRe := regexp.MustCompile( `stderr: {channels: \{(?P[^:]+): all\}, ` + `filter: [^,]+, ` + `format: crdb-v2-tty, ` + - `redactable: (?P[^}]+)}`) + `redactable: (?P[^}]+), ` + + `buffering: NONE}`) stderrCfgNoneRe := regexp.MustCompile( `stderr: {filter: NONE, ` + `format: crdb-v2-tty, ` + - `redactable: (?P[^}]+)}`) + `redactable: (?P[^}]+), ` + + `buffering: NONE}`) wd, err := os.Getwd() if err != nil { diff --git a/pkg/cli/testdata/logflags b/pkg/cli/testdata/logflags index 7c5d73ab2cf1..82ec8b8dcf9e 100644 --- a/pkg/cli/testdata/logflags +++ b/pkg/cli/testdata/logflags @@ -482,7 +482,8 @@ file-permissions: "0644", buffered-writes: true, filter: INFO, format: crdb-v2, -redactable: true}}, +redactable: true, +buffering: NONE}}, }} # For servers, --logtostderr overrides the threshold and keeps diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 253f9b8a4511..4541c4cc94c7 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -1,9 +1,13 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +# gazelle:exclude mock_generated.go + go_library( name = "log", srcs = [ "ambient_context.go", + "buffer_sink.go", "channels.go", "clog.go", "doc.go", @@ -127,6 +131,7 @@ go_test( size = "small", srcs = [ "ambient_context_test.go", + "buffer_sink_test.go", "clog_test.go", "file_log_gc_test.go", "file_names_test.go", @@ -146,6 +151,7 @@ go_test( "test_log_scope_test.go", "trace_client_test.go", "trace_test.go", + ":mock_logsink", # keep ], data = glob(["testdata/**"]), embed = [":log"], @@ -170,6 +176,7 @@ go_test( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", + "@com_github_golang_mock//gomock", # keep "@com_github_kr_pretty//:pretty", "@com_github_pmezard_go_difflib//difflib", 
"@com_github_stretchr_testify//assert", @@ -192,3 +199,13 @@ genrule( "//pkg/util/log/gen", ], ) + +gomock( + name = "mock_logsink", + out = "mock_generated.go", + interfaces = ["logSink"], + library = ":log", + mock_names = {"logSink": "MockLogSink"}, + package = "log", + source = "sinks.go", +) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go new file mode 100644 index 000000000000..05d5b0527a78 --- /dev/null +++ b/pkg/util/log/buffer_sink.go @@ -0,0 +1,275 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "context" + "sync/atomic" + "time" + + "github.com/cockroachdb/cockroach/pkg/cli/exit" +) + +// bufferSink wraps a child logSink to add buffering and asynchronous behavior. +// The buffer is flushed at a configurable size threshold and/or max staleness. +// bufferSink's output method will not block on downstream I/O (unless required +// by the forceSync option). +// +// Incoming messages are accumulated in a "bundle" until the configured staleness +// or size threshold is met, at which point they are put in a queue to be flushed +// by the child sink. If the queue is full, current bundle is compacted rather +// sent, which currently drops the messages but retains their count for later +// reporting (TODO). +// +// Should an error occur in the child sink, it's forwarded to the provided +// errCallback (unless forceSync is requested, in which case the error is returned +// synchronously, as it would for any other sink). +type bufferSink struct { + // child is the wrapped logSink. + child logSink + + // messageCh sends messages from output() to accumulate(). 
+ messageCh chan bufferSinkMessage + // flushCh sends bundles of messages from accumulator() to flusher(). + flushCh chan bufferSinkBundle + // nInFlight internally tracks the population of flushCh to detect when it's full. + nInFlight int32 + + // maxStaleness is the duration after which a flush is triggered. + // 0 disables this trigger. + maxStaleness time.Duration + // triggerSize is the size in bytes of accumulated messages which trigger a flush. + // 0 disables this trigger. + triggerSize int + // maxInFlight is the maximum number of flushes to buffer before dropping messages. + maxInFlight int32 + // errCallback is called when the child sink has an error. + errCallback func(error) + + // inErrorState is used internally to temporarily disable the sink during error handling. + inErrorState bool +} + +const bufferSinkDefaultMaxInFlight = 4 + +func newBufferSink( + ctx context.Context, + child logSink, + maxStaleness time.Duration, + triggerSize int, + maxInFlight int32, + errCallback func(error), +) *bufferSink { + if maxInFlight <= 0 { + maxInFlight = bufferSinkDefaultMaxInFlight + } + + sink := &bufferSink{ + child: child, + messageCh: make(chan bufferSinkMessage), + flushCh: make(chan bufferSinkBundle, maxInFlight), + maxStaleness: maxStaleness, + triggerSize: triggerSize, + maxInFlight: maxInFlight, + errCallback: errCallback, + } + go sink.accumulator(ctx) + go sink.flusher(ctx) + return sink +} + +// accumulator accumulates messages and sends bundles of them to the flusher. +func (bs *bufferSink) accumulator(ctx context.Context) { + var ( + b bufferSinkBundle + // timer tracks the staleness of the oldest unflushed message. + // It can be thought of as having 3 states: + // - nil when there are no accumulated messages. + // - non-nil but unreadable when the oldest accumulated message is + // younger than maxStaleness. + // - non-nil and readable when the oldest accumulated message is older + // than maxStaleness and a flush should therefore be triggered.
+ timer <-chan time.Time // staleness timer + ) + reset := func() { + b = bufferSinkBundle{} + timer = nil + } + + for { + flush := false + + appendMessage := func(m bufferSinkMessage) { + b.messages = append(b.messages, m) + b.byteLen += len(m.b.Bytes()) + 1 + if m.flush || m.errorCh != nil || (bs.triggerSize > 0 && b.byteLen > bs.triggerSize) { + flush = true + b.errorCh = m.errorCh + } else if timer == nil && bs.maxStaleness != 0 { + timer = time.After(bs.maxStaleness) + } + } + select { + case <-timer: + flush = true + case <-ctx.Done(): + // Do one last non-blocking read on messageCh, so messages don't get dropped. + select { + case m := <-bs.messageCh: + appendMessage(m) + default: + } + b.done = true + flush = true + case m := <-bs.messageCh: + appendMessage(m) + } + + done := b.done + if flush { + if atomic.LoadInt32(&bs.nInFlight) < bs.maxInFlight { + bs.flushCh <- b + atomic.AddInt32(&bs.nInFlight, 1) + reset() + } else { + b.compact() + } + } + if done { + return + } + } +} + +// flusher concatenates bundled messages and sends them to the child sink. +func (bs *bufferSink) flusher(ctx context.Context) { + for b := range bs.flushCh { + if len(b.messages) > 0 { + // Append all the messages in the first buffer. + buf := b.messages[0].b + buf.Grow(b.byteLen - len(buf.Bytes())) + for i, m := range b.messages { + if i == 0 { + // First buffer skips putBuffer -- + // we're still using it and it's a weird size + // for reuse. + continue + } + buf.WriteByte('\n') + buf.Write(m.b.Bytes()) + putBuffer(m.b) + } + forceSync := b.errorCh != nil + // Send the accumulated messages to the child sink. + err := bs.child.output(buf.Bytes(), sinkOutputOptions{extraFlush: true, forceSync: forceSync}) + if forceSync { + b.errorCh <- err + } else if err != nil && bs.errCallback != nil { + // Forward error to the callback, if provided. + // Temporarily disable this sink so it's skipped by + // any logging in the callback. 
+ bs.inErrorState = true + bs.errCallback(err) + bs.inErrorState = false + } + } + // Decrease visible queue size at the end, + // so a long-running flush triggers a compact + // instead of a blocked channel. + atomic.AddInt32(&bs.nInFlight, -1) + if b.done { + return + } + } +} + +// bufferSinkMessage holds an individual log message sent from output to the accumulator. +type bufferSinkMessage struct { + b *buffer + // flush is set if the call explicitly requests to trigger a flush. + flush bool + // errorCh is set iff the message is forceSync. + // The caller will block expecting a (possibly nil) error to return synchronously. + errorCh chan<- error +} + +// bufferSinkBundle is the accumulated state; the unit sent from the accumulator to the flusher. +type bufferSinkBundle struct { + messages []bufferSinkMessage + // byteLen is the total length in bytes of the accumulated messages, plus enough for separators. + byteLen int + // droppedCount is the number of dropped messages due to buffer fullness. + droppedCount int + // errorCh, if non-nil, expects to receive the (possibly nil) error after the flush completes. + errorCh chan<- error + // done indicates that this is the last bundle and the flusher should shut down after sending. + done bool +} + +// compact compacts a bundle, and is called if the buffer is full. +// Currently, drops all messages and keeps track of the +// count. In the future, if there's visibility into message +// severities, some prioritization could happen to keep more +// important messages if there's room. Maybe the timestamp +// range too. +func (b *bufferSinkBundle) compact() { + b.droppedCount += len(b.messages) + for _, m := range b.messages { + putBuffer(m.b) + } + b.messages = nil +} + +// active returns true if this sink is currently active. +func (bs *bufferSink) active() bool { + return !bs.inErrorState && bs.child.active() +} + +// attachHints attaches some hints about the location of the message +// to the stack message.
+func (bs *bufferSink) attachHints(b []byte) []byte { + return bs.child.attachHints(b) +} + +// output emits some formatted bytes to this sink. +// the sink is invited to perform an extra flush if indicated +// by the argument. This is set to true for e.g. Fatal +// entries. +// +// The parent logger's outputMu is held during this operation: log +// sinks must not recursively call into logging when implementing +// this method. +// +// If forceSync is set, returns the child sink's error (which is otherwise +// handled via the bufferSink's errCallback.) +func (bs *bufferSink) output(b []byte, opts sinkOutputOptions) error { + // Make a copy to live in the async buffer. + // We can't take ownership of the slice we're passed -- + // it belongs to a buffer that's synchronously being returned + // to the pool for reuse. + buf := getBuffer() + if _, err := buf.Write(b); err != nil { + return err + } + if opts.forceSync { + errorCh := make(chan error) + bs.messageCh <- bufferSinkMessage{buf, opts.extraFlush, errorCh} + return <-errorCh + } + bs.messageCh <- bufferSinkMessage{buf, opts.extraFlush, nil} + return nil +} + +// exitCode returns the exit code to use if the logger decides +// to terminate because of an error in output(). +func (bs *bufferSink) exitCode() exit.Code { + return bs.child.exitCode() +} diff --git a/pkg/util/log/buffer_sink_test.go b/pkg/util/log/buffer_sink_test.go new file mode 100644 index 000000000000..585da4e5e4c2 --- /dev/null +++ b/pkg/util/log/buffer_sink_test.go @@ -0,0 +1,229 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package log + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +func getMockBufferSync( + t *testing.T, maxStaleness time.Duration, sizeTrigger int, errCallback func(error), +) (sink *bufferSink, mock *MockLogSink, cleanup func()) { + ctx, cancel := context.WithCancel(context.Background()) + ctrl := gomock.NewController(t) + mock = NewMockLogSink(ctrl) + sink = newBufferSink(ctx, mock, maxStaleness, sizeTrigger, 2 /* maxInFlight */, errCallback) + cleanup = func() { + cancel() + ctrl.Finish() + } + return sink, mock, cleanup +} + +// addArgs adapts a zero-arg call to take the args expected by logSink.output, +// for usage in gomock.Call.Do +func addArgs(f func()) func([]byte, sinkOutputOptions) { + return func([]byte, sinkOutputOptions) { + f() + } +} + +func TestBufferOneLine(t *testing.T) { + defer leaktest.AfterTest(t)() + sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + message := []byte("test") + mock.EXPECT(). + output(gomock.Eq(message), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). + Do(addArgs(wg.Done)) + + require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true})) +} + +func TestBufferManyLinesOneFlush(t *testing.T) { + defer leaktest.AfterTest(t)() + sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + message := []byte("test") + mock.EXPECT(). + output(gomock.Eq([]byte("test\ntest")), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). 
+ Do(addArgs(wg.Done)) + + require.NoError(t, sink.output(message, sinkOutputOptions{})) + require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true})) +} + +func TestBufferMaxStaleness(t *testing.T) { + defer leaktest.AfterTest(t)() + sink, mock, cleanup := getMockBufferSync(t, time.Second /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + message := []byte("test") + mock.EXPECT(). + output(gomock.Eq(message), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). + Do(addArgs(wg.Done)) + + require.NoError(t, sink.output(message, sinkOutputOptions{})) +} + +func TestBufferSizeTrigger(t *testing.T) { + defer leaktest.AfterTest(t)() + sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 2 /* sizeTrigger */, nil /* errCallback*/) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + message := []byte("test") + mock.EXPECT(). + output(gomock.Eq(message), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). + Do(addArgs(wg.Done)) + + require.NoError(t, sink.output(message, sinkOutputOptions{})) +} + +func TestBufferSizeTriggerMultipleFlush(t *testing.T) { + defer leaktest.AfterTest(t)() + sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 8 /* sizeTrigger */, nil /* errCallback*/) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(2) + defer wg.Wait() + + gomock.InOrder( + mock.EXPECT(). + output(gomock.Eq([]byte("test1\ntest2")), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). + Do(addArgs(wg.Done)), + mock.EXPECT(). + output(gomock.Eq([]byte("test3")), sinkOutputOptionsMatcher{extraFlush: gomock.Eq(true)}). 
+ Do(addArgs(wg.Done)), + ) + + require.NoError(t, sink.output([]byte("test1"), sinkOutputOptions{})) + require.NoError(t, sink.output([]byte("test2"), sinkOutputOptions{})) + require.NoError(t, sink.output([]byte("test3"), sinkOutputOptions{extraFlush: true})) +} + +type testError struct{} + +func (testError) Error() string { + return "Test Error" +} + +func TestBufferErrCallback(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCh := make(chan struct{}) + errCallback := func(error) { + close(testCh) + } + + sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, errCallback) + defer cleanup() + + message := []byte("test") + err := testError{} + mock.EXPECT(). + output(gomock.Eq(message), gomock.Any()).Return(err) + require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true})) + + <-testCh +} + +func TestBufferForceSync(t *testing.T) { + defer leaktest.AfterTest(t)() + sink, mock, cleanup := getMockBufferSync(t, 0 /* maxStaleness*/, 0 /* sizeTrigger */, nil /* errCallback*/) + defer cleanup() + + ch := make(chan struct{}) + message := []byte("test") + // Make the child sink block until ch is closed. + mock.EXPECT(). + output(gomock.Eq(message), sinkOutputOptionsMatcher{forceSync: gomock.Eq(true)}). + Do(addArgs(func() { <-ch })) + + var marker int32 + go func() { + // Wait a second, verify that the call to output() is still blocked, + // then close the channel to unblock it. + <-time.After(time.Second) + if atomic.LoadInt32(&marker) != 0 { + t.Error("sink.output returned while child sync should be blocking") + } + close(ch) + }() + require.NoError(t, sink.output(message, sinkOutputOptions{forceSync: true})) + // Set marker to be non-zero. + // This should happen quickly after the above call unblocks. 
+ atomic.StoreInt32(&marker, 1) +} + +type sinkOutputOptionsMatcher struct { + extraFlush gomock.Matcher + ignoreErrors gomock.Matcher + forceSync gomock.Matcher +} + +func (m sinkOutputOptionsMatcher) Matches(x interface{}) bool { + opts, ok := x.(sinkOutputOptions) + if !ok { + return false + } + if m.extraFlush != nil && !m.extraFlush.Matches(opts.extraFlush) || + m.ignoreErrors != nil && !m.ignoreErrors.Matches(opts.ignoreErrors) || + m.forceSync != nil && !m.forceSync.Matches(opts.forceSync) { + return false + } + return true +} + +func (m sinkOutputOptionsMatcher) String() string { + var acc []string + if m.extraFlush != nil { + acc = append(acc, fmt.Sprintf("extraFlush %v", m.extraFlush.String())) + } + if m.ignoreErrors != nil { + acc = append(acc, fmt.Sprintf("ignoreErrors %v", m.ignoreErrors.String())) + } + if m.forceSync != nil { + acc = append(acc, fmt.Sprintf("forceSync %v", m.forceSync.String())) + } + if len(acc) == 0 { + return "is anything" + } + return strings.Join(acc, ", ") +} diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go index 508a04749deb..909153e78ba4 100644 --- a/pkg/util/log/clog.go +++ b/pkg/util/log/clog.go @@ -295,8 +295,9 @@ func (l *loggerT) outputLogEntry(entry logEntry) { setActive() var fatalTrigger chan struct{} extraFlush := false + isFatal := entry.sev == severity.FATAL - if entry.sev == severity.FATAL { + if isFatal { extraFlush = true logging.signalFatalCh() @@ -401,7 +402,7 @@ func (l *loggerT) outputLogEntry(entry logEntry) { // The sink was not accepting entries at this level. Nothing to do. continue } - if err := s.sink.output(extraFlush, bufs.b[i].Bytes()); err != nil { + if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal}); err != nil { if !s.criticality { // An error on this sink is not critical. Just report // the error and move on. @@ -431,7 +432,7 @@ func (l *loggerT) outputLogEntry(entry logEntry) { } // Flush and exit on fatal logging. 
- if entry.sev == severity.FATAL { + if isFatal { close(fatalTrigger) // Note: although it seems like the function is allowed to return // below when s == severity.FATAL, this is not so, because the diff --git a/pkg/util/log/exit_override.go b/pkg/util/log/exit_override.go index 24ea594c8d4a..a1c969456f13 100644 --- a/pkg/util/log/exit_override.go +++ b/pkg/util/log/exit_override.go @@ -98,7 +98,7 @@ func (l *loggerT) reportErrorEverywhereLocked(ctx context.Context, err error) { sink := s.sink if logpb.Severity_ERROR >= s.threshold.get(entry.ch) && sink.active() { buf := s.formatter.formatEntry(entry) - sink.emergencyOutput(buf.Bytes()) + _ = sink.output(buf.Bytes(), sinkOutputOptions{ignoreErrors: true}) putBuffer(buf) } } diff --git a/pkg/util/log/file.go b/pkg/util/log/file.go index 687adfb410e7..7c7fc4036b52 100644 --- a/pkg/util/log/file.go +++ b/pkg/util/log/file.go @@ -172,7 +172,11 @@ func (l *fileSink) attachHints(stacks []byte) []byte { } // output implements the logSink interface. 
-func (l *fileSink) output(extraFlush bool, b []byte) error { +func (l *fileSink) output(b []byte, opts sinkOutputOptions) error { + if opts.ignoreErrors { + l.emergencyOutput(b) + return nil + } if !l.enabled.Get() { // NB: we need to check filesink.enabled a second time here in // case a test Scope() has disabled it asynchronously while @@ -191,7 +195,7 @@ func (l *fileSink) output(extraFlush bool, b []byte) error { return err } - if extraFlush || !l.bufferedWrites || logging.flushWrites.Get() { + if opts.extraFlush || !l.bufferedWrites || logging.flushWrites.Get() { l.flushAndMaybeSyncLocked(false /*doSync*/) } return nil diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 0f983c3478ea..8e9083983867 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -16,6 +16,7 @@ import ( "io/fs" "math" + "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/util/log/channel" "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/cockroach/pkg/util/log/logflags" @@ -288,6 +289,7 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { if err != nil { return nil, err } + attachBufferWrapper(secLoggersCtx, fileSinkInfo, fc.CommonSinkConfig) attachSinkInfo(fileSinkInfo, &fc.Channels) // Start the GC process. 
This ensures that old capture files get @@ -304,6 +306,7 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { if err != nil { return nil, err } + attachBufferWrapper(secLoggersCtx, fluentSinkInfo, fc.CommonSinkConfig) attachSinkInfo(fluentSinkInfo, &fc.Channels) } @@ -316,6 +319,7 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { if err != nil { return nil, err } + attachBufferWrapper(secLoggersCtx, httpSinkInfo, fc.CommonSinkConfig) attachSinkInfo(httpSinkInfo, &fc.Channels) } @@ -395,6 +399,34 @@ func (l *sinkInfo) applyFilters(chs logconfig.ChannelFilters) { } } +func attachBufferWrapper(ctx context.Context, s *sinkInfo, c logconfig.CommonSinkConfig) { + b := c.Buffering + if b.IsNone() { + return + } + + errCallback := func(err error) { + Ops.Errorf(context.Background(), "logging error: %v", err) + } + if s.criticality { + errCallback = func(err error) { + Ops.Errorf(context.Background(), "logging error: %v", err) + + logging.mu.Lock() + f := logging.mu.exitOverride.f + logging.mu.Unlock() + + code := s.sink.exitCode() + if f != nil { + f(code, err) + } else { + exit.WithCode(code) + } + } + } + s.sink = newBufferSink(ctx, s.sink, *b.MaxStaleness, int(*b.FlushTriggerSize), int32(*b.MaxInFlight), errCallback) +} + // applyConfig applies a common sink configuration to a sinkInfo. func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error { l.threshold.setAll(severity.NONE) diff --git a/pkg/util/log/fluent_client.go b/pkg/util/log/fluent_client.go index 5a681dcaf6ce..b4927ff6e12a 100644 --- a/pkg/util/log/fluent_client.go +++ b/pkg/util/log/fluent_client.go @@ -60,7 +60,7 @@ func (l *fluentSink) exitCode() exit.Code { } // output implements the logSink interface. -func (l *fluentSink) output(extraSync bool, b []byte) error { +func (l *fluentSink) output(b []byte, opts sinkOutputOptions) (err error) { // Try to write and reconnect immediately if the first write fails. 
_ = l.tryWrite(b) if l.good { @@ -73,15 +73,6 @@ func (l *fluentSink) output(extraSync bool, b []byte) error { return l.tryWrite(b) } -// emergencyOutput implements the logSink interface. -func (l *fluentSink) emergencyOutput(b []byte) { - _ = l.tryWrite(b) - if !l.good { - _ = l.ensureConn(b) - _ = l.tryWrite(b) - } -} - func (l *fluentSink) close() { l.good = false if l.conn != nil { diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go index 138cefac66fe..238f6360f938 100644 --- a/pkg/util/log/http_sink.go +++ b/pkg/util/log/http_sink.go @@ -73,7 +73,7 @@ type httpSink struct { // The parent logger's outputMu is held during this operation: log // sinks must not recursively call into logging when implementing // this method. -func (hs *httpSink) output(extraSync bool, b []byte) (err error) { +func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) { resp, err := hs.doRequest(hs, b) if err != nil { return err @@ -87,16 +87,6 @@ func (hs *httpSink) output(extraSync bool, b []byte) (err error) { return nil } -// emergencyOutput attempts to emit some formatted bytes, and -// ignores any errors. -// -// The parent logger's outputMu is held during this operation: log -// sinks must not recursively call into logging when implementing -// this method. 
-func (hs *httpSink) emergencyOutput(b []byte) { - _, _ = hs.doRequest(hs, b) -} - func doPost(hs *httpSink, b []byte) (*http.Response, error) { resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b)) if err != nil { diff --git a/pkg/util/log/intercept.go b/pkg/util/log/intercept.go index 8c4846849b71..70af07eb2dce 100644 --- a/pkg/util/log/intercept.go +++ b/pkg/util/log/intercept.go @@ -119,7 +119,7 @@ func (i *interceptorSink) active() bool { return atomic.LoadUint32(&i.activeCount) > 0 } -func (i *interceptorSink) output(_ bool, b []byte) error { +func (i *interceptorSink) output(b []byte, _ sinkOutputOptions) error { i.mu.RLock() defer i.mu.RUnlock() for _, fn := range i.mu.fns { @@ -128,9 +128,5 @@ func (i *interceptorSink) output(_ bool, b []byte) error { return nil } -func (i *interceptorSink) emergencyOutput(b []byte) { - _ = i.output(false, b) -} - func (i *interceptorSink) attachHints(stacks []byte) []byte { return stacks } func (i *interceptorSink) exitCode() exit.Code { return exit.UnspecifiedError() } diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index a424286f7ed6..33a59b29f374 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -138,6 +138,50 @@ type CaptureFd2Config struct { MaxGroupSize *ByteSize `yaml:"max-group-size,omitempty"` } +// CommonBufferSinkConfig represents the common buffering configuration for sinks. +// +// User-facing documentation follows. +// TITLE: Common buffering configuration +// Buffering may be configured with the following fields. It may also be explicitly +// set to "NONE" to disable buffering. Example configuration: +// +// file-defaults: +// dir: logs +// buffering: +// max-staleness: 20s +// flush-trigger-size: 25KB +// sinks: +// file-groups: +// health: +// channels: HEALTH +// buffering: +// max-staleness: 5s # Override max-staleness for this sink. +// ops: +// channels: OPS +// buffering: NONE # Disable buffering for this sink. 
+type CommonBufferSinkConfig struct { + // MaxStaleness is the maximum time a log message will sit in the buffer + // before a flush is triggered. + MaxStaleness *time.Duration `yaml:"max-staleness,omitempty"` + + // FlushTriggerSize is the number of bytes that will trigger the buffer + // to flush. + FlushTriggerSize *ByteSize `yaml:"flush-trigger-size,omitempty"` + + // MaxInFlight is the maximum number of buffered flushes before messages + // start being dropped. + MaxInFlight *int `yaml:"max-in-flight,omitempty"` +} + +// CommonBufferSinkConfigWrapper is a BufferSinkConfig with a special value represented in YAML by +// the string "NONE", which actively disables buffering (in the sense that it overrides +// buffering that is enabled by default). +// This is a separate type so that marshaling and unmarshaling of the inner BufferSinkConfig +// can be handled by the library without causing infinite recursion. +type CommonBufferSinkConfigWrapper struct { + CommonBufferSinkConfig +} + // CommonSinkConfig represents the common configuration shared across all sinks. type CommonSinkConfig struct { // Filter specifies the default minimum severity for log events to @@ -167,6 +211,9 @@ type CommonSinkConfig struct { // it enables `exit-on-error` and changes the format of files // from `crdb-v1` to `crdb-v1-count`. Auditable *bool `yaml:",omitempty"` + + // Buffering configures buffering for this log sink, or NONE to explicitly disable. + Buffering CommonBufferSinkConfigWrapper `yaml:",omitempty"` } // SinkConfig represents the sink configurations. @@ -994,6 +1041,38 @@ func (h *Holder) Set(value string) error { return yaml.UnmarshalStrict([]byte(value), &h.Config) } +// MarshalYAML implements yaml.Marshaler interface. 
+func (w CommonBufferSinkConfigWrapper) MarshalYAML() (interface{}, error) { + if w.IsNone() { + return "NONE", nil + } + b, err := yaml.Marshal(w.CommonBufferSinkConfig) + return string(b), err +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (w *CommonBufferSinkConfigWrapper) UnmarshalYAML(fn func(interface{}) error) error { + var v string + if err := fn(&v); err == nil { + if strings.ToUpper(v) == "NONE" { + w.CommonBufferSinkConfig = CommonBufferSinkConfig{ + MaxStaleness: func() *time.Duration { s := time.Duration(0); return &s }(), + FlushTriggerSize: func() *ByteSize { s := ByteSize(0); return &s }(), + } + return nil + } + } + return fn(&w.CommonBufferSinkConfig) +} + +// IsNone before default propagation indicates that the config explicitly disables +// buffering, such that no propagated defaults can activate them. +// After default propagation, buffering is disabled iff IsNone(). +func (w CommonBufferSinkConfigWrapper) IsNone() bool { + return (w.MaxStaleness != nil && *w.MaxStaleness == 0) && + (w.FlushTriggerSize != nil && *w.FlushTriggerSize == 0) +} + // HTTPSinkMethod is a string restricted to "POST" and "GET" type HTTPSinkMethod string diff --git a/pkg/util/log/logconfig/gen.go b/pkg/util/log/logconfig/gen.go index 9f3a26af1417..b0b470069d23 100644 --- a/pkg/util/log/logconfig/gen.go +++ b/pkg/util/log/logconfig/gen.go @@ -88,7 +88,7 @@ func run() error { sort.Strings(keys) var sortedSinkInfos []*sinkInfo for _, k := range keys { - if k == "CommonSinkConfig" || strings.HasSuffix(k, "Defaults") { + if strings.HasPrefix(k, "Common") || strings.HasSuffix(k, "Defaults") { // We don't want the common configuration to appear as a sink in // the output doc. continue @@ -99,8 +99,9 @@ func run() error { // Render the template.
var src bytes.Buffer if err := tmpl.Execute(&src, struct { - Sinks []*sinkInfo - }{sortedSinkInfos}); err != nil { + Sinks []*sinkInfo + Buffering *sinkInfo + }{sortedSinkInfos, info["CommonBufferSinkConfig"]}); err != nil { return err } @@ -316,7 +317,7 @@ Configuration options shared across all sink types: | Field | Description | |--|--| {{range .CommonFields -}} -| ` + "`" + `{{- .FieldName -}}` + "`" + ` | {{ .Comment | tableCell }} | +| ` + "`" + `{{- .FieldName -}}` + "`" + ` | {{ .Comment | tableCell }}{{- if eq .FieldName "buffering" }} See the [common buffering configuration](#buffering-config) section for details. {{end}} | {{end}} {{- end}} @@ -397,4 +398,21 @@ Likewise: channels: {INFO: all except ops,health} etc. + +{{if .Buffering}} + + + +## {{ .Buffering.Name }} + +{{ .Buffering.Comment }} + +{{if .Buffering.Fields -}} +| Field | Description | +|--|--| +{{range .Buffering.Fields -}} +| ` + "`" + `{{- .FieldName -}}` + "`" + ` | {{ .Comment | tableCell }} | +{{end}} +{{- end}} +{{end}} ` diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index d144a643ecef..5fe1478b5ced 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -99,6 +99,7 @@ sinks: redact: false redactable: true exit-on-error: false + buffering: NONE stderr: filter: NONE capture-stray-errors: @@ -170,6 +171,7 @@ sinks: redact: false redactable: true exit-on-error: true + buffering: NONE stderr: filter: NONE capture-stray-errors: @@ -436,3 +438,73 @@ capture-stray-errors: enable: true dir: /default-dir max-group-size: 100MiB + +# Check that each component of buffering struct propagates. 
+yaml +file-defaults: + buffering: + max-staleness: 15s + flush-trigger-size: 10KiB + max-in-flight: 4 +sinks: + file-groups: + a: + channels: STORAGE + buffering: + max-staleness: 10s + b: + channels: OPS + buffering: + flush-trigger-size: 5.0KiB + c: + channels: HEALTH + buffering: + max-in-flight: 16 + d: + channels: SESSIONS + buffering: NONE +---- +sinks: + file-groups: + a: + channels: {INFO: [STORAGE]} + filter: INFO + buffering: | + max-staleness: 10s + flush-trigger-size: 10KiB + max-in-flight: 4 + b: + channels: {INFO: [OPS]} + filter: INFO + buffering: | + max-staleness: 15s + flush-trigger-size: 5.0KiB + max-in-flight: 4 + c: + channels: {INFO: [HEALTH]} + filter: INFO + buffering: | + max-staleness: 15s + flush-trigger-size: 10KiB + max-in-flight: 16 + d: + channels: {INFO: [SESSIONS]} + filter: INFO + default: + channels: {INFO: [DEV, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, + SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY]} + filter: INFO + buffering: | + max-staleness: 15s + flush-trigger-size: 10KiB + max-in-flight: 4 + stderr: + filter: NONE + buffering: | + max-staleness: 15s + flush-trigger-size: 10KiB + max-in-flight: 4 +capture-stray-errors: + enable: true + dir: /default-dir + max-group-size: 100MiB diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 306c2b0f8348..a16b27c095ed 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -35,6 +35,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { }() bt, bf := true, false + zeroDuration := time.Duration(0) + zeroByteSize := ByteSize(0) + zeroInt := int(0) baseCommonSinkConfig := CommonSinkConfig{ Filter: logpb.Severity_INFO, @@ -42,12 +45,19 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Redactable: &bt, Redact: &bf, Criticality: &bf, + Buffering: CommonBufferSinkConfigWrapper{ + CommonBufferSinkConfig: CommonBufferSinkConfig{ + MaxStaleness: &zeroDuration, + 
FlushTriggerSize: &zeroByteSize, + MaxInFlight: &zeroInt, + }, + }, } baseFileDefaults := FileDefaults{ Dir: defaultLogDir, BufferedWrites: &bt, - MaxFileSize: func() *ByteSize { s := ByteSize(0); return &s }(), - MaxGroupSize: func() *ByteSize { s := ByteSize(0); return &s }(), + MaxFileSize: &zeroByteSize, + MaxGroupSize: &zeroByteSize, FilePermissions: func() *FilePermissions { s := FilePermissions(0o644); return &s }(), CommonSinkConfig: CommonSinkConfig{ Format: func() *string { s := DefaultFileFormat; return &s }(), @@ -66,7 +76,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { UnsafeTLS: &bf, DisableKeepAlives: &bf, Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(), - Timeout: func() *time.Duration { d := time.Duration(0); return &d }(), + Timeout: &zeroDuration, } propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) diff --git a/pkg/util/log/logconfig/validate_test.go b/pkg/util/log/logconfig/validate_test.go index d76e28fc892a..44fce1a188e9 100644 --- a/pkg/util/log/logconfig/validate_test.go +++ b/pkg/util/log/logconfig/validate_test.go @@ -92,6 +92,9 @@ func clearExpectedValues(c *Config) { if *f.Criticality == true { f.Criticality = nil } + if f.Buffering.IsNone() { + f.Buffering = CommonBufferSinkConfigWrapper{} + } } // Clear stderr sink defaults @@ -109,5 +112,8 @@ func clearExpectedValues(c *Config) { if *s.Criticality == true { s.Criticality = nil } + if s.Buffering.IsNone() { + s.Buffering = CommonBufferSinkConfigWrapper{} + } } } diff --git a/pkg/util/log/mock_generated.go b/pkg/util/log/mock_generated.go new file mode 100644 index 000000000000..8696d918b5c8 --- /dev/null +++ b/pkg/util/log/mock_generated.go @@ -0,0 +1,91 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: sinks.go + +// Package log is a generated GoMock package. 
+package log + +import ( + reflect "reflect" + + exit "github.com/cockroachdb/cockroach/pkg/cli/exit" + gomock "github.com/golang/mock/gomock" +) + +// MockLogSink is a mock of logSink interface. +type MockLogSink struct { + ctrl *gomock.Controller + recorder *MockLogSinkMockRecorder +} + +// MockLogSinkMockRecorder is the mock recorder for MockLogSink. +type MockLogSinkMockRecorder struct { + mock *MockLogSink +} + +// NewMockLogSink creates a new mock instance. +func NewMockLogSink(ctrl *gomock.Controller) *MockLogSink { + mock := &MockLogSink{ctrl: ctrl} + mock.recorder = &MockLogSinkMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLogSink) EXPECT() *MockLogSinkMockRecorder { + return m.recorder +} + +// active mocks base method. +func (m *MockLogSink) active() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "active") + ret0, _ := ret[0].(bool) + return ret0 +} + +// active indicates an expected call of active. +func (mr *MockLogSinkMockRecorder) active() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "active", reflect.TypeOf((*MockLogSink)(nil).active)) +} + +// attachHints mocks base method. +func (m *MockLogSink) attachHints(arg0 []byte) []byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "attachHints", arg0) + ret0, _ := ret[0].([]byte) + return ret0 +} + +// attachHints indicates an expected call of attachHints. +func (mr *MockLogSinkMockRecorder) attachHints(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "attachHints", reflect.TypeOf((*MockLogSink)(nil).attachHints), arg0) +} + +// exitCode mocks base method. +func (m *MockLogSink) exitCode() exit.Code { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "exitCode") + ret0, _ := ret[0].(exit.Code) + return ret0 +} + +// exitCode indicates an expected call of exitCode. 
+func (mr *MockLogSinkMockRecorder) exitCode() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "exitCode", reflect.TypeOf((*MockLogSink)(nil).exitCode)) +} + +// output mocks base method. +func (m *MockLogSink) output(b []byte, opts sinkOutputOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "output", b, opts) + ret0, _ := ret[0].(error) + return ret0 +} + +// output indicates an expected call of output. +func (mr *MockLogSinkMockRecorder) output(b, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "output", reflect.TypeOf((*MockLogSink)(nil).output), b, opts) +} diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index 20f073530388..270896cce18d 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -12,6 +12,19 @@ package log import "github.com/cockroachdb/cockroach/pkg/cli/exit" +//go:generate mockgen -package=log -source=sinks.go -destination=mock_generated.go -mock_names=logSink=MockLogSink logSink + +// sinkOutputOptions provides various options for a logSink.output call. +type sinkOutputOptions struct { + // extraFlush invites an explicit flush of any buffering. + extraFlush bool + // ignoreErrors disables internal error handling (i.e. fail fast). + ignoreErrors bool + // forceSync forces synchronous operation of this output operation. + // That is, it will block until the output has been handled. + forceSync bool +} + // logSink abstracts the destination of logging events, after all // their details have been collected into a logpb.Entry. // @@ -32,7 +45,7 @@ type logSink interface { // The parent logger's outputMu is held during this operation: log // sinks must not recursively call into logging when implementing // this method. 
- output(extraSync bool, b []byte) error + output(b []byte, opts sinkOutputOptions) error // exitCode returns the exit code to use if the logger decides // to terminate because of an error in output(). @@ -44,10 +57,11 @@ type logSink interface { // The parent logger's outputMu is held during this operation: log // sinks must not recursively call into logging when implementing // this method. - emergencyOutput([]byte) + // emergencyOutput([]byte) } var _ logSink = (*stderrSink)(nil) var _ logSink = (*fileSink)(nil) var _ logSink = (*fluentSink)(nil) var _ logSink = (*httpSink)(nil) +var _ logSink = (*bufferSink)(nil) diff --git a/pkg/util/log/stderr_sink.go b/pkg/util/log/stderr_sink.go index 5d626a5ae5c6..21f56bd74970 100644 --- a/pkg/util/log/stderr_sink.go +++ b/pkg/util/log/stderr_sink.go @@ -31,7 +31,7 @@ func (l *stderrSink) attachHints(stacks []byte) []byte { } // output implements the logSink interface. -func (l *stderrSink) output(_ bool, b []byte) error { +func (l *stderrSink) output(b []byte, _ sinkOutputOptions) error { _, err := OrigStderr.Write(b) return err } @@ -40,8 +40,3 @@ func (l *stderrSink) output(_ bool, b []byte) error { func (l *stderrSink) exitCode() exit.Code { return exit.LoggingStderrUnavailable() } - -// emergencyOutput implements the logSink interface. 
-func (l *stderrSink) emergencyOutput(b []byte) { - _, _ = OrigStderr.Write(b) -} From 3da88c0f9900c9b5b3f187d52cd55eb9665a6e64 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 3 Nov 2021 15:55:05 +0100 Subject: [PATCH 02/19] util/log: rework some comments around the buffering wrapper Release note: None --- pkg/util/log/buffer_sink.go | 54 ++++++++++++++++++++++++++++--------- pkg/util/log/flags.go | 4 +++ 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index 05d5b0527a78..3c36b8e0e4cd 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -27,7 +27,8 @@ import ( // or size threshold is met, at which point they are put in a queue to be flushed // by the child sink. If the queue is full, current bundle is compacted rather // sent, which currently drops the messages but retains their count for later -// reporting (TODO). +// reporting. +// TODO(knz): Actually report the count of dropped messages. // // Should an error occur in the child sink, it's forwarded to the provided // errCallback (unless forceSync is requested, in which case the error is returned @@ -36,9 +37,14 @@ type bufferSink struct { // child is the wrapped logSink. child logSink - // messageCh sends messages from output() to accumulate(). + // messageCh sends messages from output(), which is called when a + // log entry is initially created, to accumulator(), which is running + // asynchronously to populate the buffer. messageCh chan bufferSinkMessage - // flushCh sends bundles of messages from accumulate() to flusher(). + // flushCh sends bundles of messages from accumulator(), which is + // running asynchronously and collects incoming log entries, to + // flusher(), which is running asynchronously and pushes entries to + // the child sink. flushCh chan bufferSinkBundle // nInFlight internally tracks the population of flushCh to detect when it's full. 
nInFlight int32 @@ -86,7 +92,7 @@ func newBufferSink( return sink } -// accumulator accumulates messages and sends bundles of them to the accumulator. +// accumulator accumulates messages and sends bundles of them to the flusher. func (bs *bufferSink) accumulator(ctx context.Context) { var ( b bufferSinkBundle @@ -109,9 +115,13 @@ func (bs *bufferSink) accumulator(ctx context.Context) { appendMessage := func(m bufferSinkMessage) { b.messages = append(b.messages, m) - b.byteLen += len(m.b.Bytes()) + 1 + b.byteLen += len(m.b.Bytes()) + 1 // account for the final newline. if m.flush || m.errorCh != nil || (bs.triggerSize > 0 && b.byteLen > bs.triggerSize) { flush = true + // TODO(knz): This seems incorrect. If there is a non-empty + // bufferSinkBundle already; with errorCh already set + // (ie. synchronous previous log entry) and then an entry + // is emitted with *another* errorCh, the first one gets lost. b.errorCh = m.errorCh } else if timer == nil && bs.maxStaleness != 0 { timer = time.After(bs.maxStaleness) @@ -122,6 +132,10 @@ func (bs *bufferSink) accumulator(ctx context.Context) { flush = true case <-ctx.Done(): // Do one last non-blocking read on messageCh, so messages don't get dropped. + // + // TODO(knz): this seems incomplete: there may be multiple + // goroutines writing to messageCh concurrently, and so multiple + // messages might be queued when Done() signals termination. select { case m := <-bs.messageCh: appendMessage(m) @@ -135,6 +149,10 @@ func (bs *bufferSink) accumulator(ctx context.Context) { done := b.done if flush { + // TODO(knz): This logic seems to contain a race condition (with + // the flusher). Also it's not clear why this is using a custom + // atomic counter? Why not using a buffered channel and check + // via `select` that the write is possible? 
if atomic.LoadInt32(&bs.nInFlight) < bs.maxInFlight { bs.flushCh <- b atomic.AddInt32(&bs.nInFlight, 1) @@ -150,6 +168,12 @@ func (bs *bufferSink) accumulator(ctx context.Context) { } // flusher concatenates bundled messages and sends them to the child sink. +// +// TODO(knz): this code should be extended to detect server shutdowns: +// as currently implemented the flusher will only terminate after all +// the writes in the channel are completed. If the writes are slow, +// the goroutine may not terminate properly when server shutdown is +// requested. func (bs *bufferSink) flusher(ctx context.Context) { for b := range bs.flushCh { if len(b.messages) > 0 { @@ -169,7 +193,8 @@ func (bs *bufferSink) flusher(ctx context.Context) { } forceSync := b.errorCh != nil // Send the accumulated messages to the child sink. - err := bs.child.output(buf.Bytes(), sinkOutputOptions{extraFlush: true, forceSync: forceSync}) + err := bs.child.output(buf.Bytes(), + sinkOutputOptions{extraFlush: true, forceSync: forceSync}) if forceSync { b.errorCh <- err } else if err != nil && bs.errCallback != nil { @@ -182,7 +207,7 @@ func (bs *bufferSink) flusher(ctx context.Context) { } } // Decrease visible queue size at the end, - // so a long-running flush triggers a compact + // so a long-running flush triggers a compaction // instead of a blocked channel. atomic.AddInt32(&bs.nInFlight, -1) if b.done { @@ -194,9 +219,11 @@ func (bs *bufferSink) flusher(ctx context.Context) { // bufferSinkMessage holds an individual log message sent from output to accumulate. type bufferSinkMessage struct { b *buffer - // flush is set if the call explicitly requests to trigger an flush. + // flush is set if the call explicitly requests to trigger a flush. flush bool - // errorCh is set iff the message is forceSync. + // errorCh is set iff the message was emitted with the forceSync flag. + // This indicates that the caller is interested in knowing the error status + // of child sink writes. 
// The caller will block expecting a (possibly nil) error to return synchronously. errorCh chan<- error } @@ -204,13 +231,16 @@ type bufferSinkMessage struct { // bufferSinkBundle is the accumulated state; the unit sent from the accumulator to the flusher. type bufferSinkBundle struct { messages []bufferSinkMessage - // byteLen is the total length in bytes of the accumulated messages, plus enough for separators. + // byteLen is the total length in bytes of the accumulated messages, + // plus enough for separators. byteLen int // droppedCount is the number of dropped messages due to buffer fullness. droppedCount int - // errorCh, if non-nil, expects to receive the (possibly nil) error after the flush completes. + // errorCh, if non-nil, expects to receive the (possibly nil) error + // after the flush completes. errorCh chan<- error - // done indicates that this is the last bundle and the flusher should shutdown after sending. + // done indicates that this is the last bundle and the flusher + // should shutdown after sending. done bool } diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 8e9083983867..e91b793a7753 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -406,9 +406,13 @@ func attachBufferWrapper(ctx context.Context, s *sinkInfo, c logconfig.CommonSin } errCallback := func(err error) { + // TODO(knz): explain which sink is encountering the error in the + // error message. Ops.Errorf(context.Background(), "logging error: %v", err) } if s.criticality { + // TODO(knz): explain which sink is encountering the error in the + // error message. 
errCallback = func(err error) { Ops.Errorf(context.Background(), "logging error: %v", err) From ccc7cc0fde14034c88bce483cd4cb9b089de4383 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Wed, 3 Nov 2021 18:15:52 -0400 Subject: [PATCH 03/19] sql: add unit tests for creating default privileges Release note: None --- .../catprivilege/default_privilege_test.go | 197 ++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/pkg/sql/catalog/catprivilege/default_privilege_test.go b/pkg/sql/catalog/catprivilege/default_privilege_test.go index 13030b353c8e..9a98ba7dcd25 100644 --- a/pkg/sql/catalog/catprivilege/default_privilege_test.go +++ b/pkg/sql/catalog/catprivilege/default_privilege_test.go @@ -359,6 +359,203 @@ func TestDefaultDefaultPrivileges(t *testing.T) { } } +func TestDefaultPrivileges(t *testing.T) { + defer leaktest.AfterTest(t)() + + // The ID chosen doesn't matter as long as it's not the system db ID. + defaultDatabaseID := descpb.ID(50) + + type userAndGrants struct { + user security.SQLUsername + grants privilege.List + } + testCases := []struct { + objectCreator security.SQLUsername + defaultPrivilegesRole security.SQLUsername + dbID descpb.ID + targetObject tree.AlterDefaultPrivilegesTargetObject + userAndGrants []userAndGrants + expectedGrantsOnObject []userAndGrants + }{ + { + // Altering default privileges on the system database normally wouldn't + // be possible but we do it here via directly altering the default + // privilege descriptor here. + // The purpose of this test however is to show that even after altering + // the default privileges, if we create an object in the system database, + // the only privileges on the object are ALL privileges for root and + // admin. 
+ objectCreator: security.MakeSQLUsernameFromPreNormalizedString("creator"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("creator"), + targetObject: tree.Tables, + dbID: keys.SystemDatabaseID, + userAndGrants: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + }, + expectedGrantsOnObject: []userAndGrants{ + { + user: security.RootUserName(), + grants: privilege.List{privilege.ALL}, + }, + { + user: security.AdminRoleName(), + grants: privilege.List{privilege.ALL}, + }, + }, + }, + { + objectCreator: security.MakeSQLUsernameFromPreNormalizedString("creator"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("creator"), + targetObject: tree.Tables, + dbID: defaultDatabaseID, + userAndGrants: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + }, + expectedGrantsOnObject: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + }, + }, + { + objectCreator: security.MakeSQLUsernameFromPreNormalizedString("creator"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("creator"), + targetObject: tree.Tables, + dbID: defaultDatabaseID, + userAndGrants: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.ALL}, + }, + }, + expectedGrantsOnObject: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.ALL}, + }, + }, + }, + { + objectCreator: security.MakeSQLUsernameFromPreNormalizedString("creator"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("creator"), + targetObject: tree.Tables, + dbID: defaultDatabaseID, + userAndGrants: []userAndGrants{ + { + user: 
security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + }, + expectedGrantsOnObject: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + }, + }, + { + objectCreator: security.MakeSQLUsernameFromPreNormalizedString("creator"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("creator"), + targetObject: tree.Tables, + dbID: defaultDatabaseID, + userAndGrants: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.ALL}, + }, + }, + expectedGrantsOnObject: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.ALL}, + }, + }, + }, + { + objectCreator: security.MakeSQLUsernameFromPreNormalizedString("creator"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("creator"), + targetObject: tree.Tables, + dbID: defaultDatabaseID, + userAndGrants: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + { + user: security.MakeSQLUsernameFromPreNormalizedString("bar"), + grants: privilege.List{privilege.ALL}, + }, + }, + expectedGrantsOnObject: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + { + user: security.MakeSQLUsernameFromPreNormalizedString("bar"), + grants: privilege.List{privilege.ALL}, + }, + }, + }, + { + // In this case, we ALTER DEFAULT PRIVILEGES for the role foo. + // However the default privileges are retrieved for bar, thus + // we don't expect any privileges on the object. 
+ objectCreator: security.MakeSQLUsernameFromPreNormalizedString("foo"), + defaultPrivilegesRole: security.MakeSQLUsernameFromPreNormalizedString("bar"), + targetObject: tree.Tables, + dbID: defaultDatabaseID, + userAndGrants: []userAndGrants{ + { + user: security.MakeSQLUsernameFromPreNormalizedString("foo"), + grants: privilege.List{privilege.SELECT}, + }, + { + user: security.MakeSQLUsernameFromPreNormalizedString("bar"), + grants: privilege.List{privilege.ALL}, + }, + }, + expectedGrantsOnObject: []userAndGrants{}, + }, + } + for _, tc := range testCases { + defaultPrivilegeDescriptor := MakeNewDefaultPrivilegeDescriptor() + defaultPrivileges := NewMutableDefaultPrivileges(defaultPrivilegeDescriptor) + + for _, userAndGrant := range tc.userAndGrants { + defaultPrivileges.GrantDefaultPrivileges( + descpb.DefaultPrivilegesRole{Role: tc.defaultPrivilegesRole}, + userAndGrant.grants, + []security.SQLUsername{userAndGrant.user}, + tc.targetObject, + ) + } + + createdPrivileges := defaultPrivileges.CreatePrivilegesFromDefaultPrivileges( + tc.dbID, + tc.objectCreator, + tc.targetObject, + &descpb.PrivilegeDescriptor{}, + ) + + for _, userAndGrant := range tc.expectedGrantsOnObject { + for _, grant := range userAndGrant.grants { + if !createdPrivileges.CheckPrivilege(userAndGrant.user, grant) { + t.Errorf("expected to find %s privilege for %s", grant.String(), userAndGrant.user) + } + } + } + + } +} + func TestModifyDefaultDefaultPrivileges(t *testing.T) { defer leaktest.AfterTest(t)() From b1eba61ab7eb3e1b9af07e0716cb6f459580be9c Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 2 Nov 2021 16:54:38 -0400 Subject: [PATCH 04/19] *: fix improperly wrapped errors I'm working on a linter that detects errors that are not wrapped correctly, and it discovered these. 
Release note: None --- pkg/cmd/cmpconn/conn.go | 2 +- pkg/cmd/docgen/extract/extract.go | 2 +- pkg/cmd/roachprod-stress/main.go | 18 +++++++++--------- pkg/cmd/roachtest/tests/canary.go | 9 +++++---- pkg/release/release.go | 4 ++-- pkg/roachprod/install/cluster_synced.go | 3 +-- pkg/roachprod/vm/vm.go | 2 +- pkg/rpc/context_test.go | 4 ++-- pkg/sql/BUILD.bazel | 1 + pkg/sql/type_change_test.go | 3 ++- pkg/testutils/sqlutils/BUILD.bazel | 1 + pkg/testutils/sqlutils/sql_runner.go | 5 +++-- pkg/workload/tpccchecks/checks_generator.go | 2 +- 13 files changed, 30 insertions(+), 26 deletions(-) diff --git a/pkg/cmd/cmpconn/conn.go b/pkg/cmd/cmpconn/conn.go index ed9431ed472c..1d06c778d8a7 100644 --- a/pkg/cmd/cmpconn/conn.go +++ b/pkg/cmd/cmpconn/conn.go @@ -249,7 +249,7 @@ ReadRows: first = vals } else { if err := CompareVals(first, vals); err != nil { - return false, fmt.Errorf("compare %s to %s:\n%v", firstName, name, err) + return false, errors.Wrapf(err, "compare %s to %s", firstName, name) } } } diff --git a/pkg/cmd/docgen/extract/extract.go b/pkg/cmd/docgen/extract/extract.go index dc8783dd8750..5eb9b5c009bd 100644 --- a/pkg/cmd/docgen/extract/extract.go +++ b/pkg/cmd/docgen/extract/extract.go @@ -58,7 +58,7 @@ func GenerateRRJar(jar string, bnf []byte) ([]byte, error) { out, err := cmd.CombinedOutput() if err != nil { - return nil, fmt.Errorf("%s: %s", err, out) + return nil, fmt.Errorf("%w: %s", err, out) } return out, nil } diff --git a/pkg/cmd/roachprod-stress/main.go b/pkg/cmd/roachprod-stress/main.go index 2d0b6a8f2f38..ee4de775871c 100644 --- a/pkg/cmd/roachprod-stress/main.go +++ b/pkg/cmd/roachprod-stress/main.go @@ -79,7 +79,7 @@ func run() error { { fi, err := os.Stat(pkg) if err != nil { - return fmt.Errorf("the pkg flag %q is not a directory relative to the current working directory: %v", pkg, err) + return errors.Wrapf(err, "the pkg flag %q is not a directory relative to the current working directory", pkg) } if !fi.Mode().IsDir() { return 
fmt.Errorf("the pkg flag %q is not a directory relative to the current working directory", pkg) @@ -88,7 +88,7 @@ func run() error { // Verify that the test binary exists. fi, err = os.Stat(localTestBin) if err != nil { - return fmt.Errorf("test binary %q does not exist: %v", localTestBin, err) + return errors.Wrapf(err, "test binary %q does not exist", localTestBin) } if !fi.Mode().IsRegular() { return fmt.Errorf("test binary %q is not a file", localTestBin) @@ -113,19 +113,19 @@ func run() error { } if *flagFailure != "" { if _, err := regexp.Compile(*flagFailure); err != nil { - return fmt.Errorf("bad failure regexp: %s", err) + return errors.Wrap(err, "bad failure regexp") } } if *flagIgnore != "" { if _, err := regexp.Compile(*flagIgnore); err != nil { - return fmt.Errorf("bad ignore regexp: %s", err) + return errors.Wrap(err, "bad ignore regexp") } } cmd := exec.Command("roachprod", "status", cluster) out, err := cmd.CombinedOutput() if err != nil { - return fmt.Errorf("%v\n%s", err, out) + return errors.Wrapf(err, "%s", out) } nodes := strings.Count(string(out), "\n") - 1 @@ -160,15 +160,15 @@ func run() error { tmpPath := "testdata" + strconv.Itoa(rand.Int()) cmd = exec.Command("roachprod", "run", cluster, "--", "rm", "-rf", testdataPath) if output, err := cmd.CombinedOutput(); err != nil { - return fmt.Errorf("failed to remove old testdata: %v:\n%s", err, output) + return errors.Wrapf(err, "failed to remove old testdata:\n%s", output) } cmd = exec.Command("roachprod", "put", cluster, testdataPath, tmpPath) if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to copy testdata: %v", err) + return errors.Wrap(err, "failed to copy testdata") } cmd = exec.Command("roachprod", "run", cluster, "mv", tmpPath, testdataPath) if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to move testdata: %v", err) + return errors.Wrap(err, "failed to move testdata") } } testBin := filepath.Join(pkg, localTestBin) @@ -310,7 +310,7 @@ func run() error { } 
return err default: - return fmt.Errorf("unexpected context error: %v", err) + return errors.Wrap(err, "unexpected context error") } } } diff --git a/pkg/cmd/roachtest/tests/canary.go b/pkg/cmd/roachtest/tests/canary.go index 952c2df52094..a549f5298635 100644 --- a/pkg/cmd/roachtest/tests/canary.go +++ b/pkg/cmd/roachtest/tests/canary.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" ) // This file contains common elements for all 3rd party test suite roachtests. @@ -131,7 +132,7 @@ func repeatRunE( } return nil } - return fmt.Errorf("all attempts failed for %s due to error: %s", operation, lastError) + return errors.Wrapf(lastError, "all attempts failed for %s", operation) } // repeatRunWithBuffer is the same function as c.RunWithBuffer but with an @@ -164,7 +165,7 @@ func repeatRunWithBuffer( } return lastResult, nil } - return nil, fmt.Errorf("all attempts failed for %s, with error: %s\n%s", operation, lastError, lastResult) + return nil, errors.Wrapf(lastError, "all attempts failed for %s\n%s", operation, lastResult) } // repeatGitCloneE is the same function as c.GitCloneE but with an automatic @@ -193,7 +194,7 @@ func repeatGitCloneE( } return nil } - return fmt.Errorf("could not clone %s due to error: %s", src, lastError) + return errors.Wrapf(lastError, "could not clone %s", src) } // repeatGetLatestTag fetches the latest (sorted) tag from a github repo. 
@@ -290,7 +291,7 @@ func repeatGetLatestTag( return releaseTags[len(releaseTags)-1].tag, nil } - return "", fmt.Errorf("could not get tags from %s, due to error: %s", url, lastError) + return "", errors.Wrapf(lastError, "could not get tags from %s", url) } // gitCloneWithRecurseSubmodules clones a git repo from src into dest and checks out origin's diff --git a/pkg/release/release.go b/pkg/release/release.go index 3d0177311f68..1611a45b8613 100644 --- a/pkg/release/release.go +++ b/pkg/release/release.go @@ -151,7 +151,7 @@ func MakeRelease(b SupportedTarget, pkgDir string, opts ...MakeReleaseOption) er } log.Printf("%s %s", cmd.Env, cmd.Args) if out, err := params.execFn(cmd); err != nil { - return errors.Newf("%s %s: %s\n\n%s", cmd.Env, cmd.Args, err, out) + return errors.Wrapf(err, "%s %s:\n\n%s", cmd.Env, cmd.Args, out) } } if strings.Contains(b.BuildType, "linux") { @@ -163,7 +163,7 @@ func MakeRelease(b SupportedTarget, pkgDir string, opts ...MakeReleaseOption) er cmd.Stderr = os.Stderr log.Printf("%s %s", cmd.Env, cmd.Args) if out, err := params.execFn(cmd); err != nil { - return errors.Newf("%s %s: %s\n\n%s", cmd.Env, cmd.Args, err, out) + return errors.Wrapf(err, "%s %s:\n\n%s", cmd.Env, cmd.Args, out) } cmd = exec.Command("ldd", binaryName) diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index fe9a77017183..d052b5b91d8e 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -1301,8 +1301,7 @@ func (c *SyncedCluster) Logs( var errBuf bytes.Buffer cmd.Stderr = &errBuf if err := cmd.Run(); err != nil && ctx.Err() == nil { - return fmt.Errorf("failed to run cockroach debug merge-logs:%v\n%v", - err, errBuf.String()) + return errors.Wrapf(err, "failed to run cockroach debug merge-logs:\n%v", errBuf.String()) } return nil } diff --git a/pkg/roachprod/vm/vm.go b/pkg/roachprod/vm/vm.go index 0536f182fcd5..81c75517cf81 100644 --- a/pkg/roachprod/vm/vm.go +++ 
b/pkg/roachprod/vm/vm.go @@ -375,7 +375,7 @@ func ExpandZonesFlag(zoneFlag []string) (zones []string, err error) { } n, err := strconv.Atoi(zone[colonIdx+1:]) if err != nil { - return zones, fmt.Errorf("failed to parse %q: %v", zone, err) + return zones, errors.Wrapf(err, "failed to parse %q", zone) } for i := 0; i < n; i++ { zones = append(zones, zone[:colonIdx]) diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index a2cc242a9654..0758df378b26 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1307,13 +1307,13 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) } if c.expClose { if sendErr == nil || !grpcutil.IsClosedConnection(sendErr) { - newErr := fmt.Errorf("expected closed connection, found %v", sendErr) + newErr := errors.Newf("expected closed connection, found %v", sendErr) log.Infof(ctx, "%+v", newErr) return newErr } } else { if sendErr != nil { - newErr := fmt.Errorf("expected unclosed connection, found %v", sendErr) + newErr := errors.Newf("expected unclosed connection, found %v", sendErr) log.Infof(ctx, "%+v", newErr) return newErr } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index baf7330b0662..b44fc803fe42 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -588,6 +588,7 @@ go_test( "//pkg/sql/opt/exec/explain", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgwirebase", "//pkg/sql/physicalplan", "//pkg/sql/querycache", diff --git a/pkg/sql/type_change_test.go b/pkg/sql/type_change_test.go index 612f95f4c4f8..695b09bfc8b6 100644 --- a/pkg/sql/type_change_test.go +++ b/pkg/sql/type_change_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/tests" 
"github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -582,7 +583,7 @@ WHERE return errors.New("expected error, found none") } if !testutils.IsError(err, "invalid input value for enum") { - return errors.NewAssertionErrorWithWrappedErrf(err, "expected invalid input for enum error") + return errors.Newf("expected invalid input for enum error, found %s", pgerror.FullError(err)) } return nil }) diff --git a/pkg/testutils/sqlutils/BUILD.bazel b/pkg/testutils/sqlutils/BUILD.bazel index 88b16a822aec..cc92d3d1988a 100644 --- a/pkg/testutils/sqlutils/BUILD.bazel +++ b/pkg/testutils/sqlutils/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/sql/catalog/descpb", "//pkg/sql/lexbase", "//pkg/sql/parser", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/tree", "//pkg/testutils", "//pkg/util/fileutil", diff --git a/pkg/testutils/sqlutils/sql_runner.go b/pkg/testutils/sqlutils/sql_runner.go index 4471b920953e..0b1ccc62ad57 100644 --- a/pkg/testutils/sqlutils/sql_runner.go +++ b/pkg/testutils/sqlutils/sql_runner.go @@ -18,6 +18,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -105,7 +106,7 @@ func (sr *SQLRunner) ExpectErr(t testutils.TB, errRE string, query string, args t.Helper() _, err := sr.DB.ExecContext(context.Background(), query, args...) if !testutils.IsError(err, errRE) { - t.Fatalf("expected error '%s', got: %v", errRE, err) + t.Fatalf("expected error '%s', got: %s", errRE, pgerror.FullError(err)) } } @@ -117,7 +118,7 @@ func (sr *SQLRunner) ExpectErrSucceedsSoon( sr.succeedsWithin(t, func() error { _, err := sr.DB.ExecContext(context.Background(), query, args...) 
if !testutils.IsError(err, errRE) { - return errors.Newf("expected error '%s', got: %v", errRE, err) + return errors.Newf("expected error '%s', got: %s", errRE, pgerror.FullError(err)) } return nil }) diff --git a/pkg/workload/tpccchecks/checks_generator.go b/pkg/workload/tpccchecks/checks_generator.go index 68441e505978..8648683ae1c1 100644 --- a/pkg/workload/tpccchecks/checks_generator.go +++ b/pkg/workload/tpccchecks/checks_generator.go @@ -98,7 +98,7 @@ func (w *tpccChecks) Ops( ) (workload.QueryLoad, error) { sqlDatabase, err := workload.SanitizeUrls(w, w.flags.Lookup("db").Value.String(), urls) if err != nil { - return workload.QueryLoad{}, fmt.Errorf("%v", err) + return workload.QueryLoad{}, errors.Wrapf(err, "could not sanitize urls %v", urls) } dbs := make([]*gosql.DB, len(urls)) for i, url := range urls { From 87811bc473cd97507ecf08b26b69fbd0d3d06563 Mon Sep 17 00:00:00 2001 From: Ahmad Abedalqader Date: Thu, 4 Nov 2021 07:46:58 -0700 Subject: [PATCH 05/19] roachprod: fix `roachprod start` ignoring --binary flag Merging #71660 introduced a bug where roachprod ignores --binary flag when running `roachprod start`. This patch reverts to the old way of setting config.Binary. Release note: None Fixes #72425 #72420 #72373 #72372 --- pkg/cmd/roachprod/main.go | 35 +++++++++++++++++------------------ pkg/roachprod/roachprod.go | 3 +-- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go index 0090a613cc67..5a558fbc317d 100644 --- a/pkg/cmd/roachprod/main.go +++ b/pkg/cmd/roachprod/main.go @@ -63,22 +63,21 @@ destroy the cluster. 
} var ( - numNodes int - numRacks int - username string - dryrun bool - destroyAllMine bool - destroyAllLocal bool - extendLifetime time.Duration - wipePreserveCerts bool - listDetails bool - listJSON bool - listMine bool - listPattern string - sqlCockroachBinary = "cockroach" - secure = false - extraSSHOptions = "" - nodeEnv = []string{ + numNodes int + numRacks int + username string + dryrun bool + destroyAllMine bool + destroyAllLocal bool + extendLifetime time.Duration + wipePreserveCerts bool + listDetails bool + listJSON bool + listMine bool + listPattern string + secure = false + extraSSHOptions = "" + nodeEnv = []string{ "COCKROACH_ENABLE_RPC_COMPRESSION=false", "COCKROACH_UI_RELEASE_NOTES_SIGNUP_DISMISSED=true", } @@ -795,7 +794,7 @@ var sqlCmd = &cobra.Command{ Long: "Run `cockroach sql` on a remote cluster.\n", Args: cobra.MinimumNArgs(1), Run: wrap(func(cmd *cobra.Command, args []string) error { - return roachprod.SQL(clusterOpts(args[0]), sqlCockroachBinary, args[1:]) + return roachprod.SQL(clusterOpts(args[0]), args[1:]) }), } @@ -1134,7 +1133,7 @@ func main() { fallthrough case sqlCmd: cmd.Flags().StringVarP( - &sqlCockroachBinary, "binary", "b", "cockroach", + &config.Binary, "binary", "b", config.Binary, "the remote cockroach binary to use") fallthrough case pgurlCmd, adminurlCmd: diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 030f7256aad7..dda1775ffbd2 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -423,8 +423,7 @@ func Run(clusterOpts install.SyncedCluster, SSHOptions string, cmdArray []string } // SQL runs `cockroach sql` on a remote cluster. 
-func SQL(clusterOpts install.SyncedCluster, remoteCockroachBinary string, cmdArray []string) error { - config.Binary = remoteCockroachBinary +func SQL(clusterOpts install.SyncedCluster, cmdArray []string) error { c, err := newCluster(clusterOpts) if err != nil { return err From ea6f78c603d8cbe4c781a1f0e5cccf0acb3fe0a2 Mon Sep 17 00:00:00 2001 From: Jay Rauchenstein Date: Wed, 3 Nov 2021 13:43:53 -0700 Subject: [PATCH 06/19] util/log: forbid new-style buffering for file sinks There remain issues to be investigated surrounding the interaction of the new buffered logging decorator and the file sink concerning synchronization at shutdown. Forbidding it from being enabled in the logconfig for now. Release note: None --- pkg/util/log/logconfig/testdata/validate | 49 +++++++++++++++++------- pkg/util/log/logconfig/validate.go | 3 ++ 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 5fe1478b5ced..351b0ad52b56 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -441,69 +441,90 @@ capture-stray-errors: # Check that each component of buffering struct propagates. 
yaml -file-defaults: +fluent-defaults: buffering: max-staleness: 15s flush-trigger-size: 10KiB max-in-flight: 4 sinks: - file-groups: + fluent-servers: a: + address: a channels: STORAGE buffering: max-staleness: 10s b: + address: b channels: OPS buffering: flush-trigger-size: 5.0KiB c: + address: c channels: HEALTH buffering: max-in-flight: 16 d: + address: d channels: SESSIONS buffering: NONE ---- sinks: file-groups: + default: + channels: {INFO: all} + filter: INFO + fluent-servers: a: channels: {INFO: [STORAGE]} + net: tcp + address: a filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false buffering: | max-staleness: 10s flush-trigger-size: 10KiB max-in-flight: 4 b: channels: {INFO: [OPS]} + net: tcp + address: b filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false buffering: | max-staleness: 15s flush-trigger-size: 5.0KiB max-in-flight: 4 c: channels: {INFO: [HEALTH]} + net: tcp + address: c filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false buffering: | max-staleness: 15s flush-trigger-size: 10KiB max-in-flight: 16 d: channels: {INFO: [SESSIONS]} + net: tcp + address: d filter: INFO - default: - channels: {INFO: [DEV, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, - SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY]} - filter: INFO - buffering: | - max-staleness: 15s - flush-trigger-size: 10KiB - max-in-flight: 4 + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + buffering: NONE stderr: filter: NONE - buffering: | - max-staleness: 15s - flush-trigger-size: 10KiB - max-in-flight: 4 capture-stray-errors: enable: true dir: /default-dir diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index a16b27c095ed..5c22b0d93abf 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -323,6 +323,9 @@ func (c *Config) 
newFileSinkConfig(groupName string) *FileSinkConfig { func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *string) error { propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) + if !fc.Buffering.IsNone() { + return errors.New(`"buffering" not yet supported for file-groups. Use "buffered-writes".`) + } if fc.Dir != c.FileDefaults.Dir { // A directory was specified explicitly. Normalize it. if err := normalizeDir(&fc.Dir); err != nil { From a3e7b64484f22d2dde18f1d50668ad4619a4f426 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:31:11 +0100 Subject: [PATCH 07/19] logconfig: remove a configuration pretty-print artifact Release note: None --- pkg/util/log/logconfig/config.go | 3 +-- pkg/util/log/logconfig/testdata/validate | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index 33a59b29f374..d4af810f3bb1 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -1046,8 +1046,7 @@ func (w CommonBufferSinkConfigWrapper) MarshalYAML() (interface{}, error) { if w.IsNone() { return "NONE", nil } - b, err := yaml.Marshal(w.CommonBufferSinkConfig) - return string(b), err + return w.CommonBufferSinkConfig, nil } // UnmarshalYAML implements the yaml.Unmarshaler interface. 
diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 351b0ad52b56..1bf2573753ac 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -483,7 +483,7 @@ sinks: redact: false redactable: true exit-on-error: false - buffering: | + buffering: max-staleness: 10s flush-trigger-size: 10KiB max-in-flight: 4 @@ -496,7 +496,7 @@ sinks: redact: false redactable: true exit-on-error: false - buffering: | + buffering: max-staleness: 15s flush-trigger-size: 5.0KiB max-in-flight: 4 @@ -509,7 +509,7 @@ sinks: redact: false redactable: true exit-on-error: false - buffering: | + buffering: max-staleness: 15s flush-trigger-size: 10KiB max-in-flight: 16 From 6af3cee72f76abb912abbac1a3b43c65824203ec Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:31:31 +0100 Subject: [PATCH 08/19] logconfig: simplify some code Release note: None --- pkg/util/log/logconfig/config.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index d4af810f3bb1..eb9da6df568a 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -1054,9 +1054,11 @@ func (w *CommonBufferSinkConfigWrapper) UnmarshalYAML(fn func(interface{}) error var v string if err := fn(&v); err == nil { if strings.ToUpper(v) == "NONE" { + d := time.Duration(0) + s := ByteSize(0) w.CommonBufferSinkConfig = CommonBufferSinkConfig{ - MaxStaleness: func() *time.Duration { s := time.Duration(0); return &s }(), - FlushTriggerSize: func() *ByteSize { s := ByteSize(0); return &s }(), + MaxStaleness: &d, + FlushTriggerSize: &s, } return nil } From 27dd89a810476dc8ed9541a81ad9c1dc1a512b23 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:37:56 +0100 Subject: [PATCH 09/19] logconfig: link issue #72452 in the error message Release note: None --- pkg/util/log/logconfig/BUILD.bazel | 1 + 
pkg/util/log/logconfig/validate.go | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/util/log/logconfig/BUILD.bazel b/pkg/util/log/logconfig/BUILD.bazel index cdb8f7ecbaf7..48374a7312a6 100644 --- a/pkg/util/log/logconfig/BUILD.bazel +++ b/pkg/util/log/logconfig/BUILD.bazel @@ -24,6 +24,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/util/log/logconfig", visibility = ["//visibility:public"], deps = [ + "//pkg/build", "//pkg/util/log/logpb", "@com_github_cockroachdb_errors//:errors", "@com_github_dustin_go_humanize//:go-humanize", diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 5c22b0d93abf..bccc35b830b0 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -20,6 +20,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" ) @@ -324,7 +325,14 @@ func (c *Config) newFileSinkConfig(groupName string) *FileSinkConfig { func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *string) error { propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) if !fc.Buffering.IsNone() { - return errors.New(`"buffering" not yet supported for file-groups. Use "buffered-writes".`) + // We cannot use unimplemented.WithIssue() here because of a + // circular dependency. + err := errors.UnimplementedError( + errors.IssueLink{IssueURL: build.MakeIssueURL(72452)}, + `unimplemented: "buffering" not yet supported for file-groups`) + err = errors.WithHint(err, `Use "buffered-writes".`) + err = errors.WithTelemetry(err, "#72452") + return err } if fc.Dir != c.FileDefaults.Dir { // A directory was specified explicitly. Normalize it. 
From 0461a6b020564f000e24d18a83387b90bffea4eb Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:41:54 +0100 Subject: [PATCH 10/19] log: link issue #72453 where relevant Release note: None --- pkg/util/log/buffer_sink.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index 3c36b8e0e4cd..1eff10411817 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -29,6 +29,7 @@ import ( // sent, which currently drops the messages but retains their count for later // reporting. // TODO(knz): Actually report the count of dropped messages. +// See: https://github.com/cockroachdb/cockroach/issues/72453 // // Should an error occur in the child sink, it's forwarded to the provided // errCallback (unless forceSync is requested, in which case the error is returned @@ -235,6 +236,8 @@ type bufferSinkBundle struct { // plus enough for separators. byteLen int // droppedCount is the number of dropped messages due to buffer fullness. + // TODO(knz): This needs to get reported somehow, see + // https://github.com/cockroachdb/cockroach/issues/72453 droppedCount int // errorCh, if non-nil, expects to receive the (possibly nil) error // after the flush completes. From cf83a9b209b70041b77e525344587fb459a93d96 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:45:19 +0100 Subject: [PATCH 11/19] log: link issue #72454 where appropriate Release note: None --- pkg/util/log/buffer_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index 1eff10411817..6c2167ae6fc4 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -123,6 +123,7 @@ func (bs *bufferSink) accumulator(ctx context.Context) { // bufferSinkBundle already; with errorCh already set // (ie. synchronous previous log entry) and then an entry // is emitted with *another* errorCh, the first one gets lost. 
+ // See: https://github.com/cockroachdb/cockroach/issues/72454 b.errorCh = m.errorCh } else if timer == nil && bs.maxStaleness != 0 { timer = time.After(bs.maxStaleness) From f19f54b1963e8870e496b440d3966d774dbd78ee Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:48:53 +0100 Subject: [PATCH 12/19] log: link issue #72455 where appropriate Release note: None --- pkg/util/log/buffer_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index 6c2167ae6fc4..810191c90bb8 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -138,6 +138,7 @@ func (bs *bufferSink) accumulator(ctx context.Context) { // TODO(knz): this seems incomplete: there may be multiple // goroutines writing to messageCh concurrently, and so multiple // messages might be queued when Done() signals termination. + // See: https://github.com/cockroachdb/cockroach/issues/72455 select { case m := <-bs.messageCh: appendMessage(m) From 44eadb5e7602d53aa2d2fc8138d8f139c0f56858 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:53:44 +0100 Subject: [PATCH 13/19] log: link issue #72458 where appropriate Release note: None --- pkg/util/log/buffer_sink.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index 810191c90bb8..ed90877779dd 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -172,6 +172,9 @@ func (bs *bufferSink) accumulator(ctx context.Context) { // flusher concatenates bundled messages and sends them to the child sink. // +// TODO(knz): How does this interact with the flusher logic in log_flush.go? +// See: https://github.com/cockroachdb/cockroach/issues/72458 +// // TODO(knz): this code should be extended to detect server shutdowns: // as currently implemented the flusher will only terminate after all // the writes in the channel are completed. 
If the writes are slow, From 7a493ef1cf4249ef45d4509b4cc8446506693570 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:56:30 +0100 Subject: [PATCH 14/19] log: link issue #72459 where appropriate Release note: None --- pkg/util/log/buffer_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index ed90877779dd..b0df0ad386d4 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -180,6 +180,7 @@ func (bs *bufferSink) accumulator(ctx context.Context) { // the writes in the channel are completed. If the writes are slow, // the goroutine may not terminate properly when server shutdown is // requested. +// See: https://github.com/cockroachdb/cockroach/issues/72459 func (bs *bufferSink) flusher(ctx context.Context) { for b := range bs.flushCh { if len(b.messages) > 0 { From 63bebf08613235f94f8ba83417e72463faa33ad8 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 21:58:56 +0100 Subject: [PATCH 15/19] log: link issue #72460 where appropriate Release note: None --- pkg/util/log/buffer_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/log/buffer_sink.go b/pkg/util/log/buffer_sink.go index b0df0ad386d4..610dd0b7bfd8 100644 --- a/pkg/util/log/buffer_sink.go +++ b/pkg/util/log/buffer_sink.go @@ -156,6 +156,7 @@ func (bs *bufferSink) accumulator(ctx context.Context) { // the flusher). Also it's not clear why this is using a custom // atomic counter? Why not using a buffered channel and check // via `select` that the write is possible? 
+ // See: https://github.com/cockroachdb/cockroach/issues/72460 if atomic.LoadInt32(&bs.nInFlight) < bs.maxInFlight { bs.flushCh <- b atomic.AddInt32(&bs.nInFlight, 1) From 2752c924ac9c21185a55e4da3675cffbb006f189 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 4 Nov 2021 22:03:05 +0100 Subject: [PATCH 16/19] log: link issue #72461 where relevant Release note: None --- pkg/util/log/flags.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index e91b793a7753..34b476bbc697 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -408,11 +408,13 @@ func attachBufferWrapper(ctx context.Context, s *sinkInfo, c logconfig.CommonSin errCallback := func(err error) { // TODO(knz): explain which sink is encountering the error in the // error message. + // See: https://github.com/cockroachdb/cockroach/issues/72461 Ops.Errorf(context.Background(), "logging error: %v", err) } if s.criticality { // TODO(knz): explain which sink is encountering the error in the // error message. + // See: https://github.com/cockroachdb/cockroach/issues/72461 errCallback = func(err error) { Ops.Errorf(context.Background(), "logging error: %v", err) From d9087aff9ba866616a26f506f07ad11bf32d27e0 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 4 Nov 2021 15:03:08 +0100 Subject: [PATCH 17/19] kvserver: use wrapper type for Store.mu.replicas This simplifies lots of callers and it will also make it easier to work on #72374, where this map will start containing more than one type as value. 
Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/helpers_test.go | 9 ++- pkg/kv/kvserver/store.go | 20 +++---- pkg/kv/kvserver/store_create_replica.go | 8 +-- pkg/kv/kvserver/store_raft.go | 21 +++---- pkg/kv/kvserver/store_remove_replica.go | 5 +- pkg/kv/kvserver/store_replicas_by_rangeid.go | 63 ++++++++++++++++++++ pkg/kv/kvserver/store_snapshot.go | 5 +- 8 files changed, 93 insertions(+), 39 deletions(-) create mode 100644 pkg/kv/kvserver/store_replicas_by_rangeid.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 5a8171940b37..9e6053192dc8 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -83,6 +83,7 @@ go_library( "store_rebalancer.go", "store_remove_replica.go", "store_replica_btree.go", + "store_replicas_by_rangeid.go", "store_send.go", "store_snapshot.go", "store_split.go", diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index e0e34040518d..70fefa4ea431 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "testing" "time" - "unsafe" circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/kv" @@ -515,7 +514,7 @@ func WriteRandomDataToRange( } func WatchForDisappearingReplicas(t testing.TB, store *Store) { - m := make(map[int64]struct{}) + m := make(map[roachpb.RangeID]struct{}) for { select { case <-store.Stopper().ShouldQuiesce(): @@ -523,9 +522,9 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - m[k] = struct{}{} - return true + _ = store.mu.replicas.Range(func(repl *Replica) error { + m[repl.RangeID] = struct{}{} + return nil }) for k := range m { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 7c80312f6930..dbe78e872c58 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,9 +364,9 @@ func (rs 
*storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - rs.repls = append(rs.repls, (*Replica)(v)) - return true + _ = rs.store.mu.replicas.Range(func(repl *Replica) error { + rs.repls = append(rs.repls, repl) + return nil }) if rs.ordered { @@ -586,7 +586,7 @@ type Store struct { syncutil.RWMutex // Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This // includes `uninitReplicas`. May be read without holding Store.mu. - replicas syncutil.IntMap + replicas rangeIDReplicaMap // A btree key containing objects of type *Replica or *ReplicaPlaceholder. // Both types have an associated key range; the btree is keyed on their // start keys. @@ -2411,8 +2411,8 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { // GetReplicaIfExists returns the replica with the given RangeID or nil. func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value) + if repl, ok := s.mu.replicas.Load(rangeID); ok { + return repl } return nil } @@ -2459,8 +2459,8 @@ func (s *Store) getOverlappingKeyRangeLocked( // RaftStatus returns the current raft status of the local replica of // the given range. func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value).RaftStatus() + if repl, ok := s.mu.replicas.Load(rangeID); ok { + return repl.RaftStatus() } return nil } @@ -2590,9 +2590,9 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. 
func (s *Store) ReplicaCount() int { var count int - s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool { + _ = s.mu.replicas.Range(func(*Replica) error { count++ - return true + return nil }) return count } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index a3ed8b7f2a81..910453ba6c0a 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -13,7 +13,6 @@ package kvserver import ( "context" "time" - "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -80,8 +79,7 @@ func (s *Store) tryGetOrCreateReplica( creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { // The common case: look up an existing (initialized) replica. - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - repl := (*Replica)(value) + if repl, ok := s.mu.replicas.Load(rangeID); ok { repl.raftMu.Lock() // not unlocked on success repl.mu.Lock() @@ -298,7 +296,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { // same replica object. This occurs during splits where the right-hand side // is added to the replicas map before it is initialized. if existing, loaded := s.mu.replicas.LoadOrStore( - int64(repl.RangeID), unsafe.Pointer(repl)); loaded && (*Replica)(existing) != repl { + repl.RangeID, repl); loaded && existing != repl { return errors.Errorf("%s: replica already exists", repl) } // Check whether the replica is unquiesced but not in the map. This @@ -314,7 +312,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { } // maybeMarkReplicaInitializedLocked should be called whenever a previously -// unintialized replica has become initialized so that the store can update its +// uninitialized replica has become initialized so that the store can update its // internal bookkeeping. It requires that Store.mu and Replica.raftMu // are locked. 
func (s *Store) maybeMarkReplicaInitializedLockedReplLocked( diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index c1c06cc8f491..cfe741f2df50 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // forgiving. // // See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411. - if _, exists := s.mu.replicas.Load(int64(rangeID)); !exists { + if _, exists := s.mu.replicas.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { s.replicaQueues.Delete(int64(rangeID)) @@ -500,12 +500,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID } func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicas.Load(rangeID) if !ok { return } - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) @@ -524,14 +523,13 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { } func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicas.Load(rangeID) if !ok { return false } livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap) start := timeutil.Now() - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) exists, err := r.tick(ctx, livenessMap) if err != nil { @@ -560,8 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - r := (*Replica)(v) + _ = s.mu.replicas.Range(func(r *Replica) error { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -570,7 +567,7 @@ func (s *Store) 
nodeIsLiveCallback(l livenesspb.Liveness) { if quiescent && (lagging.MemberStale(l) || !laggingAccurate) { r.unquiesce() } - return true + return nil }) } @@ -730,13 +727,13 @@ func (s *Store) sendQueuedHeartbeatsToNode( if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { - if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID) + if repl, ok := s.mu.replicas.Load(beat.RangeID); ok { + repl.addUnreachableRemoteReplica(beat.ToReplicaID) } } for _, resp := range resps { - if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID) + if repl, ok := s.mu.replicas.Load(resp.RangeID); ok { + repl.addUnreachableRemoteReplica(resp.ToReplicaID) } } return 0 diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index d919f29abf67..cb8cd493d2f5 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -238,11 +238,10 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( defer s.mu.Unlock() // Sanity check, could be removed. 
- value, stillExists := s.mu.replicas.Load(int64(rep.RangeID)) + existing, stillExists := s.mu.replicas.Load(rep.RangeID) if !stillExists { log.Fatalf(ctx, "uninitialized replica was removed in the meantime") } - existing := (*Replica)(value) if existing == rep { log.Infof(ctx, "removing uninitialized replica %v", rep) } else { @@ -264,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) s.replicaQueues.Delete(int64(rangeID)) - s.mu.replicas.Delete(int64(rangeID)) + s.mu.replicas.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) } diff --git a/pkg/kv/kvserver/store_replicas_by_rangeid.go b/pkg/kv/kvserver/store_replicas_by_rangeid.go new file mode 100644 index 000000000000..c98bd471e5ac --- /dev/null +++ b/pkg/kv/kvserver/store_replicas_by_rangeid.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +type rangeIDReplicaMap struct { + m syncutil.IntMap +} + +// Load loads the Replica for the RangeID. If not found, returns +// (nil, false), otherwise the Replica and true. +func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { + val, ok := m.m.Load(int64(rangeID)) + return (*Replica)(val), ok +} + +// LoadOrStore loads the replica and returns it (and `true`). 
If it does not +// exist, atomically inserts the provided Replica and returns it along with +// `false`. +func (m *rangeIDReplicaMap) LoadOrStore( + rangeID roachpb.RangeID, repl *Replica, +) (_ *Replica, loaded bool) { + val, loaded := m.m.LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) + return (*Replica)(val), loaded +} + +// Delete drops the Replica if it existed in the map. +func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) { + m.m.Delete(int64(rangeID)) +} + +// Range invokes the provided function with each Replica in the map. +// Iteration stops on any error. `iterutil.StopIteration()` can be +// returned from the closure to stop iteration without an error +// resulting from Range(). +func (m *rangeIDReplicaMap) Range(f func(*Replica) error) error { + var err error + v := func(k int64, v unsafe.Pointer) (wantMore bool) { + err = f((*Replica)(v)) + return err == nil + } + m.m.Range(v) + if errors.Is(err, iterutil.StopIteration()) { + return nil + } + return nil +} diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index d5bf8264c207..7ba412895112 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -450,13 +450,10 @@ func (s *Store) canAcceptSnapshotLocked( desc := *snapHeader.State.Desc // First, check for an existing Replica. - v, ok := s.mu.replicas.Load( - int64(desc.RangeID), - ) + existingRepl, ok := s.mu.replicas.Load(desc.RangeID) if !ok { return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present") } - existingRepl := (*Replica)(v) // The raftMu is held which allows us to use the existing replica as a // placeholder when we decide that the snapshot can be applied. 
As long // as the caller releases the raftMu only after feeding the snapshot From be9bf0037fe23cf7e81a2d2f13401bdaa770e343 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 4 Nov 2021 15:07:48 +0100 Subject: [PATCH 18/19] kvserver: rename replicas to replicasByRangeID Release note: None --- pkg/kv/kvserver/helpers_test.go | 4 ++-- pkg/kv/kvserver/store.go | 10 +++++----- pkg/kv/kvserver/store_create_replica.go | 4 ++-- pkg/kv/kvserver/store_raft.go | 12 ++++++------ pkg/kv/kvserver/store_remove_replica.go | 4 ++-- pkg/kv/kvserver/store_snapshot.go | 2 +- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 70fefa4ea431..ef127376f7c0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -522,13 +522,13 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - _ = store.mu.replicas.Range(func(repl *Replica) error { + _ = store.mu.replicasByRangeID.Range(func(repl *Replica) error { m[repl.RangeID] = struct{}{} return nil }) for k := range m { - if _, ok := store.mu.replicas.Load(k); !ok { + if _, ok := store.mu.replicasByRangeID.Load(k); !ok { t.Fatalf("r%d disappeared from Store.mu.replicas map", k) } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index dbe78e872c58..ee4f817688b3 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,7 +364,7 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - _ = rs.store.mu.replicas.Range(func(repl *Replica) error { + _ = rs.store.mu.replicasByRangeID.Range(func(repl *Replica) error { rs.repls = append(rs.repls, repl) return nil }) @@ -586,7 +586,7 @@ type Store struct { syncutil.RWMutex // Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This // includes `uninitReplicas`. 
May be read without holding Store.mu. - replicas rangeIDReplicaMap + replicasByRangeID rangeIDReplicaMap // A btree key containing objects of type *Replica or *ReplicaPlaceholder. // Both types have an associated key range; the btree is keyed on their // start keys. @@ -2411,7 +2411,7 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { // GetReplicaIfExists returns the replica with the given RangeID or nil. func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica { - if repl, ok := s.mu.replicas.Load(rangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { return repl } return nil @@ -2459,7 +2459,7 @@ func (s *Store) getOverlappingKeyRangeLocked( // RaftStatus returns the current raft status of the local replica of // the given range. func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { - if repl, ok := s.mu.replicas.Load(rangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { return repl.RaftStatus() } return nil @@ -2590,7 +2590,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - _ = s.mu.replicas.Range(func(*Replica) error { + _ = s.mu.replicasByRangeID.Range(func(*Replica) error { count++ return nil }) diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 910453ba6c0a..0e0e92a281b0 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -79,7 +79,7 @@ func (s *Store) tryGetOrCreateReplica( creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { // The common case: look up an existing (initialized) replica. 
- if repl, ok := s.mu.replicas.Load(rangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { repl.raftMu.Lock() // not unlocked on success repl.mu.Lock() @@ -295,7 +295,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { // It's ok for the replica to exist in the replicas map as long as it is the // same replica object. This occurs during splits where the right-hand side // is added to the replicas map before it is initialized. - if existing, loaded := s.mu.replicas.LoadOrStore( + if existing, loaded := s.mu.replicasByRangeID.LoadOrStore( repl.RangeID, repl); loaded && existing != repl { return errors.Errorf("%s: replica already exists", repl) } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index cfe741f2df50..f6c91f6544fe 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // forgiving. // // See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411. 
- if _, exists := s.mu.replicas.Load(rangeID); !exists { + if _, exists := s.mu.replicasByRangeID.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { s.replicaQueues.Delete(int64(rangeID)) @@ -500,7 +500,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID } func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { - r, ok := s.mu.replicas.Load(rangeID) + r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { return } @@ -523,7 +523,7 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { } func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { - r, ok := s.mu.replicas.Load(rangeID) + r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { return false } @@ -558,7 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - _ = s.mu.replicas.Range(func(r *Replica) error { + _ = s.mu.replicasByRangeID.Range(func(r *Replica) error { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -727,12 +727,12 @@ func (s *Store) sendQueuedHeartbeatsToNode( if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { - if repl, ok := s.mu.replicas.Load(beat.RangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(beat.RangeID); ok { repl.addUnreachableRemoteReplica(beat.ToReplicaID) } } for _, resp := range resps { - if repl, ok := s.mu.replicas.Load(resp.RangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(resp.RangeID); ok { repl.addUnreachableRemoteReplica(resp.ToReplicaID) } } diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index cb8cd493d2f5..8f47c805c840 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -238,7 +238,7 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( defer 
s.mu.Unlock() // Sanity check, could be removed. - existing, stillExists := s.mu.replicas.Load(rep.RangeID) + existing, stillExists := s.mu.replicasByRangeID.Load(rep.RangeID) if !stillExists { log.Fatalf(ctx, "uninitialized replica was removed in the meantime") } @@ -263,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) s.replicaQueues.Delete(int64(rangeID)) - s.mu.replicas.Delete(rangeID) + s.mu.replicasByRangeID.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 7ba412895112..e34d412e7dde 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -450,7 +450,7 @@ func (s *Store) canAcceptSnapshotLocked( desc := *snapHeader.State.Desc // First, check for an existing Replica. - existingRepl, ok := s.mu.replicas.Load(desc.RangeID) + existingRepl, ok := s.mu.replicasByRangeID.Load(desc.RangeID) if !ok { return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present") } From 2113763e676479963445fcf437c15fc0ba64dab8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 4 Nov 2021 22:59:21 +0100 Subject: [PATCH 19/19] kvserver: touch up rangeIDReplicaMap - avoid the struct wrapper (and thus the need to name the inner map) - don't return an error in Range() Release note: None --- pkg/kv/kvserver/helpers_test.go | 3 +- pkg/kv/kvserver/store.go | 6 ++-- pkg/kv/kvserver/store_raft.go | 3 +- pkg/kv/kvserver/store_replicas_by_rangeid.go | 30 ++++++-------------- 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index ef127376f7c0..e92a81bb820f 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -522,9 +522,8 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - _ = 
store.mu.replicasByRangeID.Range(func(repl *Replica) error { + store.mu.replicasByRangeID.Range(func(repl *Replica) { m[repl.RangeID] = struct{}{} - return nil }) for k := range m { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index ee4f817688b3..4ef95d32a143 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,9 +364,8 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - _ = rs.store.mu.replicasByRangeID.Range(func(repl *Replica) error { + rs.store.mu.replicasByRangeID.Range(func(repl *Replica) { rs.repls = append(rs.repls, repl) - return nil }) if rs.ordered { @@ -2590,9 +2589,8 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - _ = s.mu.replicasByRangeID.Range(func(*Replica) error { + s.mu.replicasByRangeID.Range(func(*Replica) { count++ - return nil }) return count } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index f6c91f6544fe..69ee2ff96c88 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -558,7 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - _ = s.mu.replicasByRangeID.Range(func(r *Replica) error { + s.mu.replicasByRangeID.Range(func(r *Replica) { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -567,7 +567,6 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { if quiescent && (lagging.MemberStale(l) || !laggingAccurate) { r.unquiesce() } - return nil }) } diff --git a/pkg/kv/kvserver/store_replicas_by_rangeid.go b/pkg/kv/kvserver/store_replicas_by_rangeid.go index c98bd471e5ac..fe8c943181e3 100644 --- 
a/pkg/kv/kvserver/store_replicas_by_rangeid.go +++ b/pkg/kv/kvserver/store_replicas_by_rangeid.go @@ -14,19 +14,15 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" ) -type rangeIDReplicaMap struct { - m syncutil.IntMap -} +type rangeIDReplicaMap syncutil.IntMap // Load loads the Replica for the RangeID. If not found, returns // (nil, false), otherwise the Replica and true. func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { - val, ok := m.m.Load(int64(rangeID)) + val, ok := (*syncutil.IntMap)(m).Load(int64(rangeID)) return (*Replica)(val), ok } @@ -36,28 +32,20 @@ func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { func (m *rangeIDReplicaMap) LoadOrStore( rangeID roachpb.RangeID, repl *Replica, ) (_ *Replica, loaded bool) { - val, loaded := m.m.LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) + val, loaded := (*syncutil.IntMap)(m).LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) return (*Replica)(val), loaded } // Delete drops the Replica if it existed in the map. func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) { - m.m.Delete(int64(rangeID)) + (*syncutil.IntMap)(m).Delete(int64(rangeID)) } // Range invokes the provided function with each Replica in the map. -// Iteration stops on any error. `iterutil.StopIteration()` can be -// returned from the closure to stop iteration without an error -// resulting from Range(). 
-func (m *rangeIDReplicaMap) Range(f func(*Replica) error) error { - var err error - v := func(k int64, v unsafe.Pointer) (wantMore bool) { - err = f((*Replica)(v)) - return err == nil - } - m.m.Range(v) - if errors.Is(err, iterutil.StopIteration()) { - return nil +func (m *rangeIDReplicaMap) Range(f func(*Replica)) { + v := func(k int64, v unsafe.Pointer) bool { + f((*Replica)(v)) + return true // wantMore } - return nil + (*syncutil.IntMap)(m).Range(v) }