Skip to content

Commit

Permalink
Merge #82893
Browse files Browse the repository at this point in the history
82893: pkg/util/log: enable buffering by default for fluent/http sinks r=andreimatei a=abarganier

This patch updates the default `httpSink` and `fluentSink` configuration
to enable buffering with a `max-staleness` of 5 seconds, a
`flush-trigger-size` of 1MiB, and a `max-buffer-size` of 50MiB.

It's been identified that unavailable fluentd/http addresses, when
part of an un-buffered `fluentSink` or `httpSink`, can lead to
nodes becoming unavailable due to hanging synchronous requests to
write logs to the sink target. With a newly improved `bufferedSink`
wrapper, we should now enable buffering by default for these network
sinks to avoid this possibility.

Network sinks can still disable buffering via the `buffering: NONE`
option being set on the sink in the log config YAML.

Note that this will also affect the output of `debug check-log-config`.

Finally, the `DescribeAppliedConfig()` function previously did not
include any information about configured `httpSink`s in its output.
This was incorrect behavior, and fixing it was required in order to
verify that the default buffering config is applied to http sinks.
Therefore, this patch also updates `DescribeAppliedConfig()` to include
descriptions of http sinks.

Release note (ops change): `httpSink`s and `fluentSink`s will now,
by default, have buffered writes enabled. This means that writes to these
sinks will be asynchronous. This is enabled via a new default `buffering`
configuration for both the `httpSink` and `fluentSink`, where the default
values are as follows:

```
buffering:
    # The maximum amount of time between flushes to the underlying
    # http or fluent sink.
    max-staleness: 5s
    # `flush-trigger-size` is the size in bytes of accumulated messages
    # in the buffer which will trigger a flush. 0 disables this trigger.
    flush-trigger-size: 1MiB
    # `max-buffer-size` limits the size of the buffer. When a new message
    # would cause the buffer to exceed this limit, old messages are
    # dropped.
    max-buffer-size: 50MiB
```
These defaults are reflected in the output of `debug check-log-config`
and change the default behavior of these two types of network sinks.


Co-authored-by: Alex Barganier <[email protected]>
  • Loading branch information
