diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go index f7421debb7de..550d10fa4b85 100644 --- a/pkg/cli/log_flags_test.go +++ b/pkg/cli/log_flags_test.go @@ -42,7 +42,9 @@ func TestSetupLogging(t *testing.T) { `format: json-fluent-compact, ` + `redactable: true, ` + `exit-on-error: false, ` + - `buffering: NONE}` + `buffering: {max-staleness: 5s, ` + + `flush-trigger-size: 1.0MiB, ` + + `max-buffer-size: 50MiB}}` const defaultHTTPConfig = `http-defaults: {` + `method: POST, ` + `unsafe-tls: false, ` + @@ -52,7 +54,9 @@ func TestSetupLogging(t *testing.T) { `format: json-compact, ` + `redactable: true, ` + `exit-on-error: false, ` + - `buffering: NONE}` + `buffering: {max-staleness: 5s, ` + + `flush-trigger-size: 1.0MiB, ` + + `max-buffer-size: 50MiB}}` stdFileDefaultsRe := regexp.MustCompile( `file-defaults: \{` + `dir: (?P[^,]+), ` + diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 53c233d16564..107d34c4119e 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -376,13 +376,8 @@ func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) { return nil, err } info.applyFilters(c.Channels) - httpSink, err := newHTTPSink(*c.Address, httpSinkOptions{ - method: string(*c.Method), - unsafeTLS: *c.UnsafeTLS, - timeout: *c.Timeout, - disableKeepAlives: *c.DisableKeepAlives, - contentType: info.formatter.contentType(), - }) + + httpSink, err := newHTTPSink(c) if err != nil { return nil, err } @@ -442,6 +437,16 @@ func (l *sinkInfo) describeAppliedConfig() (c logconfig.CommonSinkConfig) { c.Criticality = &l.criticality f := l.formatter.formatterName() c.Format = &f + bufferedSink, ok := l.sink.(*bufferedSink) + if ok { + c.Buffering.MaxStaleness = &bufferedSink.maxStaleness + triggerSize := logconfig.ByteSize(bufferedSink.triggerSize) + c.Buffering.FlushTriggerSize = &triggerSize + bufferedSink.mu.Lock() + maxBufferSize := logconfig.ByteSize(bufferedSink.mu.buf.maxSizeBytes) + c.Buffering.MaxBufferSize = &maxBufferSize + bufferedSink.mu.Unlock() + } return c } @@ -542,15 +547,23 @@ func DescribeAppliedConfig() string { config.Sinks.FluentServers = make(map[string]*logconfig.FluentSinkConfig) sIdx := 1 _ = logging.allSinkInfos.iter(func(l *sinkInfo) error { - fluentSink, ok := l.sink.(*fluentSink) + flSink, ok := l.sink.(*fluentSink) if !ok { - return nil + // Check to see if it's a fluentSink wrapped in a bufferedSink. + bufferedSink, ok := l.sink.(*bufferedSink) + if !ok { + return nil + } + flSink, ok = bufferedSink.child.(*fluentSink) + if !ok { + return nil + } } fc := &logconfig.FluentSinkConfig{} fc.CommonSinkConfig = l.describeAppliedConfig() - fc.Net = fluentSink.network - fc.Address = fluentSink.addr + fc.Net = flSink.network + fc.Address = flSink.addr // Describe the connections to this fluent sink. for ch, logger := range chans { @@ -562,6 +575,28 @@ func DescribeAppliedConfig() string { return nil }) + // Describe the http sinks. + config.Sinks.HTTPServers = make(map[string]*logconfig.HTTPSinkConfig) + sIdx = 1 + _ = logging.allSinkInfos.iter(func(l *sinkInfo) error { + netSink, ok := l.sink.(*httpSink) + if !ok { + // Check to see if it's a httpSink wrapped in a bufferedSink. + bufferedSink, ok := l.sink.(*bufferedSink) + if !ok { + return nil + } + netSink, ok = bufferedSink.child.(*httpSink) + if !ok { + return nil + } + } + skey := fmt.Sprintf("s%d", sIdx) + sIdx++ + config.Sinks.HTTPServers[skey] = netSink.config + return nil + }) + // Note: we cannot return 'config' directly, because this captures // certain variables from the loggers by reference and thus could be // invalidated by concurrent uses of ApplyConfig(). diff --git a/pkg/util/log/fluent_client_test.go b/pkg/util/log/fluent_client_test.go index 824cdcf7809d..306ce2db65c4 100644 --- a/pkg/util/log/fluent_client_test.go +++ b/pkg/util/log/fluent_client_test.go @@ -44,10 +44,24 @@ func TestFluentClient(t *testing.T) { // Set up a logging configuration with the server we've just set up // as target for the OPS channel. cfg := logconfig.DefaultConfig() + zeroBytes := logconfig.ByteSize(0) + zeroDuration := time.Duration(0) cfg.Sinks.FluentServers = map[string]*logconfig.FluentSinkConfig{ "ops": { Address: serverAddr, - Channels: logconfig.SelectChannels(channel.OPS)}, + Channels: logconfig.SelectChannels(channel.OPS), + FluentDefaults: logconfig.FluentDefaults{ + CommonSinkConfig: logconfig.CommonSinkConfig{ + Buffering: logconfig.CommonBufferSinkConfigWrapper{ + CommonBufferSinkConfig: logconfig.CommonBufferSinkConfig{ + MaxStaleness: &zeroDuration, + FlushTriggerSize: &zeroBytes, + MaxBufferSize: &zeroBytes, + }, + }, + }, + }, + }, } // Derive a full config using the same directory as the // TestLogScope. @@ -80,7 +94,7 @@ func TestFluentClient(t *testing.T) { msg, err := json.Marshal(info) require.NoError(t, err) - const expected = `{"c":1,"f":"util/log/fluent_client_test.go","g":222,"l":63,"message":"hello world","n":1,"r":1,"s":1,"sev":"I","t":"XXX","tag":"logtest.ops","v":"v999.0.0"}` + const expected = `{"c":1,"f":"util/log/fluent_client_test.go","g":222,"l":77,"message":"hello world","n":1,"r":1,"s":1,"sev":"I","t":"XXX","tag":"logtest.ops","v":"v999.0.0"}` require.Equal(t, expected, string(msg)) } diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go index 8dad22bceaae..df906a0fb3fc 100644 --- a/pkg/util/log/http_sink.go +++ b/pkg/util/log/http_sink.go @@ -16,54 +16,49 @@ import ( "fmt" "net/http" "net/url" - "time" "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/errors" ) // TODO: HTTP requests should be bound to context via http.NewRequestWithContext // Proper logging context to be decided/designed. - -// httpSinkOptions is safe to use concurrently due to the delegation of -// operations to `http.Client` which is safe to use concurrently. -type httpSinkOptions struct { - unsafeTLS bool - timeout time.Duration - method string - disableKeepAlives bool - contentType string -} - -func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) { +func newHTTPSink(c logconfig.HTTPSinkConfig) (*httpSink, error) { transport, ok := http.DefaultTransport.(*http.Transport) if !ok { return nil, errors.AssertionFailedf("http.DefaultTransport is not a http.Transport: %T", http.DefaultTransport) } transport = transport.Clone() - transport.DisableKeepAlives = opt.disableKeepAlives + transport.DisableKeepAlives = *c.DisableKeepAlives hs := &httpSink{ client: http.Client{ Transport: transport, - Timeout: opt.timeout, + Timeout: *c.Timeout, }, - address: url, + address: *c.Address, doRequest: doPost, contentType: "application/octet-stream", } - if opt.unsafeTLS { + if *c.UnsafeTLS { transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } - if opt.method == http.MethodGet { + if string(*c.Method) == http.MethodGet { hs.doRequest = doGet } - if opt.contentType != "" { - hs.contentType = opt.contentType + f, ok := formatters[*c.Format] + if !ok { + panic(errors.AssertionFailedf("unknown format: %q", *c.Format)) + } + if f.contentType() != "" { + hs.contentType = f.contentType() } + hs.config = &c + return hs, nil } @@ -72,6 +67,7 @@ type httpSink struct { address string contentType string doRequest func(sink *httpSink, logEntry []byte) (*http.Response, error) + config *logconfig.HTTPSinkConfig } // output emits some formatted bytes to this sink. diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go index 2654eafc0fa8..f9e473824e96 100644 --- a/pkg/util/log/http_sink_test.go +++ b/pkg/util/log/http_sink_test.go @@ -29,6 +29,18 @@ import ( "github.com/stretchr/testify/require" ) +var ( + zeroBytes = logconfig.ByteSize(0) + zeroDuration = time.Duration(0) + disabledBufferingCfg = logconfig.CommonBufferSinkConfigWrapper{ + CommonBufferSinkConfig: logconfig.CommonBufferSinkConfig{ + MaxStaleness: &zeroDuration, + FlushTriggerSize: &zeroBytes, + MaxBufferSize: &zeroBytes, + }, + } +) + // testBase sets the provided HTTPDefaults, logs "hello World", captures the // resulting request to the server, and validates the body with the provided // requestTestFunc. @@ -173,6 +185,9 @@ func TestMessageReceived(t *testing.T) { // We need to disable keepalives otherwise the HTTP server in the // test will let an async goroutine run waiting for more requests. DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Buffering: disabledBufferingCfg, + }, } testFn := func(_ http.Header, body string) error { @@ -201,6 +216,9 @@ func TestHTTPSinkTimeout(t *testing.T) { // We need to disable keepalives otherwise the HTTP server in the // test will let an async goroutine run waiting for more requests. DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Buffering: disabledBufferingCfg, + }, } testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond) @@ -224,7 +242,8 @@ func TestHTTPSinkContentTypeJSON(t *testing.T) { // test will let an async goroutine run waiting for more requests. DisableKeepAlives: &tb, CommonSinkConfig: logconfig.CommonSinkConfig{ - Format: &format, + Format: &format, + Buffering: disabledBufferingCfg, }, } @@ -258,7 +277,8 @@ func TestHTTPSinkContentTypePlainText(t *testing.T) { // test will let an async goroutine run waiting for more requests. DisableKeepAlives: &tb, CommonSinkConfig: logconfig.CommonSinkConfig{ - Format: &format, + Format: &format, + Buffering: disabledBufferingCfg, }, } diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index 0804fd793204..ca7f30f4b409 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -60,6 +60,19 @@ fluent-defaults: format: ` + DefaultFluentFormat + ` redactable: true exit-on-error: false + buffering: + max-staleness: 5s + flush-trigger-size: 1mib + max-buffer-size: 50mib +http-defaults: + filter: INFO + format: ` + DefaultHTTPFormat + ` + redactable: true + exit-on-error: false + buffering: + max-staleness: 5s + flush-trigger-size: 1mib + max-buffer-size: 50mib sinks: stderr: filter: NONE diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index b5a2b6f818ca..72393ec51c5c 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -99,7 +99,10 @@ sinks: redact: false redactable: true exit-on-error: false - buffering: NONE + buffering: + max-staleness: 5s + flush-trigger-size: 1.0MiB + max-buffer-size: 50MiB stderr: filter: NONE capture-stray-errors: @@ -171,7 +174,10 @@ sinks: redact: false redactable: true exit-on-error: true - buffering: NONE + buffering: + max-staleness: 5s + flush-trigger-size: 1.0MiB + max-buffer-size: 50MiB stderr: filter: NONE capture-stray-errors: diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index d00f8df8ee77..58050df6b779 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -39,6 +39,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { bt, bf := true, false zeroDuration := time.Duration(0) zeroByteSize := ByteSize(0) + defaultBufferedStaleness := 5 * time.Second + defaultFlushTriggerSize := ByteSize(1024 * 1024) // 1mib + defaultMaxBufferSize := ByteSize(50 * 1024 * 1024) // 50mib baseCommonSinkConfig := CommonSinkConfig{ Filter: logpb.Severity_INFO, @@ -48,10 +51,6 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Criticality: &bf, // Buffering is configured to "NONE". This is different from a zero value // which buffers infinitely. - // - // TODO(andrei,alexb): Enable buffering by default for some sinks once the - // shutdown of the bufferedSink is improved. Note that baseFileDefaults - // below does not inherit the buffering settings from here. Buffering: CommonBufferSinkConfigWrapper{ CommonBufferSinkConfig: CommonBufferSinkConfig{ MaxStaleness: &zeroDuration, @@ -84,11 +83,25 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { baseFluentDefaults := FluentDefaults{ CommonSinkConfig: CommonSinkConfig{ Format: func() *string { s := DefaultFluentFormat; return &s }(), + Buffering: CommonBufferSinkConfigWrapper{ + CommonBufferSinkConfig: CommonBufferSinkConfig{ + MaxStaleness: &defaultBufferedStaleness, + FlushTriggerSize: &defaultFlushTriggerSize, + MaxBufferSize: &defaultMaxBufferSize, + }, + }, }, } baseHTTPDefaults := HTTPDefaults{ CommonSinkConfig: CommonSinkConfig{ Format: func() *string { s := DefaultHTTPFormat; return &s }(), + Buffering: CommonBufferSinkConfigWrapper{ + CommonBufferSinkConfig: CommonBufferSinkConfig{ + MaxStaleness: &defaultBufferedStaleness, + FlushTriggerSize: &defaultFlushTriggerSize, + MaxBufferSize: &defaultMaxBufferSize, + }, + }, }, UnsafeTLS: &bf, DisableKeepAlives: &bf, diff --git a/pkg/util/log/testdata/config b/pkg/util/log/testdata/config index f29824ad41f0..55431dd88d71 100644 --- a/pkg/util/log/testdata/config +++ b/pkg/util/log/testdata/config @@ -79,6 +79,55 @@ sinks: redact: false redactable: true exit-on-error: false + buffering: + max-staleness: 5s + flush-trigger-size: 1.0MiB + max-buffer-size: 50MiB + stderr: + format: crdb-v2-tty + redact: false + redactable: true + exit-on-error: true +capture-stray-errors: + enable: true + dir: TMPDIR + max-group-size: 100MiB + +# Test the default config with an http server. +yaml +sinks: + http-servers: {local: {channels: SESSIONS, address: localhost:5170}} +---- +sinks: + file-groups: + default: + channels: {INFO: all} + dir: TMPDIR + max-file-size: 10MiB + max-group-size: 100MiB + buffered-writes: true + format: crdb-v2 + redact: false + redactable: true + exit-on-error: true + http-servers: + s1: + channels: {INFO: [SESSIONS]} + address: localhost:5170 + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 5s + flush-trigger-size: 1.0MiB + max-buffer-size: 50MiB stderr: format: crdb-v2-tty redact: false