From e72da90409b0a915d51d1099255bf3f1338c1130 Mon Sep 17 00:00:00 2001 From: Jay Rauchenstein Date: Wed, 23 Jun 2021 15:37:33 -0700 Subject: [PATCH] util/log: add http log sink Release note (cli change): Added a new HTTP sink to the logging system. This can be configured similarly to other log sinks with the new 'http-servers' and 'http-defaults' sections of the logging config passed via the "--log" or "--log-config-file" command line flags. --- docs/generated/logsinks.md | 69 +++++++- pkg/cli/log_flags_test.go | 14 +- pkg/cli/testdata/logflags | 29 +++- pkg/util/log/BUILD.bazel | 3 + pkg/util/log/flags.go | 68 +++++--- pkg/util/log/http_sink.go | 145 ++++++++++++++++ pkg/util/log/http_sink_test.go | 207 +++++++++++++++++++++++ pkg/util/log/logconfig/config.go | 159 ++++++++++++++++- pkg/util/log/logconfig/export.go | 94 +++++++--- pkg/util/log/logconfig/gen.go | 27 ++- pkg/util/log/logconfig/testdata/validate | 188 +------------------- pkg/util/log/logconfig/validate.go | 116 ++++++++----- pkg/util/log/logconfig/validate_test.go | 9 +- pkg/util/log/sinks.go | 1 + 14 files changed, 834 insertions(+), 295 deletions(-) create mode 100644 pkg/util/log/http_sink.go create mode 100644 pkg/util/log/http_sink_test.go diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md index 5b69ded70cc9..5ff83bb851bc 100644 --- a/docs/generated/logsinks.md +++ b/docs/generated/logsinks.md @@ -6,6 +6,8 @@ The supported log output sink types are documented below. - [Output to Fluentd-compatible log collectors](#output-to-fluentd-compatible-log-collectors) +- [Output to HTTP servers.](#output-to-http-servers.) + - [Standard error stream](#standard-error-stream) @@ -91,7 +93,7 @@ Configuration options shared across all sink types: ## Sink type: Output to Fluentd-compatible log collectors -This sink type causes logging data to be sent over the network, to +This sink type causes logging data to be sent over the network to a log collector that can ingest log data in a [Fluentd](https://www.fluentd.org)-compatible protocol. @@ -168,6 +170,71 @@ Configuration options shared across all sink types: + + +## Sink type: Output to HTTP servers. + + +This sink type causes logging data to be sent over the network +as requests to an HTTP server. + +The configuration key under the `sinks` key in the YAML +configuration is `http-servers`. Example configuration: + + sinks: + http-servers: + health: + channels: HEALTH + address: http://127.0.0.1 + +Every new server sink configured automatically inherits the configuration set in the `http-defaults` section. + +For example: + + http-defaults: + redactable: false # default: disable redaction markers + sinks: + http-servers: + health: + channels: HEALTH + # This sink has redactable set to false, + # as the setting is inherited from fluent-defaults + # unless overridden here. + +The default output format for HTTP sinks is +`json-compact`. [Other supported formats.](log-formats.html) + +{{site.data.alerts.callout_info}} +Run `cockroach debug check-log-config` to verify the effect of defaults inheritance. +{{site.data.alerts.end}} + + + +Type-specific configuration options: + +| Field | Description | +|--|--| +| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. | +| `address` | the network address of the http server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. Inherited from `http-defaults.address` if not specified. | +| `method` | the HTTP method to be used. POST and GET are supported; defaults to POST. Inherited from `http-defaults.method` if not specified. | +| `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. | +| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. | +| `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. | + + +Configuration options shared across all sink types: + +| Field | Description | +|--|--| +| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. | +| `format` | the entry format to use. | +| `redact` | whether to strip sensitive information before log events are emitted to this sink. | +| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. | +| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. | +| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. | + + + ## Sink type: Standard error stream diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go index 6174993d8cc2..d4a084adf619 100644 --- a/pkg/cli/log_flags_test.go +++ b/pkg/cli/log_flags_test.go @@ -31,7 +31,7 @@ func TestSetupLogging(t *testing.T) { defer leaktest.AfterTest(t)() reWhitespace := regexp.MustCompile(`(?ms:((\s|\n)+))`) - reWhitespace2 := regexp.MustCompile(`{\s+`) + reBracketWhitespace := regexp.MustCompile(`(?P[{[])\s+`) reSimplify := regexp.MustCompile(`(?ms:^\s*(auditable: false|redact: false|exit-on-error: true|max-group-size: 100MiB)\n)`) @@ -41,6 +41,15 @@ func TestSetupLogging(t *testing.T) { `redactable: true, ` + `exit-on-error: false` + `}` + const defaultHTTPConfig = `http-defaults: {` + + `method: POST, ` + + `unsafe-tls: false, ` + + `timeout: 0s, ` + + `disable-keep-alives: false, ` + + `filter: INFO, ` + + `format: json-compact, ` + + `redactable: true, ` + + `exit-on-error: false}` stdFileDefaultsRe := regexp.MustCompile( `file-defaults: \{dir: (?P[^,]+), max-file-size: 10MiB, buffered-writes: true, filter: INFO, format: crdb-v2, redactable: true\}`) fileDefaultsNoMaxSizeRe := regexp.MustCompile( @@ -103,10 +112,11 @@ func TestSetupLogging(t *testing.T) { t.Fatal(err) } actual = reWhitespace.ReplaceAllString(h.String(), " ") - actual = reWhitespace2.ReplaceAllString(actual, "{") + actual = reBracketWhitespace.ReplaceAllString(actual, "$bracket") // Shorten the configuration for legibility during reviews of test changes. actual = strings.ReplaceAll(actual, defaultFluentConfig, "") + actual = strings.ReplaceAll(actual, defaultHTTPConfig, "") actual = stdFileDefaultsRe.ReplaceAllString(actual, "") actual = fileDefaultsNoMaxSizeRe.ReplaceAllString(actual, "") actual = strings.ReplaceAll(actual, fileDefaultsNoDir, "") diff --git a/pkg/cli/testdata/logflags b/pkg/cli/testdata/logflags index 0072d721bce3..25f8b285e0b4 100644 --- a/pkg/cli/testdata/logflags +++ b/pkg/cli/testdata/logflags @@ -14,6 +14,7 @@ start ---- config: {)>, , +, sinks: {file-groups: {default: )>, , +, sinks: {file-groups: {default: , , +, sinks: {}} run @@ -83,6 +86,7 @@ init ---- config: {, , +, sinks: {}} @@ -95,6 +99,7 @@ bank ---- config: {, , +, sinks: {}} @@ -105,6 +110,7 @@ demo ---- config: {, , +, sinks: {}} @@ -120,6 +126,7 @@ start ---- config: {, , +, sinks: {}} @@ -132,6 +139,7 @@ start ---- config: {, , +, sinks: {file-groups: {default: , , +, sinks: {file-groups: {default: , sql-exec: , sql-slow: , sql-slow-internal-only: , -telemetry: {channels: [ TELEMETRY], +telemetry: {channels: [TELEMETRY], dir: /mypath, max-file-size: 100KiB, max-group-size: 1.0MiB, @@ -195,6 +204,7 @@ start ---- config: {, , +, sinks: {file-groups: {default: , , +, sinks: {file-groups: {default: , sql-exec: , sql-slow: , sql-slow-internal-only: , -telemetry: {channels: [ TELEMETRY], +telemetry: {channels: [TELEMETRY], dir: /mypath, max-file-size: 100KiB, max-group-size: 1.0MiB, @@ -262,6 +273,7 @@ start ---- config: {)>, , +, sinks: {file-groups: {default: )>, , +, sinks: {file-groups: {default: , , +, sinks: {}} @@ -355,6 +369,7 @@ start ---- config: {, , +, sinks: {file-groups: {default: , sql-exec: , sql-slow: , sql-slow-internal-only: , -telemetry: {channels: [ TELEMETRY], +telemetry: {channels: [TELEMETRY], dir: /mypath, max-file-size: 100KiB, max-group-size: 1.0MiB, @@ -387,6 +402,7 @@ start ---- config: {, , +, sinks: {file-groups: {default: , sql-exec: , sql-slow: , sql-slow-internal-only: , -telemetry: {channels: [ TELEMETRY], +telemetry: {channels: [TELEMETRY], dir: /pathA, max-file-size: 100KiB, max-group-size: 1.0MiB, @@ -419,6 +435,7 @@ init ---- config: {, , +, sinks: {file-groups: {default: {channels: all, dir: /mypath, buffered-writes: true, @@ -435,6 +452,7 @@ start ---- config: {)>, , +, sinks: {file-groups: {default: )>, , +, sinks: {file-groups: {default: , , +, sinks: {}} # Default when no severity is specified is WARNING. @@ -504,6 +524,7 @@ init ---- config: {, , +, sinks: {}} diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 35852787f123..35c3e7377666 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "format_json.go", "formats.go", "get_stacks.go", + "http_sink.go", "intercept.go", "log.go", "log_bridge.go", @@ -133,6 +134,7 @@ go_test( "format_crdb_v2_test.go", "format_json_test.go", "helpers_test.go", + "http_sink_test.go", "intercept_test.go", "main_test.go", "redact_test.go", @@ -153,6 +155,7 @@ go_test( "//pkg/util/log/logconfig", "//pkg/util/log/logpb", "//pkg/util/log/severity", + "//pkg/util/netutil/addr", "//pkg/util/randutil", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 6899840a9cfb..7bba41bc5599 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -120,10 +120,16 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } } + // Call the final value of cleanupFn immediately if returning with error. + defer func() { + if err != nil { + cleanupFn() + } + }() + // If capture of internal fd2 writes is enabled, set it up here. if config.CaptureFd2.Enable { if logging.testingFd2CaptureLogger != nil { - cleanupFn() return nil, errors.New("fd2 capture already set up. Maybe use TestLogScope?") } // We use a secondary logger, even though no logging *event* will ever @@ -160,7 +166,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } fileSinkInfo, fileSink, err := newFileSinkInfo("stderr", fakeConfig) if err != nil { - cleanupFn() return nil, err } sinkInfos = append(sinkInfos, fileSinkInfo) @@ -190,7 +195,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { if err := fileSink.takeOverInternalStderr(secLogger); err != nil { // Oof, it turns out we can't use this logger after all. Give up // on everything we did. - cleanupFn() return nil, err } @@ -218,7 +222,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { // Apply the stderr sink configuration. logging.stderrSink.noColor.Set(config.Sinks.Stderr.NoColor) if err := logging.stderrSinkInfoTemplate.applyConfig(config.Sinks.Stderr.CommonSinkConfig); err != nil { - cleanupFn() return nil, err } @@ -244,6 +247,17 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { l.sinkInfos = append(l.sinkInfos, &stderrSinkInfo) } + attachSinkInfo := func(si *sinkInfo, chs []logpb.Channel) { + sinkInfos = append(sinkInfos, si) + allSinkInfos.put(si) + + // Connect the channels for this sink. + for _, ch := range chs { + l := chans[ch] + l.sinkInfos = append(l.sinkInfos, si) + } + } + // Create the file sinks. for prefix, fc := range config.Sinks.FileGroups { if fc.Filter == severity.NONE || fc.Dir == nil { @@ -254,17 +268,9 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } fileSinkInfo, _, err := newFileSinkInfo(prefix, *fc) if err != nil { - cleanupFn() return nil, err } - sinkInfos = append(sinkInfos, fileSinkInfo) - allSinkInfos.put(fileSinkInfo) - - // Connect the channels for this sink. - for _, ch := range fc.Channels.Channels { - l := chans[ch] - l.sinkInfos = append(l.sinkInfos, fileSinkInfo) - } + attachSinkInfo(fileSinkInfo, fc.Channels.Channels) } // Create the fluent sinks. @@ -274,17 +280,21 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } fluentSinkInfo, err := newFluentSinkInfo(*fc) if err != nil { - cleanupFn() return nil, err } - sinkInfos = append(sinkInfos, fluentSinkInfo) - allSinkInfos.put(fluentSinkInfo) + attachSinkInfo(fluentSinkInfo, fc.Channels.Channels) + } - // Connect the channels for this sink. - for _, ch := range fc.Channels.Channels { - l := chans[ch] - l.sinkInfos = append(l.sinkInfos, fluentSinkInfo) + // Create the HTTP sinks. + for _, fc := range config.Sinks.HTTPServers { + if fc.Filter == severity.NONE { + continue } + httpSinkInfo, err := newHTTPSinkInfo(*fc) + if err != nil { + return nil, err + } + attachSinkInfo(httpSinkInfo, fc.Channels.Channels) } // Prepend the interceptor sink to all channels. @@ -333,6 +343,24 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) { return info, nil } +func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) { + info := &sinkInfo{} + if err := info.applyConfig(c.CommonSinkConfig); err != nil { + return nil, err + } + httpSink, err := newHTTPSink(*c.Address, httpSinkOptions{ + method: string(*c.Method), + unsafeTLS: *c.UnsafeTLS, + timeout: *c.Timeout, + disableKeepAlives: *c.DisableKeepAlives, + }) + if err != nil { + return nil, err + } + info.sink = httpSink + return info, nil +} + // applyConfig applies a common sink configuration to a sinkInfo. func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error { l.threshold = c.Filter diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go new file mode 100644 index 000000000000..138cefac66fe --- /dev/null +++ b/pkg/util/log/http_sink.go @@ -0,0 +1,145 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "bytes" + "crypto/tls" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/errors" +) + +// TODO: HTTP requests should be bound to context via http.NewRequestWithContext +// Proper logging context to be decided/designed. + +type httpSinkOptions struct { + unsafeTLS bool + timeout time.Duration + method string + disableKeepAlives bool +} + +func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) { + transport, ok := http.DefaultTransport.(*http.Transport) + if !ok { + return nil, errors.AssertionFailedf("http.DefaultTransport is not a http.Transport: %T", http.DefaultTransport) + } + transport = transport.Clone() + transport.DisableKeepAlives = opt.disableKeepAlives + hs := &httpSink{ + client: http.Client{ + Transport: transport, + Timeout: opt.timeout, + }, + address: url, + doRequest: doPost, + } + + if opt.unsafeTLS { + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + + if opt.method == http.MethodGet { + hs.doRequest = doGet + } + + return hs, nil +} + +type httpSink struct { + client http.Client + address string + doRequest func(*httpSink, []byte) (*http.Response, error) +} + +// output emits some formatted bytes to this sink. +// the sink is invited to perform an extra flush if indicated +// by the argument. This is set to true for e.g. Fatal +// entries. +// +// The parent logger's outputMu is held during this operation: log +// sinks must not recursively call into logging when implementing +// this method. +func (hs *httpSink) output(extraSync bool, b []byte) (err error) { + resp, err := hs.doRequest(hs, b) + if err != nil { + return err + } + + if resp.StatusCode >= 400 { + return HTTPLogError{ + StatusCode: resp.StatusCode, + Address: hs.address} + } + return nil +} + +// emergencyOutput attempts to emit some formatted bytes, and +// ignores any errors. +// +// The parent logger's outputMu is held during this operation: log +// sinks must not recursively call into logging when implementing +// this method. +func (hs *httpSink) emergencyOutput(b []byte) { + _, _ = hs.doRequest(hs, b) +} + +func doPost(hs *httpSink, b []byte) (*http.Response, error) { + resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b)) + if err != nil { + return nil, err + } + resp.Body.Close() // don't care about content + return resp, nil +} + +func doGet(hs *httpSink, b []byte) (*http.Response, error) { + resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b))) + if err != nil { + return nil, err + } + resp.Body.Close() // don't care about content + return resp, nil +} + +// active returns true if this sink is currently active. +func (*httpSink) active() bool { + return true +} + +// attachHints attaches some hints about the location of the message +// to the stack message. +func (*httpSink) attachHints(stacks []byte) []byte { + return stacks +} + +// exitCode returns the exit code to use if the logger decides +// to terminate because of an error in output(). +func (*httpSink) exitCode() exit.Code { + return exit.LoggingNetCollectorUnavailable() +} + +// HTTPLogError represents an HTTP error status code from a logging request. +type HTTPLogError struct { + StatusCode int + Address string +} + +func (e HTTPLogError) Error() string { + return fmt.Sprintf( + "received %v response attempting to log to [%v]", + e.StatusCode, e.Address) +} diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go new file mode 100644 index 000000000000..59e26876e083 --- /dev/null +++ b/pkg/util/log/http_sink_test.go @@ -0,0 +1,207 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "context" + "errors" + "io" + "net" + "net/http" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log/channel" + "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" + "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// testBase sets the provided HTTPDefaults, logs "hello World", captures the +// resulting request to the server, and validates the body with the provided +// requestTestFunc. +// Options also given to cause the server to hang (which naturally skips the body valiation) +// and to set a maximum duration for the log call. +func testBase( + t *testing.T, + defaults logconfig.HTTPDefaults, + fn func(body string) error, + hangServer bool, + deadline time.Duration, +) { + sc := ScopeWithoutShowLogs(t) + defer sc.Close(t) + + // cancelCh ensures that async goroutines terminate if the test + // goroutine terminates due to a Fatal call or a panic. + cancelCh := make(chan struct{}) + defer func() { close(cancelCh) }() + + // seenMessage is true after the request predicate + // has seen the expected message from the client. + var seenMessage syncutil.AtomicBool + + handler := func(rw http.ResponseWriter, r *http.Request) { + buf := make([]byte, 5000) + nbytes, err := r.Body.Read(buf) + if err != nil && err != io.EOF { + t.Error(err) + return + } + buf = buf[:nbytes] + + if hangServer { + // The test is requesting the server to simulate a timeout. Just + // do nothing until the test terminates. + <-cancelCh + } else { + // The test is expecting some message via a predicate. + if err := fn(string(buf)); err != nil { + // non-failing, in case there are extra log messages generated + t.Log(err) + } else { + seenMessage.Set(true) + } + } + } + + { + // Start the HTTP server that receives the logging events from the + // test. + + l, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + _, port, err := addr.SplitHostPort(l.Addr().String(), "port") + if err != nil { + t.Fatal(err) + } + *defaults.Address += ":" + port + s := http.Server{Handler: http.HandlerFunc(handler)} + + // serverErrCh collects errors and signals the termination of the + // server async goroutine. + serverErrCh := make(chan error, 1) + go func() { + defer func() { close(serverErrCh) }() + err := s.Serve(l) + if err != http.ErrServerClosed { + select { + case serverErrCh <- err: + case <-cancelCh: + } + } + }() + + // At the end of this function, close the server + // allowing the above goroutine to finish and close serverClosedCh + // allowing the deferred read to proceed and this function to return. + // (Basically, it's a WaitGroup of one.) + defer func() { + require.NoError(t, s.Close()) + serverErr := <-serverErrCh + require.NoError(t, serverErr) + }() + } + + // Set up a logging configuration with the server we've just set up + // as target for the OPS channel. + cfg := logconfig.DefaultConfig() + cfg.Sinks.HTTPServers = map[string]*logconfig.HTTPSinkConfig{ + "ops": { + HTTPDefaults: defaults, + Channels: logconfig.ChannelList{Channels: []Channel{channel.OPS}}}, + } + // Derive a full config using the same directory as the + // TestLogScope. + require.NoError(t, cfg.Validate(&sc.logDir)) + + // Apply the configuration. + TestingResetActive() + cleanup, err := ApplyConfig(cfg) + require.NoError(t, err) + defer cleanup() + + // Send a log event on the OPS channel. + logStart := timeutil.Now() + Ops.Infof(context.Background(), "hello world") + logDuration := timeutil.Since(logStart) + + // Note: deadline is passed by the caller and already contains slack + // to accommodate for the overhead of the logging call compared to + // the timeout in the HTTP request. + if deadline > 0 && logDuration > deadline { + t.Error("Log call exceeded timeout") + } + + if hangServer { + return + } + + // If the test was not requiring a timeout, it was requiring some + // logging message to match the predicate. If we don't see the + // predicate match, it is a test failure. + if !seenMessage.Get() { + t.Error("expected message matching predicate, found none") + } +} + +// TestMessageReceived verifies that the server receives the logged message. +func TestMessageReceived(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + } + + testFn := func(body string) error { + t.Log(body) + if !strings.Contains(body, `"message":"hello world"`) { + return errors.New("Log message not found in request") + } + return nil + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +} + +// TestHTTPSinkTimeout verifies that a log call to a hanging server doesn't last +// to much longer than the configured timeout. +func TestHTTPSinkTimeout(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := time.Millisecond + tb := true + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + } + + testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond) +} diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index 51682a01c5da..8c9a99717fa2 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -12,14 +12,16 @@ package logconfig import ( "fmt" + "net/http" "reflect" "sort" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" - "github.com/dustin/go-humanize" - "gopkg.in/yaml.v2" + humanize "github.com/dustin/go-humanize" + yaml "gopkg.in/yaml.v2" ) // DefaultFileFormat is the entry format for file sinks when not @@ -34,6 +36,10 @@ const DefaultStderrFormat = `crdb-v2-tty` // when not specified in a configuration. const DefaultFluentFormat = `json-fluent-compact` +// DefaultHTTPFormat is the entry format for HTTP sinks +// when not specified in a configuration. +const DefaultHTTPFormat = `json-compact` + // DefaultConfig returns a suitable default configuration when logging // is meant to primarily go to files. func DefaultConfig() (c Config) { @@ -101,6 +107,11 @@ type Config struct { // configuration value. FluentDefaults FluentDefaults `yaml:"fluent-defaults,omitempty"` + // HTTPDefaults represents the default configuration for HTTP sinks, + // inherited when a specific HTTP sink config does not provide a + // configuration value. + HTTPDefaults HTTPDefaults `yaml:"http-defaults,omitempty"` + // Sinks represents the sink configurations. Sinks SinkConfig `yaml:",omitempty"` @@ -163,13 +174,10 @@ type SinkConfig struct { FileGroups map[string]*FileSinkConfig `yaml:"file-groups,omitempty"` // FluentServer represents the list of configured fluent sinks. FluentServers map[string]*FluentSinkConfig `yaml:"fluent-servers,omitempty"` + // HTTPServers represents the list of configured http sinks. + HTTPServers map[string]*HTTPSinkConfig `yaml:"http-servers,omitempty"` // Stderr represents the configuration for the stderr sink. Stderr StderrSinkConfig `yaml:",omitempty"` - - // sortedFileGroupNames and sortedServerNames are used internally to - // make the Export() function deterministic. - sortedFileGroupNames []string - sortedServerNames []string } // StderrSinkConfig represents the configuration for the stderr sink. @@ -233,7 +241,7 @@ type FluentDefaults struct { // User-facing documentation follows. // TITLE: Output to Fluentd-compatible log collectors // -// This sink type causes logging data to be sent over the network, to +// This sink type causes logging data to be sent over the network to // a log collector that can ingest log data in a // [Fluentd](https://www.fluentd.org)-compatible protocol. // @@ -395,6 +403,83 @@ type FileSinkConfig struct { prefix string } +// HTTPDefaults refresents the configuration defaults for HTTP sinks. +type HTTPDefaults struct { + // Address is the network address of the http server. The + // host/address and port parts are separated with a colon. IPv6 + // numeric addresses should be included within square brackets, + // e.g.: [::1]:1234. + Address *string `yaml:",omitempty"` + + // Method is the HTTP method to be used. POST and GET are + // supported; defaults to POST. + Method *HTTPSinkMethod `yaml:",omitempty"` + + // UnsafeTLS enables certificate authentication to be bypassed. + // Defaults to false. + UnsafeTLS *bool `yaml:"unsafe-tls,omitempty"` + + // Timeout is the HTTP timeout. + // Defaults to 0 for no timeout. + Timeout *time.Duration `yaml:",omitempty"` + + // DisableKeepAlives causes the logging sink to re-establish a new + // connection for every outgoing log message. This option is + // intended for testing only and can cause excessive network + // overhead in production systems. + DisableKeepAlives *bool `yaml:"disable-keep-alives,omitempty"` + + CommonSinkConfig `yaml:",inline"` +} + +// HTTPSinkConfig represents the configuration for one http sink. +// +// User-facing documentation follows. +// TITLE: Output to HTTP servers. +// +// This sink type causes logging data to be sent over the network +// as requests to an HTTP server. +// +// The configuration key under the `sinks` key in the YAML +// configuration is `http-servers`. Example configuration: +// +// sinks: +// http-servers: +// health: +// channels: HEALTH +// address: http://127.0.0.1 +// +// Every new server sink configured automatically inherits the configuration set in the `http-defaults` section. +// +// For example: +// +// http-defaults: +// redactable: false # default: disable redaction markers +// sinks: +// http-servers: +// health: +// channels: HEALTH +// # This sink has redactable set to false, +// # as the setting is inherited from fluent-defaults +// # unless overridden here. +// +// The default output format for HTTP sinks is +// `json-compact`. [Other supported formats.](log-formats.html) +// +// {{site.data.alerts.callout_info}} +// Run `cockroach debug check-log-config` to verify the effect of defaults inheritance. +// {{site.data.alerts.end}} +// +type HTTPSinkConfig struct { + // Channels is the list of logging channels that use this sink. + Channels ChannelList `yaml:",omitempty,flow"` + + HTTPDefaults `yaml:",inline"` + + // sinkName is populated during validation. + sinkName string +} + // IterateDirectories calls the provided fn on every directory linked to // by the configuration. func (c *Config) IterateDirectories(fn func(d string) error) error { @@ -684,3 +769,61 @@ func (*Holder) Type() string { return "yaml" } func (h *Holder) Set(value string) error { return yaml.UnmarshalStrict([]byte(value), &h.Config) } + +// HTTPSinkMethod is a string restricted to "POST" and "GET" +type HTTPSinkMethod string + +var _ constrainedString = (*HTTPSinkMethod)(nil) + +// Accept implements the constrainedString interface. +func (hsm *HTTPSinkMethod) Accept(s string) { + *hsm = HTTPSinkMethod(s) +} + +// Canonicalize implements the constrainedString interface. +func (HTTPSinkMethod) Canonicalize(s string) string { + return strings.ToUpper(strings.TrimSpace(s)) +} + +// AllowedSet implements the constrainedString interface. +func (HTTPSinkMethod) AllowedSet() []string { + return []string{ + http.MethodGet, + http.MethodPost, + } +} + +// MarshalYAML implements yaml.Marshaler interface. +func (hsm HTTPSinkMethod) MarshalYAML() (interface{}, error) { + return string(hsm), nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (hsm *HTTPSinkMethod) UnmarshalYAML(fn func(interface{}) error) error { + return unmarshalYAMLConstrainedString(hsm, fn) +} + +// constrainedString is an interface to make it easy to unmarshal +// a string constrained to a small set of accepted values. +type constrainedString interface { + Accept(string) + Canonicalize(string) string + AllowedSet() []string +} + +// unmarshalYAMLConstrainedString is a utility function to unmarshal +// a type satisfying the constrainedString interface. +func unmarshalYAMLConstrainedString(cs constrainedString, fn func(interface{}) error) error { + var s string + if err := fn(&s); err != nil { + return err + } + s = cs.Canonicalize(s) + for _, candidate := range cs.AllowedSet() { + if s == candidate { + cs.Accept(s) + return nil + } + } + return errors.Newf("Unexpected value: %v", s) +} diff --git a/pkg/util/log/logconfig/export.go b/pkg/util/log/logconfig/export.go index 3ec48c720c1a..1fbb6f48ceca 100644 --- a/pkg/util/log/logconfig/export.go +++ b/pkg/util/log/logconfig/export.go @@ -29,19 +29,6 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { chanSel = onlyChans } - var buf bytes.Buffer - buf.WriteString("@startuml\nleft to right direction\n") - - // Export the channels. - buf.WriteString("component sources {\n") - for _, ch := range chanSel.Channels { - fmt.Fprintf(&buf, "() %s\n", ch) - } - buf.WriteString("cloud stray as \"stray\\nerrors\"\n}\n") - - // The process stderr stream. - buf.WriteString("queue stderr\n") - // links collects the relationships. We need to collect them and // print them at the end because plantUML does not support // interleaving box and arrow declarations. @@ -110,7 +97,16 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { // folders map each directory to a list of files within. folders := map[string][]string{} fileNum := 1 - for _, fn := range c.Sinks.sortedFileGroupNames { + + // Process the file sinks in sorted order, + // so the output order is deteministic. + var sortedNames []string + for prefix := range c.Sinks.FileGroups { + sortedNames = append(sortedNames, prefix) + } + sort.Strings(sortedNames) + + for _, fn := range sortedNames { fc := c.Sinks.FileGroups[fn] if fc.Filter == logpb.Severity_NONE { // This file is not collecting anything. Skip it. @@ -175,12 +171,19 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { // // servers maps each server to its box declaration. servers := map[string]string{} - for _, fn := range c.Sinks.sortedServerNames { + + sortedNames = nil + for serverName := range c.Sinks.FluentServers { + sortedNames = append(sortedNames, serverName) + } + sort.Strings(sortedNames) + + for _, fn := range sortedNames { fc := c.Sinks.FluentServers[fn] if fc.Filter == logpb.Severity_NONE { continue } - skey := fmt.Sprintf("s__%s", fc.serverName) + skey := fmt.Sprintf("s__%s", fn) target, thisprocs, thislinks := process(skey, fc.CommonSinkConfig) hasLink := false for _, ch := range fc.Channels.Channels { @@ -193,11 +196,43 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { if hasLink { processing = append(processing, thisprocs...) links = append(links, thislinks...) - servers[fc.serverName] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"", + servers[fn] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"", skey, fc.Net, fc.Address) } } + // Collect HTTP sinks + // Add the destinations into the same map, for display in the "network server" + // section of the diagram + sortedNames = nil + for sinkName := range c.Sinks.HTTPServers { + sortedNames = append(sortedNames, sinkName) + } + sort.Strings(sortedNames) + + for _, name := range sortedNames { + cfg := c.Sinks.HTTPServers[name] + if cfg.Filter == logpb.Severity_NONE { + continue + } + key := fmt.Sprintf("h__%s", name) + target, thisprocs, thislinks := process(key, cfg.CommonSinkConfig) + hasLink := false + for _, ch := range cfg.Channels.Channels { + if !chanSel.HasChannel(ch) { + continue + } + hasLink = true + links = append(links, fmt.Sprintf("%s --> %s", ch, target)) + } + if hasLink { + processing = append(processing, thisprocs...) + links = append(links, thislinks...) + servers[name] = fmt.Sprintf("queue %s as \"http: %s\"", + key, *cfg.Address) + } + } + // Export the stderr redirects. if c.Sinks.Stderr.Filter != logpb.Severity_NONE { target, thisprocs, thislinks := process("stderr", c.Sinks.Stderr.CommonSinkConfig) @@ -216,11 +251,22 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { } } + var buf bytes.Buffer + buf.WriteString("@startuml\nleft to right direction\n") + + // Export the channels. + buf.WriteString("component sources {\n") + for _, ch := range chanSel.Channels { + fmt.Fprintf(&buf, "() %s\n", ch) + } + buf.WriteString("cloud stray as \"stray\\nerrors\"\n}\n") + + // The process stderr stream. + buf.WriteString("queue stderr\n") + // Represent the processing stages, if any. - if len(processing) > 0 { - for _, p := range processing { - fmt.Fprintf(&buf, "%s\n", p) - } + for _, p := range processing { + fmt.Fprintf(&buf, "%s\n", p) } // Represent the files, if any. @@ -238,10 +284,10 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { } // Represent the network servers, if any. - if len(c.Sinks.sortedServerNames) > 0 { + if len(servers) > 0 { buf.WriteString("cloud network {\n") - for _, s := range c.Sinks.sortedServerNames { - fmt.Fprintf(&buf, " %s\n", servers[s]) + for _, s := range servers { + fmt.Fprintf(&buf, " %s\n", s) } buf.WriteString("}\n") } diff --git a/pkg/util/log/logconfig/gen.go b/pkg/util/log/logconfig/gen.go index 484ae3934d5e..71bdea968462 100644 --- a/pkg/util/log/logconfig/gen.go +++ b/pkg/util/log/logconfig/gen.go @@ -245,11 +245,11 @@ func readInput(infos map[string]*sinkInfo) error { return nil } -var configStructRe = regexp.MustCompile(`^type (?P[A-Z][a-z0-9]*)(SinkConfig|Defaults) struct`) +var configStructRe = regexp.MustCompile(`^type (?P[A-Z]\w*)(SinkConfig|Defaults) struct`) var fieldDefRe = regexp.MustCompile(`^\s*` + // Field name in Go. - `(?P[A-Z][A-Za-z_0-9]*)` + + `(?P[A-Z]\w*)` + // Go type. Empty if embedded type. `(?P(?: [^ ]+)?)` + // Start of YAML annotation. @@ -260,16 +260,27 @@ var fieldDefRe = regexp.MustCompile(`^\s*` + `[^"]*"` + "`.*") func camelToSnake(typeName string) string { + isUpper := func(c byte) bool { + return 'A' <= c && c <= 'Z' + } + toLower := func(c byte) byte { + if !isUpper(c) { + return c + } + return c - 'A' + 'a' + } + var res strings.Builder - res.WriteByte(typeName[0] + 'a' - 'A') - for i := 1; i < len(typeName); i++ { - if typeName[i] >= 'A' && typeName[i] <= 'Z' { + res.WriteByte(toLower(typeName[0])) + for i := 1; i < len(typeName)-1; i++ { + // put a word break at transitions likeTHIS and LIKEThis + if isUpper(typeName[i]) && (!isUpper(typeName[i-1]) || !isUpper(typeName[i+1])) { res.WriteByte('-') - res.WriteByte(typeName[i] + 'a' - 'A') - } else { - res.WriteByte(typeName[i]) } + res.WriteByte(toLower(typeName[i])) } + // assume the last character isn't a one-letter word + res.WriteByte(toLower(typeName[len(typeName)-1])) return res.String() } diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 5bba86a15907..53c8efc3de31 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -1,24 +1,6 @@ # Empty configuration: use and propagate defaults. yaml ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -51,24 +33,6 @@ sinks: custom: channels: DEV ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -103,24 +67,6 @@ sinks: custom: channels: DEV ---- -file-defaults: - dir: /custom - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -150,30 +96,12 @@ capture-stray-errors: # Check that default severity propagates. yaml file-defaults: - filter: WARNING + filter: WARNING sinks: - file-groups: - custom: - channels: DEV + file-groups: + custom: + channels: DEV ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: WARNING - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -207,24 +135,6 @@ sinks: address: "127.0.0.1:5170" channels: DEV ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -267,24 +177,6 @@ sinks: custom: channels: all ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -318,24 +210,6 @@ sinks: channels: DEV auditable: true ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -370,24 +244,6 @@ sinks: address: localhost:5170 auditable: true ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -431,24 +287,6 @@ sinks: exit-on-error: false auditable: true ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -478,24 +316,6 @@ capture-stray-errors: yaml file-defaults: {filter: NONE} ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: NONE - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: stderr: channels: all diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 1cbc94643859..9721ddd5ebce 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -13,10 +13,11 @@ package logconfig import ( "bytes" "fmt" + "net/http" "path/filepath" "reflect" - "sort" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" @@ -56,12 +57,23 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Format: func() *string { s := DefaultFluentFormat; return &s }(), }, } + baseHTTPDefaults := HTTPDefaults{ + CommonSinkConfig: CommonSinkConfig{ + Format: func() *string { s := DefaultHTTPFormat; return &s }(), + }, + UnsafeTLS: &bf, + DisableKeepAlives: &bf, + Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(), + Timeout: func() *time.Duration { d := time.Duration(0); return &d }(), + } - progagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) - progagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig) + propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) + propagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig) + propagateCommonDefaults(&baseHTTPDefaults.CommonSinkConfig, baseCommonSinkConfig) - progagateFileDefaults(&c.FileDefaults, baseFileDefaults) - progagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults) + propagateFileDefaults(&c.FileDefaults, baseFileDefaults) + propagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults) + propagateHTTPDefaults(&c.HTTPDefaults, baseHTTPDefaults) // Normalize the directory. if err := normalizeDir(&c.FileDefaults.Dir); err != nil { @@ -92,11 +104,22 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } } + for sinkName, fc := range c.Sinks.HTTPServers { + if fc == nil { + fc = &HTTPSinkConfig{} + c.Sinks.HTTPServers[sinkName] = fc + } + fc.sinkName = sinkName + if err := c.validateHTTPSinkConfig(fc); err != nil { + fmt.Fprintf(&errBuf, "http server %q: %v\n", sinkName, err) + } + } + // Defaults for stderr. if c.Sinks.Stderr.Filter == logpb.Severity_UNKNOWN { c.Sinks.Stderr.Filter = logpb.Severity_NONE } - progagateCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, c.FileDefaults.CommonSinkConfig) + propagateCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, c.FileDefaults.CommonSinkConfig) if c.Sinks.Stderr.Auditable != nil && *c.Sinks.Stderr.Auditable { if *c.Sinks.Stderr.Format == "crdb-v1-tty" { f := "crdb-v1-tty-count" @@ -108,10 +131,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { c.Sinks.Stderr.Channels.Sort() - // fileSinks maps channels to files. fileSinks := make(map[logpb.Channel]*FileSinkConfig) - // fluentSinks maps channels to fluent servers. fluentSinks := make(map[logpb.Channel]*FluentSinkConfig) + httpSinks := make(map[logpb.Channel]*HTTPSinkConfig) // Check that no channel is listed by more than one file sink, // and every file has at least one channel. @@ -136,21 +158,36 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { // Check that no channel is listed by more than one fluent sink, and // every sink has at least one channel. - for _, fc := range c.Sinks.FluentServers { + for serverName, fc := range c.Sinks.FluentServers { if len(fc.Channels.Channels) == 0 { - fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", fc.serverName) + fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", serverName) } fc.Channels.Sort() for _, ch := range fc.Channels.Channels { if prev := fluentSinks[ch]; prev != nil { fmt.Fprintf(&errBuf, "fluent server %q: channel %s already captured by server %q\n", - fc.serverName, ch, prev.serverName) + serverName, ch, prev.serverName) } else { fluentSinks[ch] = fc } } } + for sinkName, fc := range c.Sinks.HTTPServers { + if len(fc.Channels.Channels) == 0 { + fmt.Fprintf(&errBuf, "http server %q: no channel selected\n", sinkName) + } + fc.Channels.Sort() + for _, ch := range fc.Channels.Channels { + if prev := httpSinks[ch]; prev != nil { + fmt.Fprintf(&errBuf, "http server %q: channel %s already captured by server %q\n", + sinkName, ch, prev.sinkName) + } else { + httpSinks[ch] = fc + } + } + } + // If capture-stray-errors was enabled, then perform some additional // validation on it. if c.CaptureFd2.Enable { @@ -188,7 +225,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Channels: ChannelList{Channels: []logpb.Channel{devch}}, } fc.prefix = "default" - progagateFileDefaults(&fc.FileDefaults, c.FileDefaults) + propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) if err := c.validateFileSinkConfig(fc, defaultLogDir); err != nil { fmt.Fprintln(&errBuf, err) } @@ -208,44 +245,27 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } devFile.Channels.Sort() - // fileGroupNames collects the names of file groups. We need this to - // store this sorted in c.Sinks.sortedFileGroupNames later. - fileGroupNames := make([]string, 0, len(c.Sinks.FileGroups)) // Elide all the file sinks without a directory or with severity set - // to NONE. Also collect the remaining names for sorting below. + // to NONE. for prefix, fc := range c.Sinks.FileGroups { if fc.Dir == nil || fc.Filter == logpb.Severity_NONE { delete(c.Sinks.FileGroups, prefix) - } else { - fileGroupNames = append(fileGroupNames, prefix) } } - // serverNames collects the names of the servers. We need this to - // store this sorted in c.Sinks.sortedServerNames later. - serverNames := make([]string, 0, len(c.Sinks.FluentServers)) // Elide all the file sinks without a directory or with severity set - // to NONE. Also collect the remaining names for sorting below. + // to NONE. for serverName, fc := range c.Sinks.FluentServers { if fc.Filter == logpb.Severity_NONE { delete(c.Sinks.FluentServers, serverName) - } else { - serverNames = append(serverNames, serverName) } } - // Remember the sorted names, so we get deterministic output in - // export. - sort.Strings(fileGroupNames) - c.Sinks.sortedFileGroupNames = fileGroupNames - sort.Strings(serverNames) - c.Sinks.sortedServerNames = serverNames - return nil } func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *string) error { - progagateFileDefaults(&fc.FileDefaults, c.FileDefaults) + propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) if fc.Dir != c.FileDefaults.Dir { // A directory was specified explicitly. Normalize it. if err := normalizeDir(&fc.Dir); err != nil { @@ -275,7 +295,7 @@ func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *strin } func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error { - progagateFluentDefaults(&fc.FluentDefaults, c.FluentDefaults) + propagateFluentDefaults(&fc.FluentDefaults, c.FluentDefaults) fc.Net = strings.ToLower(strings.TrimSpace(fc.Net)) switch fc.Net { case "tcp", "tcp4", "tcp6": @@ -301,6 +321,14 @@ func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error { return nil } +func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error { + propagateHTTPDefaults(&hsc.HTTPDefaults, c.HTTPDefaults) + if hsc.Address == nil || len(*hsc.Address) == 0 { + return errors.New("address cannot be empty") + } + return nil +} + func normalizeDir(dir **string) error { if *dir == nil { return nil @@ -319,25 +347,29 @@ func normalizeDir(dir **string) error { return nil } -func progagateCommonDefaults(target *CommonSinkConfig, source CommonSinkConfig) { - progagateDefaults(target, source) +func propagateCommonDefaults(target *CommonSinkConfig, source CommonSinkConfig) { + propagateDefaults(target, source) +} + +func propagateFileDefaults(target *FileDefaults, source FileDefaults) { + propagateDefaults(target, source) } -func progagateFileDefaults(target *FileDefaults, source FileDefaults) { - progagateDefaults(target, source) +func propagateFluentDefaults(target *FluentDefaults, source FluentDefaults) { + propagateDefaults(target, source) } -func progagateFluentDefaults(target *FluentDefaults, source FluentDefaults) { - progagateDefaults(target, source) +func propagateHTTPDefaults(target *HTTPDefaults, source HTTPDefaults) { + propagateDefaults(target, source) } -// progagateDefaults takes (target *T, source T) where T is a struct +// propagateDefaults takes (target *T, source T) where T is a struct // and sets zero-valued exported fields in target to the values // from source (recursively for struct-valued fields). // Wrap for static type-checking, as unexpected types will panic. // // (Consider making this a common utility if it gets some maturity here.) -func progagateDefaults(target, source interface{}) { +func propagateDefaults(target, source interface{}) { s := reflect.ValueOf(source) t := reflect.Indirect(reflect.ValueOf(target)) // *target @@ -345,7 +377,7 @@ func progagateDefaults(target, source interface{}) { tf := t.Field(i) sf := s.Field(i) if tf.Kind() == reflect.Struct { - progagateDefaults(tf.Addr().Interface(), sf.Interface()) + propagateDefaults(tf.Addr().Interface(), sf.Interface()) } else if tf.CanSet() && tf.IsZero() { tf.Set(s.Field(i)) } diff --git a/pkg/util/log/logconfig/validate_test.go b/pkg/util/log/logconfig/validate_test.go index 0ec5f17cad89..5d7c3c518ae2 100644 --- a/pkg/util/log/logconfig/validate_test.go +++ b/pkg/util/log/logconfig/validate_test.go @@ -31,7 +31,7 @@ func TestValidate(t *testing.T) { if err != nil { t.Fatal(err) } - fmt.Fprintf(&buf, "%s", string(b)) + buf.Write(b) t.Logf("%s", buf.String()) buf.Reset() @@ -39,11 +39,16 @@ func TestValidate(t *testing.T) { if err := c.Validate(&defaultDir); err != nil { fmt.Fprintf(&buf, "ERROR: %v\n", err) } else { + // clear the default fields to reduce test over-specification + c.FileDefaults = FileDefaults{} + c.FluentDefaults = FluentDefaults{} + c.HTTPDefaults = HTTPDefaults{} + b, err := yaml.Marshal(&c) if err != nil { t.Fatal(err) } - fmt.Fprintf(&buf, "%s", string(b)) + buf.Write(b) } return buf.String() }) diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index 7a2e5bdd486c..20f073530388 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -50,3 +50,4 @@ type logSink interface { var _ logSink = (*stderrSink)(nil) var _ logSink = (*fileSink)(nil) var _ logSink = (*fluentSink)(nil) +var _ logSink = (*httpSink)(nil)