craig[bot] and abarganier committed Jun 29, 2022
2 parents da24124 + 967f874 commit ac11a55
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 43 deletions.
8 changes: 6 additions & 2 deletions pkg/cli/log_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func TestSetupLogging(t *testing.T) {
`format: json-fluent-compact, ` +
`redactable: true, ` +
`exit-on-error: false, ` +
`buffering: NONE}`
`buffering: {max-staleness: 5s, ` +
`flush-trigger-size: 1.0MiB, ` +
`max-buffer-size: 50MiB}}`
const defaultHTTPConfig = `http-defaults: {` +
`method: POST, ` +
`unsafe-tls: false, ` +
Expand All @@ -52,7 +54,9 @@ func TestSetupLogging(t *testing.T) {
`format: json-compact, ` +
`redactable: true, ` +
`exit-on-error: false, ` +
`buffering: NONE}`
`buffering: {max-staleness: 5s, ` +
`flush-trigger-size: 1.0MiB, ` +
`max-buffer-size: 50MiB}}`
stdFileDefaultsRe := regexp.MustCompile(
`file-defaults: \{` +
`dir: (?P<path>[^,]+), ` +
Expand Down
57 changes: 46 additions & 11 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,8 @@ func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
return nil, err
}
info.applyFilters(c.Channels)
httpSink, err := newHTTPSink(*c.Address, httpSinkOptions{
method: string(*c.Method),
unsafeTLS: *c.UnsafeTLS,
timeout: *c.Timeout,
disableKeepAlives: *c.DisableKeepAlives,
contentType: info.formatter.contentType(),
})

httpSink, err := newHTTPSink(c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -442,6 +437,16 @@ func (l *sinkInfo) describeAppliedConfig() (c logconfig.CommonSinkConfig) {
c.Criticality = &l.criticality
f := l.formatter.formatterName()
c.Format = &f
bufferedSink, ok := l.sink.(*bufferedSink)
if ok {
c.Buffering.MaxStaleness = &bufferedSink.maxStaleness
triggerSize := logconfig.ByteSize(bufferedSink.triggerSize)
c.Buffering.FlushTriggerSize = &triggerSize
bufferedSink.mu.Lock()
maxBufferSize := logconfig.ByteSize(bufferedSink.mu.buf.maxSizeBytes)
c.Buffering.MaxBufferSize = &maxBufferSize
bufferedSink.mu.Unlock()
}
return c
}

Expand Down Expand Up @@ -542,15 +547,23 @@ func DescribeAppliedConfig() string {
config.Sinks.FluentServers = make(map[string]*logconfig.FluentSinkConfig)
sIdx := 1
_ = logging.allSinkInfos.iter(func(l *sinkInfo) error {
fluentSink, ok := l.sink.(*fluentSink)
flSink, ok := l.sink.(*fluentSink)
if !ok {
return nil
// Check to see if it's a fluentSink wrapped in a bufferedSink.
bufferedSink, ok := l.sink.(*bufferedSink)
if !ok {
return nil
}
flSink, ok = bufferedSink.child.(*fluentSink)
if !ok {
return nil
}
}

fc := &logconfig.FluentSinkConfig{}
fc.CommonSinkConfig = l.describeAppliedConfig()
fc.Net = fluentSink.network
fc.Address = fluentSink.addr
fc.Net = flSink.network
fc.Address = flSink.addr

// Describe the connections to this fluent sink.
for ch, logger := range chans {
Expand All @@ -562,6 +575,28 @@ func DescribeAppliedConfig() string {
return nil
})

// Describe the http sinks.
config.Sinks.HTTPServers = make(map[string]*logconfig.HTTPSinkConfig)
sIdx = 1
_ = logging.allSinkInfos.iter(func(l *sinkInfo) error {
netSink, ok := l.sink.(*httpSink)
if !ok {
// Check to see if it's a httpSink wrapped in a bufferedSink.
bufferedSink, ok := l.sink.(*bufferedSink)
if !ok {
return nil
}
netSink, ok = bufferedSink.child.(*httpSink)
if !ok {
return nil
}
}
skey := fmt.Sprintf("s%d", sIdx)
sIdx++
config.Sinks.HTTPServers[skey] = netSink.config
return nil
})

// Note: we cannot return 'config' directly, because this captures
// certain variables from the loggers by reference and thus could be
// invalidated by concurrent uses of ApplyConfig().
Expand Down
18 changes: 16 additions & 2 deletions pkg/util/log/fluent_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,24 @@ func TestFluentClient(t *testing.T) {
// Set up a logging configuration with the server we've just set up
// as target for the OPS channel.
cfg := logconfig.DefaultConfig()
zeroBytes := logconfig.ByteSize(0)
zeroDuration := time.Duration(0)
cfg.Sinks.FluentServers = map[string]*logconfig.FluentSinkConfig{
"ops": {
Address: serverAddr,
Channels: logconfig.SelectChannels(channel.OPS)},
Channels: logconfig.SelectChannels(channel.OPS),
FluentDefaults: logconfig.FluentDefaults{
CommonSinkConfig: logconfig.CommonSinkConfig{
Buffering: logconfig.CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: logconfig.CommonBufferSinkConfig{
MaxStaleness: &zeroDuration,
FlushTriggerSize: &zeroBytes,
MaxBufferSize: &zeroBytes,
},
},
},
},
},
}
// Derive a full config using the same directory as the
// TestLogScope.
Expand Down Expand Up @@ -80,7 +94,7 @@ func TestFluentClient(t *testing.T) {
msg, err := json.Marshal(info)
require.NoError(t, err)

const expected = `{"c":1,"f":"util/log/fluent_client_test.go","g":222,"l":63,"message":"hello world","n":1,"r":1,"s":1,"sev":"I","t":"XXX","tag":"logtest.ops","v":"v999.0.0"}`
const expected = `{"c":1,"f":"util/log/fluent_client_test.go","g":222,"l":77,"message":"hello world","n":1,"r":1,"s":1,"sev":"I","t":"XXX","tag":"logtest.ops","v":"v999.0.0"}`
require.Equal(t, expected, string(msg))
}

Expand Down
36 changes: 16 additions & 20 deletions pkg/util/log/http_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,49 @@ import (
"fmt"
"net/http"
"net/url"
"time"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
"github.com/cockroachdb/errors"
)

// TODO: HTTP requests should be bound to context via http.NewRequestWithContext
// Proper logging context to be decided/designed.

// httpSinkOptions is safe to use concurrently due to the delegation of
// operations to `http.Client` which is safe to use concurrently.
type httpSinkOptions struct {
unsafeTLS bool
timeout time.Duration
method string
disableKeepAlives bool
contentType string
}

func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
func newHTTPSink(c logconfig.HTTPSinkConfig) (*httpSink, error) {
transport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
return nil, errors.AssertionFailedf("http.DefaultTransport is not a http.Transport: %T", http.DefaultTransport)
}
transport = transport.Clone()
transport.DisableKeepAlives = opt.disableKeepAlives
transport.DisableKeepAlives = *c.DisableKeepAlives
hs := &httpSink{
client: http.Client{
Transport: transport,
Timeout: opt.timeout,
Timeout: *c.Timeout,
},
address: url,
address: *c.Address,
doRequest: doPost,
contentType: "application/octet-stream",
}

if opt.unsafeTLS {
if *c.UnsafeTLS {
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}

if opt.method == http.MethodGet {
if string(*c.Method) == http.MethodGet {
hs.doRequest = doGet
}

if opt.contentType != "" {
hs.contentType = opt.contentType
f, ok := formatters[*c.Format]
if !ok {
panic(errors.AssertionFailedf("unknown format: %q", *c.Format))
}
if f.contentType() != "" {
hs.contentType = f.contentType()
}

hs.config = &c

return hs, nil
}

Expand All @@ -72,6 +67,7 @@ type httpSink struct {
address string
contentType string
doRequest func(sink *httpSink, logEntry []byte) (*http.Response, error)
config *logconfig.HTTPSinkConfig
}

// output emits some formatted bytes to this sink.
Expand Down
24 changes: 22 additions & 2 deletions pkg/util/log/http_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ import (
"github.com/stretchr/testify/require"
)

var (
zeroBytes = logconfig.ByteSize(0)
zeroDuration = time.Duration(0)
disabledBufferingCfg = logconfig.CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: logconfig.CommonBufferSinkConfig{
MaxStaleness: &zeroDuration,
FlushTriggerSize: &zeroBytes,
MaxBufferSize: &zeroBytes,
},
}
)

// testBase sets the provided HTTPDefaults, logs "hello World", captures the
// resulting request to the server, and validates the body with the provided
// requestTestFunc.
Expand Down Expand Up @@ -173,6 +185,9 @@ func TestMessageReceived(t *testing.T) {
// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Buffering: disabledBufferingCfg,
},
}

testFn := func(_ http.Header, body string) error {
Expand Down Expand Up @@ -201,6 +216,9 @@ func TestHTTPSinkTimeout(t *testing.T) {
// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Buffering: disabledBufferingCfg,
},
}

testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond)
Expand All @@ -224,7 +242,8 @@ func TestHTTPSinkContentTypeJSON(t *testing.T) {
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Format: &format,
Format: &format,
Buffering: disabledBufferingCfg,
},
}

Expand Down Expand Up @@ -258,7 +277,8 @@ func TestHTTPSinkContentTypePlainText(t *testing.T) {
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Format: &format,
Format: &format,
Buffering: disabledBufferingCfg,
},
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/util/log/logconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ fluent-defaults:
format: ` + DefaultFluentFormat + `
redactable: true
exit-on-error: false
buffering:
max-staleness: 5s
flush-trigger-size: 1mib
max-buffer-size: 50mib
http-defaults:
filter: INFO
format: ` + DefaultHTTPFormat + `
redactable: true
exit-on-error: false
buffering:
max-staleness: 5s
flush-trigger-size: 1mib
max-buffer-size: 50mib
sinks:
stderr:
filter: NONE
Expand Down
10 changes: 8 additions & 2 deletions pkg/util/log/logconfig/testdata/validate
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ sinks:
redact: false
redactable: true
exit-on-error: false
buffering: NONE
buffering:
max-staleness: 5s
flush-trigger-size: 1.0MiB
max-buffer-size: 50MiB
stderr:
filter: NONE
capture-stray-errors:
Expand Down Expand Up @@ -171,7 +174,10 @@ sinks:
redact: false
redactable: true
exit-on-error: true
buffering: NONE
buffering:
max-staleness: 5s
flush-trigger-size: 1.0MiB
max-buffer-size: 50MiB
stderr:
filter: NONE
capture-stray-errors:
Expand Down
21 changes: 17 additions & 4 deletions pkg/util/log/logconfig/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
bt, bf := true, false
zeroDuration := time.Duration(0)
zeroByteSize := ByteSize(0)
defaultBufferedStaleness := 5 * time.Second
defaultFlushTriggerSize := ByteSize(1024 * 1024) // 1mib
defaultMaxBufferSize := ByteSize(50 * 1024 * 1024) // 50mib

baseCommonSinkConfig := CommonSinkConfig{
Filter: logpb.Severity_INFO,
Expand All @@ -48,10 +51,6 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
Criticality: &bf,
// Buffering is configured to "NONE". This is different from a zero value
// which buffers infinitely.
//
// TODO(andrei,alexb): Enable buffering by default for some sinks once the
// shutdown of the bufferedSink is improved. Note that baseFileDefaults
// below does not inherit the buffering settings from here.
Buffering: CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: CommonBufferSinkConfig{
MaxStaleness: &zeroDuration,
Expand Down Expand Up @@ -84,11 +83,25 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
baseFluentDefaults := FluentDefaults{
CommonSinkConfig: CommonSinkConfig{
Format: func() *string { s := DefaultFluentFormat; return &s }(),
Buffering: CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: CommonBufferSinkConfig{
MaxStaleness: &defaultBufferedStaleness,
FlushTriggerSize: &defaultFlushTriggerSize,
MaxBufferSize: &defaultMaxBufferSize,
},
},
},
}
baseHTTPDefaults := HTTPDefaults{
CommonSinkConfig: CommonSinkConfig{
Format: func() *string { s := DefaultHTTPFormat; return &s }(),
Buffering: CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: CommonBufferSinkConfig{
MaxStaleness: &defaultBufferedStaleness,
FlushTriggerSize: &defaultFlushTriggerSize,
MaxBufferSize: &defaultMaxBufferSize,
},
},
},
UnsafeTLS: &bf,
DisableKeepAlives: &bf,
Expand Down
Loading

0 comments on commit ac11a55

Please sign in to comment.