diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md
index 5b69ded70cc9..5ff83bb851bc 100644
--- a/docs/generated/logsinks.md
+++ b/docs/generated/logsinks.md
@@ -6,6 +6,8 @@ The supported log output sink types are documented below.
- [Output to Fluentd-compatible log collectors](#output-to-fluentd-compatible-log-collectors)
+- [Output to HTTP servers.](#output-to-http-servers.)
+
- [Standard error stream](#standard-error-stream)
@@ -91,7 +93,7 @@ Configuration options shared across all sink types:
## Sink type: Output to Fluentd-compatible log collectors
-This sink type causes logging data to be sent over the network, to
+This sink type causes logging data to be sent over the network to
a log collector that can ingest log data in a
[Fluentd](https://www.fluentd.org)-compatible protocol.
@@ -168,6 +170,71 @@ Configuration options shared across all sink types:
+
+
+## Sink type: Output to HTTP servers.
+
+
+This sink type causes logging data to be sent over the network
+as requests to an HTTP server.
+
+The configuration key under the `sinks` key in the YAML
+configuration is `http-servers`. Example configuration:
+
+ sinks:
+ http-servers:
+ health:
+ channels: HEALTH
+ address: http://127.0.0.1
+
+Every new server sink configured automatically inherits the configuration set in the `http-defaults` section.
+
+For example:
+
+ http-defaults:
+ redactable: false # default: disable redaction markers
+ sinks:
+ http-servers:
+ health:
+ channels: HEALTH
+ # This sink has redactable set to false,
+ # as the setting is inherited from fluent-defaults
+ # unless overridden here.
+
+The default output format for HTTP sinks is
+`json-compact`. [Other supported formats.](log-formats.html)
+
+{{site.data.alerts.callout_info}}
+Run `cockroach debug check-log-config` to verify the effect of defaults inheritance.
+{{site.data.alerts.end}}
+
+
+
+Type-specific configuration options:
+
+| Field | Description |
+|--|--|
+| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
+| `address` | the network address of the http server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. Inherited from `http-defaults.address` if not specified. |
+| `method` | the HTTP method to be used. POST and GET are supported; defaults to POST. Inherited from `http-defaults.method` if not specified. |
+| `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. |
+| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. |
+| `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. |
+
+
+Configuration options shared across all sink types:
+
+| Field | Description |
+|--|--|
+| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. |
+| `format` | the entry format to use. |
+| `redact` | whether to strip sensitive information before log events are emitted to this sink. |
+| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. |
+| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. |
+| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. |
+
+
+
## Sink type: Standard error stream
diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go
index 6174993d8cc2..18c1582b9751 100644
--- a/pkg/cli/log_flags_test.go
+++ b/pkg/cli/log_flags_test.go
@@ -31,7 +31,7 @@ func TestSetupLogging(t *testing.T) {
defer leaktest.AfterTest(t)()
reWhitespace := regexp.MustCompile(`(?ms:((\s|\n)+))`)
- reWhitespace2 := regexp.MustCompile(`{\s+`)
+ reBracketWhitespace := regexp.MustCompile(`(?P[{[])\s+`)
reSimplify := regexp.MustCompile(`(?ms:^\s*(auditable: false|redact: false|exit-on-error: true|max-group-size: 100MiB)\n)`)
@@ -41,6 +41,14 @@ func TestSetupLogging(t *testing.T) {
`redactable: true, ` +
`exit-on-error: false` +
`}`
+ const defaultHTTPConfig = `http-defaults: {` +
+ `method: POST, ` +
+ `unsafe-tls: false, ` +
+ `timeout: 0s, ` +
+ `filter: INFO, ` +
+ `format: json-compact, ` +
+ `redactable: true, ` +
+ `exit-on-error: false}`
stdFileDefaultsRe := regexp.MustCompile(
`file-defaults: \{dir: (?P[^,]+), max-file-size: 10MiB, buffered-writes: true, filter: INFO, format: crdb-v2, redactable: true\}`)
fileDefaultsNoMaxSizeRe := regexp.MustCompile(
@@ -103,10 +111,11 @@ func TestSetupLogging(t *testing.T) {
t.Fatal(err)
}
actual = reWhitespace.ReplaceAllString(h.String(), " ")
- actual = reWhitespace2.ReplaceAllString(actual, "{")
+ actual = reBracketWhitespace.ReplaceAllString(actual, "$bracket")
// Shorten the configuration for legibility during reviews of test changes.
actual = strings.ReplaceAll(actual, defaultFluentConfig, "")
+ actual = strings.ReplaceAll(actual, defaultHTTPConfig, "")
actual = stdFileDefaultsRe.ReplaceAllString(actual, "")
actual = fileDefaultsNoMaxSizeRe.ReplaceAllString(actual, "")
actual = strings.ReplaceAll(actual, fileDefaultsNoDir, "")
diff --git a/pkg/cli/testdata/logflags b/pkg/cli/testdata/logflags
index 0072d721bce3..25f8b285e0b4 100644
--- a/pkg/cli/testdata/logflags
+++ b/pkg/cli/testdata/logflags
@@ -14,6 +14,7 @@ start
----
config: {)>,
,
+,
sinks: {file-groups: {default: )>,
,
+,
sinks: {file-groups: {default: ,
,
+,
sinks: {}}
run
@@ -83,6 +86,7 @@ init
----
config: {,
,
+,
sinks: {}}
@@ -95,6 +99,7 @@ bank
----
config: {,
,
+,
sinks: {}}
@@ -105,6 +110,7 @@ demo
----
config: {,
,
+,
sinks: {}}
@@ -120,6 +126,7 @@ start
----
config: {,
,
+,
sinks: {}}
@@ -132,6 +139,7 @@ start
----
config: {,
,
+,
sinks: {file-groups: {default: ,
,
+,
sinks: {file-groups: {default: ,
sql-exec: ,
sql-slow: ,
sql-slow-internal-only: ,
-telemetry: {channels: [ TELEMETRY],
+telemetry: {channels: [TELEMETRY],
dir: /mypath,
max-file-size: 100KiB,
max-group-size: 1.0MiB,
@@ -195,6 +204,7 @@ start
----
config: {,
,
+,
sinks: {file-groups: {default: ,
,
+,
sinks: {file-groups: {default: ,
sql-exec: ,
sql-slow: ,
sql-slow-internal-only: ,
-telemetry: {channels: [ TELEMETRY],
+telemetry: {channels: [TELEMETRY],
dir: /mypath,
max-file-size: 100KiB,
max-group-size: 1.0MiB,
@@ -262,6 +273,7 @@ start
----
config: {)>,
,
+,
sinks: {file-groups: {default: )>,
,
+,
sinks: {file-groups: {default: ,
,
+,
sinks: {}}
@@ -355,6 +369,7 @@ start
----
config: {,
,
+,
sinks: {file-groups: {default: ,
sql-exec: ,
sql-slow: ,
sql-slow-internal-only: ,
-telemetry: {channels: [ TELEMETRY],
+telemetry: {channels: [TELEMETRY],
dir: /mypath,
max-file-size: 100KiB,
max-group-size: 1.0MiB,
@@ -387,6 +402,7 @@ start
----
config: {,
,
+,
sinks: {file-groups: {default: ,
sql-exec: ,
sql-slow: ,
sql-slow-internal-only: ,
-telemetry: {channels: [ TELEMETRY],
+telemetry: {channels: [TELEMETRY],
dir: /pathA,
max-file-size: 100KiB,
max-group-size: 1.0MiB,
@@ -419,6 +435,7 @@ init
----
config: {,
,
+,
sinks: {file-groups: {default: {channels: all,
dir: /mypath,
buffered-writes: true,
@@ -435,6 +452,7 @@ start
----
config: {)>,
,
+,
sinks: {file-groups: {default: )>,
,
+,
sinks: {file-groups: {default: ,
,
+,
sinks: {}}
# Default when no severity is specified is WARNING.
@@ -504,6 +524,7 @@ init
----
config: {,
,
+,
sinks: {}}
diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel
index 35852787f123..35c3e7377666 100644
--- a/pkg/util/log/BUILD.bazel
+++ b/pkg/util/log/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"format_json.go",
"formats.go",
"get_stacks.go",
+ "http_sink.go",
"intercept.go",
"log.go",
"log_bridge.go",
@@ -133,6 +134,7 @@ go_test(
"format_crdb_v2_test.go",
"format_json_test.go",
"helpers_test.go",
+ "http_sink_test.go",
"intercept_test.go",
"main_test.go",
"redact_test.go",
@@ -153,6 +155,7 @@ go_test(
"//pkg/util/log/logconfig",
"//pkg/util/log/logpb",
"//pkg/util/log/severity",
+ "//pkg/util/netutil/addr",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go
index 6899840a9cfb..7bba41bc5599 100644
--- a/pkg/util/log/flags.go
+++ b/pkg/util/log/flags.go
@@ -120,10 +120,16 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}
+ // Call the final value of cleanupFn immediately if returning with error.
+ defer func() {
+ if err != nil {
+ cleanupFn()
+ }
+ }()
+
// If capture of internal fd2 writes is enabled, set it up here.
if config.CaptureFd2.Enable {
if logging.testingFd2CaptureLogger != nil {
- cleanupFn()
return nil, errors.New("fd2 capture already set up. Maybe use TestLogScope?")
}
// We use a secondary logger, even though no logging *event* will ever
@@ -160,7 +166,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
fileSinkInfo, fileSink, err := newFileSinkInfo("stderr", fakeConfig)
if err != nil {
- cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fileSinkInfo)
@@ -190,7 +195,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
if err := fileSink.takeOverInternalStderr(secLogger); err != nil {
// Oof, it turns out we can't use this logger after all. Give up
// on everything we did.
- cleanupFn()
return nil, err
}
@@ -218,7 +222,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
// Apply the stderr sink configuration.
logging.stderrSink.noColor.Set(config.Sinks.Stderr.NoColor)
if err := logging.stderrSinkInfoTemplate.applyConfig(config.Sinks.Stderr.CommonSinkConfig); err != nil {
- cleanupFn()
return nil, err
}
@@ -244,6 +247,17 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
l.sinkInfos = append(l.sinkInfos, &stderrSinkInfo)
}
+ attachSinkInfo := func(si *sinkInfo, chs []logpb.Channel) {
+ sinkInfos = append(sinkInfos, si)
+ allSinkInfos.put(si)
+
+ // Connect the channels for this sink.
+ for _, ch := range chs {
+ l := chans[ch]
+ l.sinkInfos = append(l.sinkInfos, si)
+ }
+ }
+
// Create the file sinks.
for prefix, fc := range config.Sinks.FileGroups {
if fc.Filter == severity.NONE || fc.Dir == nil {
@@ -254,17 +268,9 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
fileSinkInfo, _, err := newFileSinkInfo(prefix, *fc)
if err != nil {
- cleanupFn()
return nil, err
}
- sinkInfos = append(sinkInfos, fileSinkInfo)
- allSinkInfos.put(fileSinkInfo)
-
- // Connect the channels for this sink.
- for _, ch := range fc.Channels.Channels {
- l := chans[ch]
- l.sinkInfos = append(l.sinkInfos, fileSinkInfo)
- }
+ attachSinkInfo(fileSinkInfo, fc.Channels.Channels)
}
// Create the fluent sinks.
@@ -274,17 +280,21 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
fluentSinkInfo, err := newFluentSinkInfo(*fc)
if err != nil {
- cleanupFn()
return nil, err
}
- sinkInfos = append(sinkInfos, fluentSinkInfo)
- allSinkInfos.put(fluentSinkInfo)
+ attachSinkInfo(fluentSinkInfo, fc.Channels.Channels)
+ }
- // Connect the channels for this sink.
- for _, ch := range fc.Channels.Channels {
- l := chans[ch]
- l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
+ // Create the HTTP sinks.
+ for _, fc := range config.Sinks.HTTPServers {
+ if fc.Filter == severity.NONE {
+ continue
}
+ httpSinkInfo, err := newHTTPSinkInfo(*fc)
+ if err != nil {
+ return nil, err
+ }
+ attachSinkInfo(httpSinkInfo, fc.Channels.Channels)
}
// Prepend the interceptor sink to all channels.
@@ -333,6 +343,24 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {
return info, nil
}
+func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
+ info := &sinkInfo{}
+ if err := info.applyConfig(c.CommonSinkConfig); err != nil {
+ return nil, err
+ }
+ httpSink, err := newHTTPSink(*c.Address, httpSinkOptions{
+ method: string(*c.Method),
+ unsafeTLS: *c.UnsafeTLS,
+ timeout: *c.Timeout,
+ disableKeepAlives: *c.DisableKeepAlives,
+ })
+ if err != nil {
+ return nil, err
+ }
+ info.sink = httpSink
+ return info, nil
+}
+
// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go
new file mode 100644
index 000000000000..138cefac66fe
--- /dev/null
+++ b/pkg/util/log/http_sink.go
@@ -0,0 +1,145 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package log
+
+import (
+ "bytes"
+ "crypto/tls"
+ "fmt"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/cli/exit"
+ "github.com/cockroachdb/errors"
+)
+
+// TODO: HTTP requests should be bound to context via http.NewRequestWithContext
+// Proper logging context to be decided/designed.
+
+type httpSinkOptions struct {
+ unsafeTLS bool
+ timeout time.Duration
+ method string
+ disableKeepAlives bool
+}
+
+func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
+ transport, ok := http.DefaultTransport.(*http.Transport)
+ if !ok {
+ return nil, errors.AssertionFailedf("http.DefaultTransport is not a http.Transport: %T", http.DefaultTransport)
+ }
+ transport = transport.Clone()
+ transport.DisableKeepAlives = opt.disableKeepAlives
+ hs := &httpSink{
+ client: http.Client{
+ Transport: transport,
+ Timeout: opt.timeout,
+ },
+ address: url,
+ doRequest: doPost,
+ }
+
+ if opt.unsafeTLS {
+ transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+ }
+
+ if opt.method == http.MethodGet {
+ hs.doRequest = doGet
+ }
+
+ return hs, nil
+}
+
+type httpSink struct {
+ client http.Client
+ address string
+ doRequest func(*httpSink, []byte) (*http.Response, error)
+}
+
+// output emits some formatted bytes to this sink.
+// the sink is invited to perform an extra flush if indicated
+// by the argument. This is set to true for e.g. Fatal
+// entries.
+//
+// The parent logger's outputMu is held during this operation: log
+// sinks must not recursively call into logging when implementing
+// this method.
+func (hs *httpSink) output(extraSync bool, b []byte) (err error) {
+ resp, err := hs.doRequest(hs, b)
+ if err != nil {
+ return err
+ }
+
+ if resp.StatusCode >= 400 {
+ return HTTPLogError{
+ StatusCode: resp.StatusCode,
+ Address: hs.address}
+ }
+ return nil
+}
+
+// emergencyOutput attempts to emit some formatted bytes, and
+// ignores any errors.
+//
+// The parent logger's outputMu is held during this operation: log
+// sinks must not recursively call into logging when implementing
+// this method.
+func (hs *httpSink) emergencyOutput(b []byte) {
+ _, _ = hs.doRequest(hs, b)
+}
+
+func doPost(hs *httpSink, b []byte) (*http.Response, error) {
+ resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b))
+ if err != nil {
+ return nil, err
+ }
+ resp.Body.Close() // don't care about content
+ return resp, nil
+}
+
+func doGet(hs *httpSink, b []byte) (*http.Response, error) {
+ resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b)))
+ if err != nil {
+ return nil, err
+ }
+ resp.Body.Close() // don't care about content
+ return resp, nil
+}
+
+// active returns true if this sink is currently active.
+func (*httpSink) active() bool {
+ return true
+}
+
+// attachHints attaches some hints about the location of the message
+// to the stack message.
+func (*httpSink) attachHints(stacks []byte) []byte {
+ return stacks
+}
+
+// exitCode returns the exit code to use if the logger decides
+// to terminate because of an error in output().
+func (*httpSink) exitCode() exit.Code {
+ return exit.LoggingNetCollectorUnavailable()
+}
+
+// HTTPLogError represents an HTTP error status code from a logging request.
+type HTTPLogError struct {
+ StatusCode int
+ Address string
+}
+
+func (e HTTPLogError) Error() string {
+ return fmt.Sprintf(
+ "received %v response attempting to log to [%v]",
+ e.StatusCode, e.Address)
+}
diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go
new file mode 100644
index 000000000000..59e26876e083
--- /dev/null
+++ b/pkg/util/log/http_sink_test.go
@@ -0,0 +1,207 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package log
+
+import (
+ "context"
+ "errors"
+ "io"
+ "net"
+ "net/http"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log/channel"
+ "github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
+ "github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/stretchr/testify/require"
+)
+
+// testBase sets the provided HTTPDefaults, logs "hello World", captures the
+// resulting request to the server, and validates the body with the provided
+// requestTestFunc.
+// Options also given to cause the server to hang (which naturally skips the body valiation)
+// and to set a maximum duration for the log call.
+func testBase(
+ t *testing.T,
+ defaults logconfig.HTTPDefaults,
+ fn func(body string) error,
+ hangServer bool,
+ deadline time.Duration,
+) {
+ sc := ScopeWithoutShowLogs(t)
+ defer sc.Close(t)
+
+ // cancelCh ensures that async goroutines terminate if the test
+ // goroutine terminates due to a Fatal call or a panic.
+ cancelCh := make(chan struct{})
+ defer func() { close(cancelCh) }()
+
+ // seenMessage is true after the request predicate
+ // has seen the expected message from the client.
+ var seenMessage syncutil.AtomicBool
+
+ handler := func(rw http.ResponseWriter, r *http.Request) {
+ buf := make([]byte, 5000)
+ nbytes, err := r.Body.Read(buf)
+ if err != nil && err != io.EOF {
+ t.Error(err)
+ return
+ }
+ buf = buf[:nbytes]
+
+ if hangServer {
+ // The test is requesting the server to simulate a timeout. Just
+ // do nothing until the test terminates.
+ <-cancelCh
+ } else {
+ // The test is expecting some message via a predicate.
+ if err := fn(string(buf)); err != nil {
+ // non-failing, in case there are extra log messages generated
+ t.Log(err)
+ } else {
+ seenMessage.Set(true)
+ }
+ }
+ }
+
+ {
+ // Start the HTTP server that receives the logging events from the
+ // test.
+
+ l, err := net.Listen("tcp", "127.0.0.1:")
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, port, err := addr.SplitHostPort(l.Addr().String(), "port")
+ if err != nil {
+ t.Fatal(err)
+ }
+ *defaults.Address += ":" + port
+ s := http.Server{Handler: http.HandlerFunc(handler)}
+
+ // serverErrCh collects errors and signals the termination of the
+ // server async goroutine.
+ serverErrCh := make(chan error, 1)
+ go func() {
+ defer func() { close(serverErrCh) }()
+ err := s.Serve(l)
+ if err != http.ErrServerClosed {
+ select {
+ case serverErrCh <- err:
+ case <-cancelCh:
+ }
+ }
+ }()
+
+ // At the end of this function, close the server
+ // allowing the above goroutine to finish and close serverClosedCh
+ // allowing the deferred read to proceed and this function to return.
+ // (Basically, it's a WaitGroup of one.)
+ defer func() {
+ require.NoError(t, s.Close())
+ serverErr := <-serverErrCh
+ require.NoError(t, serverErr)
+ }()
+ }
+
+ // Set up a logging configuration with the server we've just set up
+ // as target for the OPS channel.
+ cfg := logconfig.DefaultConfig()
+ cfg.Sinks.HTTPServers = map[string]*logconfig.HTTPSinkConfig{
+ "ops": {
+ HTTPDefaults: defaults,
+ Channels: logconfig.ChannelList{Channels: []Channel{channel.OPS}}},
+ }
+ // Derive a full config using the same directory as the
+ // TestLogScope.
+ require.NoError(t, cfg.Validate(&sc.logDir))
+
+ // Apply the configuration.
+ TestingResetActive()
+ cleanup, err := ApplyConfig(cfg)
+ require.NoError(t, err)
+ defer cleanup()
+
+ // Send a log event on the OPS channel.
+ logStart := timeutil.Now()
+ Ops.Infof(context.Background(), "hello world")
+ logDuration := timeutil.Since(logStart)
+
+ // Note: deadline is passed by the caller and already contains slack
+ // to accommodate for the overhead of the logging call compared to
+ // the timeout in the HTTP request.
+ if deadline > 0 && logDuration > deadline {
+ t.Error("Log call exceeded timeout")
+ }
+
+ if hangServer {
+ return
+ }
+
+ // If the test was not requiring a timeout, it was requiring some
+ // logging message to match the predicate. If we don't see the
+ // predicate match, it is a test failure.
+ if !seenMessage.Get() {
+ t.Error("expected message matching predicate, found none")
+ }
+}
+
+// TestMessageReceived verifies that the server receives the logged message.
+func TestMessageReceived(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ address := "http://localhost" // testBase appends the port
+ timeout := 5 * time.Second
+ tb := true
+ defaults := logconfig.HTTPDefaults{
+ Address: &address,
+ Timeout: &timeout,
+
+ // We need to disable keepalives otherwise the HTTP server in the
+ // test will let an async goroutine run waiting for more requests.
+ DisableKeepAlives: &tb,
+ }
+
+ testFn := func(body string) error {
+ t.Log(body)
+ if !strings.Contains(body, `"message":"hello world"`) {
+ return errors.New("Log message not found in request")
+ }
+ return nil
+ }
+
+ testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
+}
+
+// TestHTTPSinkTimeout verifies that a log call to a hanging server doesn't last
+// to much longer than the configured timeout.
+func TestHTTPSinkTimeout(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ address := "http://localhost" // testBase appends the port
+ timeout := time.Millisecond
+ tb := true
+ defaults := logconfig.HTTPDefaults{
+ Address: &address,
+ Timeout: &timeout,
+
+ // We need to disable keepalives otherwise the HTTP server in the
+ // test will let an async goroutine run waiting for more requests.
+ DisableKeepAlives: &tb,
+ }
+
+ testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond)
+}
diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go
index 51682a01c5da..2d0c8d4c593e 100644
--- a/pkg/util/log/logconfig/config.go
+++ b/pkg/util/log/logconfig/config.go
@@ -12,14 +12,16 @@ package logconfig
import (
"fmt"
+ "net/http"
"reflect"
"sort"
"strings"
+ "time"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/errors"
- "github.com/dustin/go-humanize"
- "gopkg.in/yaml.v2"
+ humanize "github.com/dustin/go-humanize"
+ yaml "gopkg.in/yaml.v2"
)
// DefaultFileFormat is the entry format for file sinks when not
@@ -34,6 +36,10 @@ const DefaultStderrFormat = `crdb-v2-tty`
// when not specified in a configuration.
const DefaultFluentFormat = `json-fluent-compact`
+// DefaultHTTPFormat is the entry format for HTTP sinks
+// when not specified in a configuration.
+const DefaultHTTPFormat = `json-compact`
+
// DefaultConfig returns a suitable default configuration when logging
// is meant to primarily go to files.
func DefaultConfig() (c Config) {
@@ -101,6 +107,11 @@ type Config struct {
// configuration value.
FluentDefaults FluentDefaults `yaml:"fluent-defaults,omitempty"`
+ // HTTPDefaults represents the default configuration for HTTP sinks,
+ // inherited when a specific HTTP sink config does not provide a
+ // configuration value.
+ HTTPDefaults HTTPDefaults `yaml:"http-defaults,omitempty"`
+
// Sinks represents the sink configurations.
Sinks SinkConfig `yaml:",omitempty"`
@@ -163,13 +174,10 @@ type SinkConfig struct {
FileGroups map[string]*FileSinkConfig `yaml:"file-groups,omitempty"`
// FluentServer represents the list of configured fluent sinks.
FluentServers map[string]*FluentSinkConfig `yaml:"fluent-servers,omitempty"`
+ // HTTPServers represents the list of configured http sinks.
+ HTTPServers map[string]*HTTPSinkConfig `yaml:"http-servers,omitempty"`
// Stderr represents the configuration for the stderr sink.
Stderr StderrSinkConfig `yaml:",omitempty"`
-
- // sortedFileGroupNames and sortedServerNames are used internally to
- // make the Export() function deterministic.
- sortedFileGroupNames []string
- sortedServerNames []string
}
// StderrSinkConfig represents the configuration for the stderr sink.
@@ -233,7 +241,7 @@ type FluentDefaults struct {
// User-facing documentation follows.
// TITLE: Output to Fluentd-compatible log collectors
//
-// This sink type causes logging data to be sent over the network, to
+// This sink type causes logging data to be sent over the network to
// a log collector that can ingest log data in a
// [Fluentd](https://www.fluentd.org)-compatible protocol.
//
@@ -395,6 +403,83 @@ type FileSinkConfig struct {
prefix string
}
+// HTTPDefaults refresents the configuration defaults for HTTP sinks.
+type HTTPDefaults struct {
+ // Address is the network address of the http server. The
+ // host/address and port parts are separated with a colon. IPv6
+ // numeric addresses should be included within square brackets,
+ // e.g.: [::1]:1234.
+ Address *string `yaml:",omitempty"`
+
+ // Method is the HTTP method to be used. POST and GET are
+ // supported; defaults to POST.
+ Method *HTTPSinkMethod `yaml:",omitempty"`
+
+ // UnsafeTLS enables certificate authentication to be bypassed.
+ // Defaults to false.
+ UnsafeTLS *bool `yaml:"unsafe-tls,omitempty"`
+
+ // Timeout is the HTTP timeout.
+ // Defaults to 0 for no timeout.
+ Timeout *time.Duration `yaml:",omitempty"`
+
+ // DisableKeepAlives causes the logging sink to re-establish a new
+ // connection for every outgoing log message. This option is
+ // intended for testing only and can cause excessive network
+ // overhead in production systems.
+ DisableKeepAlives *bool `yaml:",omitempty"`
+
+ CommonSinkConfig `yaml:",inline"`
+}
+
+// HTTPSinkConfig represents the configuration for one http sink.
+//
+// User-facing documentation follows.
+// TITLE: Output to HTTP servers.
+//
+// This sink type causes logging data to be sent over the network
+// as requests to an HTTP server.
+//
+// The configuration key under the `sinks` key in the YAML
+// configuration is `http-servers`. Example configuration:
+//
+// sinks:
+// http-servers:
+// health:
+// channels: HEALTH
+// address: http://127.0.0.1
+//
+// Every new server sink configured automatically inherits the configuration set in the `http-defaults` section.
+//
+// For example:
+//
+// http-defaults:
+// redactable: false # default: disable redaction markers
+// sinks:
+// http-servers:
+// health:
+// channels: HEALTH
+// # This sink has redactable set to false,
+// # as the setting is inherited from fluent-defaults
+// # unless overridden here.
+//
+// The default output format for HTTP sinks is
+// `json-compact`. [Other supported formats.](log-formats.html)
+//
+// {{site.data.alerts.callout_info}}
+// Run `cockroach debug check-log-config` to verify the effect of defaults inheritance.
+// {{site.data.alerts.end}}
+//
+type HTTPSinkConfig struct {
+ // Channels is the list of logging channels that use this sink.
+ Channels ChannelList `yaml:",omitempty,flow"`
+
+ HTTPDefaults `yaml:",inline"`
+
+ // sinkName is populated during validation.
+ sinkName string
+}
+
// IterateDirectories calls the provided fn on every directory linked to
// by the configuration.
func (c *Config) IterateDirectories(fn func(d string) error) error {
@@ -684,3 +769,61 @@ func (*Holder) Type() string { return "yaml" }
func (h *Holder) Set(value string) error {
return yaml.UnmarshalStrict([]byte(value), &h.Config)
}
+
+// HTTPSinkMethod is a string restricted to "POST" and "GET"
+type HTTPSinkMethod string
+
+var _ constrainedString = (*HTTPSinkMethod)(nil)
+
+// Accept implements the constrainedString interface.
+func (hsm *HTTPSinkMethod) Accept(s string) {
+ *hsm = HTTPSinkMethod(s)
+}
+
+// Canonicalize implements the constrainedString interface.
+func (HTTPSinkMethod) Canonicalize(s string) string {
+ return strings.ToUpper(strings.TrimSpace(s))
+}
+
+// AllowedSet implements the constrainedString interface.
+func (HTTPSinkMethod) AllowedSet() []string {
+ return []string{
+ http.MethodGet,
+ http.MethodPost,
+ }
+}
+
+// MarshalYAML implements yaml.Marshaler interface.
+func (hsm HTTPSinkMethod) MarshalYAML() (interface{}, error) {
+ return string(hsm), nil
+}
+
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+func (hsm *HTTPSinkMethod) UnmarshalYAML(fn func(interface{}) error) error {
+ return unmarshalYAMLConstrainedString(hsm, fn)
+}
+
+// constrainedString is an interface to make it easy to unmarshal
+// a string constrained to a small set of accepted values.
+type constrainedString interface {
+ Accept(string)
+ Canonicalize(string) string
+ AllowedSet() []string
+}
+
+// unmarshalYAMLConstrainedString is a utility function to unmarshal
+// a type satisfying the constrainedString interface.
+func unmarshalYAMLConstrainedString(cs constrainedString, fn func(interface{}) error) error {
+ var s string
+ if err := fn(&s); err != nil {
+ return err
+ }
+ s = cs.Canonicalize(s)
+ for _, candidate := range cs.AllowedSet() {
+ if s == candidate {
+ cs.Accept(s)
+ return nil
+ }
+ }
+ return errors.Newf("Unexpected value: %v", s)
+}
diff --git a/pkg/util/log/logconfig/export.go b/pkg/util/log/logconfig/export.go
index 3ec48c720c1a..1fbb6f48ceca 100644
--- a/pkg/util/log/logconfig/export.go
+++ b/pkg/util/log/logconfig/export.go
@@ -29,19 +29,6 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
chanSel = onlyChans
}
- var buf bytes.Buffer
- buf.WriteString("@startuml\nleft to right direction\n")
-
- // Export the channels.
- buf.WriteString("component sources {\n")
- for _, ch := range chanSel.Channels {
- fmt.Fprintf(&buf, "() %s\n", ch)
- }
- buf.WriteString("cloud stray as \"stray\\nerrors\"\n}\n")
-
- // The process stderr stream.
- buf.WriteString("queue stderr\n")
-
// links collects the relationships. We need to collect them and
// print them at the end because plantUML does not support
// interleaving box and arrow declarations.
@@ -110,7 +97,16 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
// folders map each directory to a list of files within.
folders := map[string][]string{}
fileNum := 1
- for _, fn := range c.Sinks.sortedFileGroupNames {
+
+ // Process the file sinks in sorted order,
+ // so the output order is deteministic.
+ var sortedNames []string
+ for prefix := range c.Sinks.FileGroups {
+ sortedNames = append(sortedNames, prefix)
+ }
+ sort.Strings(sortedNames)
+
+ for _, fn := range sortedNames {
fc := c.Sinks.FileGroups[fn]
if fc.Filter == logpb.Severity_NONE {
// This file is not collecting anything. Skip it.
@@ -175,12 +171,19 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
//
// servers maps each server to its box declaration.
servers := map[string]string{}
- for _, fn := range c.Sinks.sortedServerNames {
+
+ sortedNames = nil
+ for serverName := range c.Sinks.FluentServers {
+ sortedNames = append(sortedNames, serverName)
+ }
+ sort.Strings(sortedNames)
+
+ for _, fn := range sortedNames {
fc := c.Sinks.FluentServers[fn]
if fc.Filter == logpb.Severity_NONE {
continue
}
- skey := fmt.Sprintf("s__%s", fc.serverName)
+ skey := fmt.Sprintf("s__%s", fn)
target, thisprocs, thislinks := process(skey, fc.CommonSinkConfig)
hasLink := false
for _, ch := range fc.Channels.Channels {
@@ -193,11 +196,43 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
if hasLink {
processing = append(processing, thisprocs...)
links = append(links, thislinks...)
- servers[fc.serverName] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"",
+ servers[fn] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"",
skey, fc.Net, fc.Address)
}
}
+ // Collect HTTP sinks
+ // Add the destinations into the same map, for display in the "network server"
+ // section of the diagram
+ sortedNames = nil
+ for sinkName := range c.Sinks.HTTPServers {
+ sortedNames = append(sortedNames, sinkName)
+ }
+ sort.Strings(sortedNames)
+
+ for _, name := range sortedNames {
+ cfg := c.Sinks.HTTPServers[name]
+ if cfg.Filter == logpb.Severity_NONE {
+ continue
+ }
+ key := fmt.Sprintf("h__%s", name)
+ target, thisprocs, thislinks := process(key, cfg.CommonSinkConfig)
+ hasLink := false
+ for _, ch := range cfg.Channels.Channels {
+ if !chanSel.HasChannel(ch) {
+ continue
+ }
+ hasLink = true
+ links = append(links, fmt.Sprintf("%s --> %s", ch, target))
+ }
+ if hasLink {
+ processing = append(processing, thisprocs...)
+ links = append(links, thislinks...)
+ servers[name] = fmt.Sprintf("queue %s as \"http: %s\"",
+ key, *cfg.Address)
+ }
+ }
+
// Export the stderr redirects.
if c.Sinks.Stderr.Filter != logpb.Severity_NONE {
target, thisprocs, thislinks := process("stderr", c.Sinks.Stderr.CommonSinkConfig)
@@ -216,11 +251,22 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
}
}
+ var buf bytes.Buffer
+ buf.WriteString("@startuml\nleft to right direction\n")
+
+ // Export the channels.
+ buf.WriteString("component sources {\n")
+ for _, ch := range chanSel.Channels {
+ fmt.Fprintf(&buf, "() %s\n", ch)
+ }
+ buf.WriteString("cloud stray as \"stray\\nerrors\"\n}\n")
+
+ // The process stderr stream.
+ buf.WriteString("queue stderr\n")
+
// Represent the processing stages, if any.
- if len(processing) > 0 {
- for _, p := range processing {
- fmt.Fprintf(&buf, "%s\n", p)
- }
+ for _, p := range processing {
+ fmt.Fprintf(&buf, "%s\n", p)
}
// Represent the files, if any.
@@ -238,10 +284,10 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
}
// Represent the network servers, if any.
- if len(c.Sinks.sortedServerNames) > 0 {
+ if len(servers) > 0 {
buf.WriteString("cloud network {\n")
- for _, s := range c.Sinks.sortedServerNames {
- fmt.Fprintf(&buf, " %s\n", servers[s])
+ for _, s := range servers {
+ fmt.Fprintf(&buf, " %s\n", s)
}
buf.WriteString("}\n")
}
diff --git a/pkg/util/log/logconfig/gen.go b/pkg/util/log/logconfig/gen.go
index 10b8ca8d6e9f..40e958be77fa 100644
--- a/pkg/util/log/logconfig/gen.go
+++ b/pkg/util/log/logconfig/gen.go
@@ -245,11 +245,11 @@ func readInput(infos map[string]*sinkInfo) error {
return nil
}
-var configStructRe = regexp.MustCompile(`^type (?P[A-Z][a-z0-9]*)(SinkConfig|Defaults) struct`)
+var configStructRe = regexp.MustCompile(`^type (?P[A-Z]\w*)(SinkConfig|Defaults) struct`)
var fieldDefRe = regexp.MustCompile(`^\s*` +
// Field name in Go.
- `(?P[A-Z][A-Za-z_0-9]*)` +
+ `(?P[A-Z]\w*)` +
// Go type. Empty if embedded type.
`(?P(?: [^ ]+)?)` +
// Start of YAML annotation.
@@ -260,16 +260,27 @@ var fieldDefRe = regexp.MustCompile(`^\s*` +
`[^"]*"` + "`.*")
func camelToSnake(typeName string) string {
+ isUpper := func(c byte) bool {
+ return 'A' <= c && c <= 'Z'
+ }
+ toLower := func(c byte) byte {
+ if !isUpper(c) {
+ return c
+ }
+ return c - 'A' + 'a'
+ }
+
var res strings.Builder
- res.WriteByte(typeName[0] + 'a' - 'A')
- for i := 1; i < len(typeName); i++ {
- if typeName[i] >= 'A' && typeName[i] <= 'Z' {
+ res.WriteByte(toLower(typeName[0]))
+ for i := 1; i < len(typeName)-1; i++ {
+ // put a word break at transitions likeTHIS and LIKEThis
+ if isUpper(typeName[i]) && (!isUpper(typeName[i-1]) || !isUpper(typeName[i+1])) {
res.WriteByte('-')
- res.WriteByte(typeName[i] + 'a' - 'A')
- } else {
- res.WriteByte(typeName[i])
}
+ res.WriteByte(toLower(typeName[i]))
}
+ // assume the last character isn't a one-letter word
+ res.WriteByte(toLower(typeName[len(typeName)-1]))
return res.String()
}
diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate
index 5bba86a15907..53c8efc3de31 100644
--- a/pkg/util/log/logconfig/testdata/validate
+++ b/pkg/util/log/logconfig/testdata/validate
@@ -1,24 +1,6 @@
# Empty configuration: use and propagate defaults.
yaml
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
default:
@@ -51,24 +33,6 @@ sinks:
custom:
channels: DEV
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
custom:
@@ -103,24 +67,6 @@ sinks:
custom:
channels: DEV
----
-file-defaults:
- dir: /custom
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
custom:
@@ -150,30 +96,12 @@ capture-stray-errors:
# Check that default severity propagates.
yaml
file-defaults:
- filter: WARNING
+ filter: WARNING
sinks:
- file-groups:
- custom:
- channels: DEV
+ file-groups:
+ custom:
+ channels: DEV
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: WARNING
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
custom:
@@ -207,24 +135,6 @@ sinks:
address: "127.0.0.1:5170"
channels: DEV
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
default:
@@ -267,24 +177,6 @@ sinks:
custom:
channels: all
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
custom:
@@ -318,24 +210,6 @@ sinks:
channels: DEV
auditable: true
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
custom:
@@ -370,24 +244,6 @@ sinks:
address: localhost:5170
auditable: true
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
default:
@@ -431,24 +287,6 @@ sinks:
exit-on-error: false
auditable: true
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: INFO
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
file-groups:
default:
@@ -478,24 +316,6 @@ capture-stray-errors:
yaml
file-defaults: {filter: NONE}
----
-file-defaults:
- dir: /default-dir
- max-file-size: 10MiB
- max-group-size: 100MiB
- buffered-writes: true
- filter: NONE
- format: crdb-v2
- redact: false
- redactable: true
- exit-on-error: true
- auditable: false
-fluent-defaults:
- filter: INFO
- format: json-fluent-compact
- redact: false
- redactable: true
- exit-on-error: false
- auditable: false
sinks:
stderr:
channels: all
diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go
index 1cbc94643859..9721ddd5ebce 100644
--- a/pkg/util/log/logconfig/validate.go
+++ b/pkg/util/log/logconfig/validate.go
@@ -13,10 +13,11 @@ package logconfig
import (
"bytes"
"fmt"
+ "net/http"
"path/filepath"
"reflect"
- "sort"
"strings"
+ "time"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/errors"
@@ -56,12 +57,23 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
Format: func() *string { s := DefaultFluentFormat; return &s }(),
},
}
+ baseHTTPDefaults := HTTPDefaults{
+ CommonSinkConfig: CommonSinkConfig{
+ Format: func() *string { s := DefaultHTTPFormat; return &s }(),
+ },
+ UnsafeTLS: &bf,
+ DisableKeepAlives: &bf,
+ Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(),
+ Timeout: func() *time.Duration { d := time.Duration(0); return &d }(),
+ }
- progagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig)
- progagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig)
+ propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig)
+ propagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig)
+ propagateCommonDefaults(&baseHTTPDefaults.CommonSinkConfig, baseCommonSinkConfig)
- progagateFileDefaults(&c.FileDefaults, baseFileDefaults)
- progagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults)
+ propagateFileDefaults(&c.FileDefaults, baseFileDefaults)
+ propagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults)
+ propagateHTTPDefaults(&c.HTTPDefaults, baseHTTPDefaults)
// Normalize the directory.
if err := normalizeDir(&c.FileDefaults.Dir); err != nil {
@@ -92,11 +104,22 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
}
}
+ for sinkName, fc := range c.Sinks.HTTPServers {
+ if fc == nil {
+ fc = &HTTPSinkConfig{}
+ c.Sinks.HTTPServers[sinkName] = fc
+ }
+ fc.sinkName = sinkName
+ if err := c.validateHTTPSinkConfig(fc); err != nil {
+ fmt.Fprintf(&errBuf, "http server %q: %v\n", sinkName, err)
+ }
+ }
+
// Defaults for stderr.
if c.Sinks.Stderr.Filter == logpb.Severity_UNKNOWN {
c.Sinks.Stderr.Filter = logpb.Severity_NONE
}
- progagateCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, c.FileDefaults.CommonSinkConfig)
+ propagateCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, c.FileDefaults.CommonSinkConfig)
if c.Sinks.Stderr.Auditable != nil && *c.Sinks.Stderr.Auditable {
if *c.Sinks.Stderr.Format == "crdb-v1-tty" {
f := "crdb-v1-tty-count"
@@ -108,10 +131,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
c.Sinks.Stderr.Channels.Sort()
- // fileSinks maps channels to files.
fileSinks := make(map[logpb.Channel]*FileSinkConfig)
- // fluentSinks maps channels to fluent servers.
fluentSinks := make(map[logpb.Channel]*FluentSinkConfig)
+ httpSinks := make(map[logpb.Channel]*HTTPSinkConfig)
// Check that no channel is listed by more than one file sink,
// and every file has at least one channel.
@@ -136,21 +158,36 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
// Check that no channel is listed by more than one fluent sink, and
// every sink has at least one channel.
- for _, fc := range c.Sinks.FluentServers {
+ for serverName, fc := range c.Sinks.FluentServers {
if len(fc.Channels.Channels) == 0 {
- fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", fc.serverName)
+ fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", serverName)
}
fc.Channels.Sort()
for _, ch := range fc.Channels.Channels {
if prev := fluentSinks[ch]; prev != nil {
fmt.Fprintf(&errBuf, "fluent server %q: channel %s already captured by server %q\n",
- fc.serverName, ch, prev.serverName)
+ serverName, ch, prev.serverName)
} else {
fluentSinks[ch] = fc
}
}
}
+ for sinkName, fc := range c.Sinks.HTTPServers {
+ if len(fc.Channels.Channels) == 0 {
+ fmt.Fprintf(&errBuf, "http server %q: no channel selected\n", sinkName)
+ }
+ fc.Channels.Sort()
+ for _, ch := range fc.Channels.Channels {
+ if prev := httpSinks[ch]; prev != nil {
+ fmt.Fprintf(&errBuf, "http server %q: channel %s already captured by server %q\n",
+ sinkName, ch, prev.sinkName)
+ } else {
+ httpSinks[ch] = fc
+ }
+ }
+ }
+
// If capture-stray-errors was enabled, then perform some additional
// validation on it.
if c.CaptureFd2.Enable {
@@ -188,7 +225,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
Channels: ChannelList{Channels: []logpb.Channel{devch}},
}
fc.prefix = "default"
- progagateFileDefaults(&fc.FileDefaults, c.FileDefaults)
+ propagateFileDefaults(&fc.FileDefaults, c.FileDefaults)
if err := c.validateFileSinkConfig(fc, defaultLogDir); err != nil {
fmt.Fprintln(&errBuf, err)
}
@@ -208,44 +245,27 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
}
devFile.Channels.Sort()
- // fileGroupNames collects the names of file groups. We need this to
- // store this sorted in c.Sinks.sortedFileGroupNames later.
- fileGroupNames := make([]string, 0, len(c.Sinks.FileGroups))
// Elide all the file sinks without a directory or with severity set
- // to NONE. Also collect the remaining names for sorting below.
+ // to NONE.
for prefix, fc := range c.Sinks.FileGroups {
if fc.Dir == nil || fc.Filter == logpb.Severity_NONE {
delete(c.Sinks.FileGroups, prefix)
- } else {
- fileGroupNames = append(fileGroupNames, prefix)
}
}
- // serverNames collects the names of the servers. We need this to
- // store this sorted in c.Sinks.sortedServerNames later.
- serverNames := make([]string, 0, len(c.Sinks.FluentServers))
// Elide all the file sinks without a directory or with severity set
- // to NONE. Also collect the remaining names for sorting below.
+ // to NONE.
for serverName, fc := range c.Sinks.FluentServers {
if fc.Filter == logpb.Severity_NONE {
delete(c.Sinks.FluentServers, serverName)
- } else {
- serverNames = append(serverNames, serverName)
}
}
- // Remember the sorted names, so we get deterministic output in
- // export.
- sort.Strings(fileGroupNames)
- c.Sinks.sortedFileGroupNames = fileGroupNames
- sort.Strings(serverNames)
- c.Sinks.sortedServerNames = serverNames
-
return nil
}
func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *string) error {
- progagateFileDefaults(&fc.FileDefaults, c.FileDefaults)
+ propagateFileDefaults(&fc.FileDefaults, c.FileDefaults)
if fc.Dir != c.FileDefaults.Dir {
// A directory was specified explicitly. Normalize it.
if err := normalizeDir(&fc.Dir); err != nil {
@@ -275,7 +295,7 @@ func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *strin
}
func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error {
- progagateFluentDefaults(&fc.FluentDefaults, c.FluentDefaults)
+ propagateFluentDefaults(&fc.FluentDefaults, c.FluentDefaults)
fc.Net = strings.ToLower(strings.TrimSpace(fc.Net))
switch fc.Net {
case "tcp", "tcp4", "tcp6":
@@ -301,6 +321,14 @@ func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error {
return nil
}
+func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error {
+ propagateHTTPDefaults(&hsc.HTTPDefaults, c.HTTPDefaults)
+ if hsc.Address == nil || len(*hsc.Address) == 0 {
+ return errors.New("address cannot be empty")
+ }
+ return nil
+}
+
func normalizeDir(dir **string) error {
if *dir == nil {
return nil
@@ -319,25 +347,29 @@ func normalizeDir(dir **string) error {
return nil
}
-func progagateCommonDefaults(target *CommonSinkConfig, source CommonSinkConfig) {
- progagateDefaults(target, source)
+func propagateCommonDefaults(target *CommonSinkConfig, source CommonSinkConfig) {
+ propagateDefaults(target, source)
+}
+
+func propagateFileDefaults(target *FileDefaults, source FileDefaults) {
+ propagateDefaults(target, source)
}
-func progagateFileDefaults(target *FileDefaults, source FileDefaults) {
- progagateDefaults(target, source)
+func propagateFluentDefaults(target *FluentDefaults, source FluentDefaults) {
+ propagateDefaults(target, source)
}
-func progagateFluentDefaults(target *FluentDefaults, source FluentDefaults) {
- progagateDefaults(target, source)
+func propagateHTTPDefaults(target *HTTPDefaults, source HTTPDefaults) {
+ propagateDefaults(target, source)
}
-// progagateDefaults takes (target *T, source T) where T is a struct
+// propagateDefaults takes (target *T, source T) where T is a struct
// and sets zero-valued exported fields in target to the values
// from source (recursively for struct-valued fields).
// Wrap for static type-checking, as unexpected types will panic.
//
// (Consider making this a common utility if it gets some maturity here.)
-func progagateDefaults(target, source interface{}) {
+func propagateDefaults(target, source interface{}) {
s := reflect.ValueOf(source)
t := reflect.Indirect(reflect.ValueOf(target)) // *target
@@ -345,7 +377,7 @@ func progagateDefaults(target, source interface{}) {
tf := t.Field(i)
sf := s.Field(i)
if tf.Kind() == reflect.Struct {
- progagateDefaults(tf.Addr().Interface(), sf.Interface())
+ propagateDefaults(tf.Addr().Interface(), sf.Interface())
} else if tf.CanSet() && tf.IsZero() {
tf.Set(s.Field(i))
}
diff --git a/pkg/util/log/logconfig/validate_test.go b/pkg/util/log/logconfig/validate_test.go
index 0ec5f17cad89..5d7c3c518ae2 100644
--- a/pkg/util/log/logconfig/validate_test.go
+++ b/pkg/util/log/logconfig/validate_test.go
@@ -31,7 +31,7 @@ func TestValidate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- fmt.Fprintf(&buf, "%s", string(b))
+ buf.Write(b)
t.Logf("%s", buf.String())
buf.Reset()
@@ -39,11 +39,16 @@ func TestValidate(t *testing.T) {
if err := c.Validate(&defaultDir); err != nil {
fmt.Fprintf(&buf, "ERROR: %v\n", err)
} else {
+ // clear the default fields to reduce test over-specification
+ c.FileDefaults = FileDefaults{}
+ c.FluentDefaults = FluentDefaults{}
+ c.HTTPDefaults = HTTPDefaults{}
+
b, err := yaml.Marshal(&c)
if err != nil {
t.Fatal(err)
}
- fmt.Fprintf(&buf, "%s", string(b))
+ buf.Write(b)
}
return buf.String()
})
diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go
index 7a2e5bdd486c..20f073530388 100644
--- a/pkg/util/log/sinks.go
+++ b/pkg/util/log/sinks.go
@@ -50,3 +50,4 @@ type logSink interface {
var _ logSink = (*stderrSink)(nil)
var _ logSink = (*fileSink)(nil)
var _ logSink = (*fluentSink)(nil)
+var _ logSink = (*httpSink)(nil)