diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md index 5b69ded70cc9..171faa5cb8f3 100644 --- a/docs/generated/logsinks.md +++ b/docs/generated/logsinks.md @@ -6,6 +6,8 @@ The supported log output sink types are documented below. - [Output to Fluentd-compatible log collectors](#output-to-fluentd-compatible-log-collectors) +- [Output to HTTP servers.](#output-to-http-servers.) + - [Standard error stream](#standard-error-stream) @@ -91,7 +93,7 @@ Configuration options shared across all sink types: ## Sink type: Output to Fluentd-compatible log collectors -This sink type causes logging data to be sent over the network, to +This sink type causes logging data to be sent over the network to a log collector that can ingest log data in a [Fluentd](https://www.fluentd.org)-compatible protocol. @@ -168,6 +170,70 @@ Configuration options shared across all sink types: + + +## Sink type: Output to HTTP servers. + + +This sink type causes logging data to be sent over the network +as requests to an HTTP server. + +The configuration key under the `sinks` key in the YAML +configuration is `http-servers`. Example configuration: + + sinks: + http-servers: + health: + channels: HEALTH + address: http://127.0.0.1 + +Every new server sink configured automatically inherits the configuration set in the `http-defaults` section. + +For example: + + http-defaults: + redactable: false # default: disable redaction markers + sinks: + http-servers: + health: + channels: HEALTH + # This sink has redactable set to false, + # as the setting is inherited from fluent-defaults + # unless overridden here. + +The default output format for HTTP sinks is +`json-compact`. [Other supported formats.](log-formats.html) + +{{site.data.alerts.callout_info}} +Run `cockroach debug check-log-config` to verify the effect of defaults inheritance. +{{site.data.alerts.end}} + + + +Type-specific configuration options: + +| Field | Description | +|--|--| +| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. | +| `address` | the network address of the http server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. Inherited from `http-defaults.address` if not specified. | +| `method` | the HTTP method to be used. POST and GET are supported; defaults to POST. Inherited from `http-defaults.method` if not specified. | +| `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. | +| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. | + + +Configuration options shared across all sink types: + +| Field | Description | +|--|--| +| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. | +| `format` | the entry format to use. | +| `redact` | whether to strip sensitive information before log events are emitted to this sink. | +| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. | +| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. | +| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. | + + + ## Sink type: Standard error stream diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 35852787f123..aab687341431 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "format_json.go", "formats.go", "get_stacks.go", + "http_sink.go", "intercept.go", "log.go", "log_bridge.go", diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 6899840a9cfb..e5f53d791fd2 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -120,10 +120,16 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } } + // Call the final value of cleanupFn immediately if returning with error. + defer func() { + if err != nil { + cleanupFn() + } + }() + // If capture of internal fd2 writes is enabled, set it up here. if config.CaptureFd2.Enable { if logging.testingFd2CaptureLogger != nil { - cleanupFn() return nil, errors.New("fd2 capture already set up. Maybe use TestLogScope?") } // We use a secondary logger, even though no logging *event* will ever @@ -160,7 +166,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } fileSinkInfo, fileSink, err := newFileSinkInfo("stderr", fakeConfig) if err != nil { - cleanupFn() return nil, err } sinkInfos = append(sinkInfos, fileSinkInfo) @@ -190,7 +195,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { if err := fileSink.takeOverInternalStderr(secLogger); err != nil { // Oof, it turns out we can't use this logger after all. Give up // on everything we did. - cleanupFn() return nil, err } @@ -218,7 +222,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { // Apply the stderr sink configuration. logging.stderrSink.noColor.Set(config.Sinks.Stderr.NoColor) if err := logging.stderrSinkInfoTemplate.applyConfig(config.Sinks.Stderr.CommonSinkConfig); err != nil { - cleanupFn() return nil, err } @@ -244,6 +247,17 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { l.sinkInfos = append(l.sinkInfos, &stderrSinkInfo) } + attachSinkInfo := func(si *sinkInfo, chs []logpb.Channel) { + sinkInfos = append(sinkInfos, si) + allSinkInfos.put(si) + + // Connect the channels for this sink. + for _, ch := range chs { + l := chans[ch] + l.sinkInfos = append(l.sinkInfos, si) + } + } + // Create the file sinks. for prefix, fc := range config.Sinks.FileGroups { if fc.Filter == severity.NONE || fc.Dir == nil { @@ -254,17 +268,9 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } fileSinkInfo, _, err := newFileSinkInfo(prefix, *fc) if err != nil { - cleanupFn() return nil, err } - sinkInfos = append(sinkInfos, fileSinkInfo) - allSinkInfos.put(fileSinkInfo) - - // Connect the channels for this sink. - for _, ch := range fc.Channels.Channels { - l := chans[ch] - l.sinkInfos = append(l.sinkInfos, fileSinkInfo) - } + attachSinkInfo(fileSinkInfo, fc.Channels.Channels) } // Create the fluent sinks. @@ -274,17 +280,21 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } fluentSinkInfo, err := newFluentSinkInfo(*fc) if err != nil { - cleanupFn() return nil, err } - sinkInfos = append(sinkInfos, fluentSinkInfo) - allSinkInfos.put(fluentSinkInfo) + attachSinkInfo(fluentSinkInfo, fc.Channels.Channels) + } - // Connect the channels for this sink. - for _, ch := range fc.Channels.Channels { - l := chans[ch] - l.sinkInfos = append(l.sinkInfos, fluentSinkInfo) + // Create the HTTP sinks. + for _, fc := range config.Sinks.HTTPServers { + if fc.Filter == severity.NONE { + continue + } + httpSinkInfo, err := newHTTPSinkInfo(*fc) + if err != nil { + return nil, err } + attachSinkInfo(httpSinkInfo, fc.Channels.Channels) } // Prepend the interceptor sink to all channels. @@ -333,6 +343,20 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) { return info, nil } +func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) { + info := &sinkInfo{} + if err := info.applyConfig(c.CommonSinkConfig); err != nil { + return nil, err + } + httpSink := newHTTPSink(*c.Address, httpSinkOptions{ + method: string(*c.Method), + unsafeTLS: *c.UnsafeTLS, + timeout: *c.Timeout, + }) + info.sink = httpSink + return info, nil +} + // applyConfig applies a common sink configuration to a sinkInfo. func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error { l.threshold = c.Filter diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go new file mode 100644 index 000000000000..70a1d8c2b007 --- /dev/null +++ b/pkg/util/log/http_sink.go @@ -0,0 +1,149 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "bytes" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + "time" + + "github.com/cockroachdb/cockroach/pkg/cli/exit" +) + +var insecureTransport http.RoundTripper = &http.Transport{ + // Same as DefaultTransport... + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + // ...except insecure TLS. + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +} + +type httpSinkOptions struct { + unsafeTLS bool + timeout time.Duration + method string +} + +func newHTTPSink(url string, opt httpSinkOptions) *httpSink { + hs := &httpSink{ + client: http.Client{ + Transport: http.DefaultTransport, + Timeout: opt.timeout}, + address: url, + doRequest: doPost} + + if opt.unsafeTLS { + hs.client.Transport = insecureTransport + } + + if opt.method == http.MethodGet { + hs.doRequest = doGet + } + + return hs +} + +type httpSink struct { + client http.Client + address string + doRequest func(*httpSink, []byte) (*http.Response, error) +} + +// output emits some formatted bytes to this sink. +// the sink is invited to perform an extra flush if indicated +// by the argument. This is set to true for e.g. Fatal +// entries. +// +// The parent logger's outputMu is held during this operation: log +// sinks must not recursively call into logging when implementing +// this method. +func (hs *httpSink) output(extraSync bool, b []byte) (err error) { + resp, err := hs.doRequest(hs, b) + if err != nil { + return err + } + + if resp.StatusCode >= 400 { + return HTTPLogError{ + StatusCode: resp.StatusCode, + Address: hs.address} + } + return nil +} + +// emergencyOutput attempts to emit some formatted bytes, and +// ignores any errors. +// +// The parent logger's outputMu is held during this operation: log +// sinks must not recursively call into logging when implementing +// this method. +func (hs *httpSink) emergencyOutput(b []byte) { + hs.doRequest(hs, b) +} + +func doPost(hs *httpSink, b []byte) (*http.Response, error) { + resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b)) + if err != nil { + return nil, err + } + resp.Body.Close() // don't care about content + return resp, nil +} + +func doGet(hs *httpSink, b []byte) (*http.Response, error) { + resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b))) + if err != nil { + return nil, err + } + resp.Body.Close() // don't care about content + return resp, nil +} + +// active returns true if this sink is currently active. +func (*httpSink) active() bool { + return true +} + +// attachHints attaches some hints about the location of the message +// to the stack message. +func (*httpSink) attachHints(stacks []byte) []byte { + return stacks +} + +// exitCode returns the exit code to use if the logger decides +// to terminate because of an error in output(). +func (*httpSink) exitCode() exit.Code { + return exit.LoggingNetCollectorUnavailable() +} + +// HTTPLogError represents an HTTP error status code from a logging request. +type HTTPLogError struct { + StatusCode int + Address string +} + +func (e HTTPLogError) Error() string { + return fmt.Sprintf( + "received %v response attempting to log to [%v]", + e.StatusCode, e.Address) +} diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go new file mode 100644 index 000000000000..4100552c0f73 --- /dev/null +++ b/pkg/util/log/http_sink_test.go @@ -0,0 +1,140 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "context" + "errors" + "net" + "net/http" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log/channel" + "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" + "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" + "github.com/stretchr/testify/require" +) + +// TODO: HTTP server needs bound to context -- still need to figure out the right context. +// http.NewRequestWithContext + +type requestTestFunc func(body string) error + +func testBase(t *testing.T, defaults logconfig.HTTPDefaults, fn requestTestFunc) { + sc := ScopeWithoutShowLogs(t) + defer sc.Close(t) + + bodyCh := make(chan string, 1) + cancelCh := make(chan struct{}) + defer close(cancelCh) + + defer close(bodyCh) + handler := func(rw http.ResponseWriter, r *http.Request) { + buf := make([]byte, 5000) + r.Body.Read(buf) + select { + case bodyCh <- string(buf): + case <-cancelCh: + } + } + + serverErrCh := make(chan error) + { + l, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + _, port, err := addr.SplitHostPort(l.Addr().String(), "port") + if err != nil { + t.Fatal(err) + } + *defaults.Address += ":" + port + s := http.Server{Handler: http.HandlerFunc(handler)} + go func() { + err := s.Serve(l) + if err != http.ErrServerClosed { + select { + case serverErrCh <- err: + case <-cancelCh: + } + } + }() + defer s.Close() + } + + // Set up a logging configuration with the server we've just set up + // as target for the OPS channel. + cfg := logconfig.DefaultConfig() + cfg.Sinks.HTTPServers = map[string]*logconfig.HTTPSinkConfig{ + "ops": { + HTTPDefaults: defaults, + Channels: logconfig.ChannelList{Channels: []Channel{channel.OPS}}}, + } + // Derive a full config using the same directory as the + // TestLogScope. + require.NoError(t, cfg.Validate(&sc.logDir)) + + // Apply the configuration. + TestingResetActive() + cleanup, err := ApplyConfig(cfg) + require.NoError(t, err) + defer cleanup() + + // Send a log event on the OPS channel. + Ops.Infof(context.Background(), "hello world") + + // Check if any requests received within the timeout satisfy the given predicate. + timer := time.After(10 * time.Second) +outer: + for { + select { + case <-timer: + t.Fatal("timeout") + case body := <-bodyCh: + if err := fn(body); err != nil { + // non-failing, in case there are extra log messages generated + t.Log(err) + } else { + break outer + } + case err := <-serverErrCh: + t.Fatal(err) + } + } +} + +func TestSpecific(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 250 * time.Millisecond + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + } + + test_fn := func(body string) error { + t.Log(body) + if !strings.Contains(body, `"message":"hello world"`) { + return errors.New("Log message not found in request") + } + return nil + } + + testBase(t, defaults, test_fn) +} + +// Test cases: +// No timeout -- ensure correct behavior. +// Small timeout -- ensure timeout works. diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index 51682a01c5da..b7689fd55a15 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -12,9 +12,11 @@ package logconfig import ( "fmt" + "net/http" "reflect" "sort" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" @@ -34,6 +36,10 @@ const DefaultStderrFormat = `crdb-v2-tty` // when not specified in a configuration. const DefaultFluentFormat = `json-fluent-compact` +// DefaultHTTPFormat is the entry format for HTTP sinks +// when not specified in a configuration. +const DefaultHTTPFormat = `json-compact` + // DefaultConfig returns a suitable default configuration when logging // is meant to primarily go to files. func DefaultConfig() (c Config) { @@ -101,6 +107,11 @@ type Config struct { // configuration value. FluentDefaults FluentDefaults `yaml:"fluent-defaults,omitempty"` + // HTTPDefaults represents the default configuration for HTTP sinks, + // inherited when a specific HTTP sink config does not provide a + // configuration value. + HTTPDefaults HTTPDefaults `yaml:"http-defaults,omitempty"` + // Sinks represents the sink configurations. Sinks SinkConfig `yaml:",omitempty"` @@ -163,13 +174,10 @@ type SinkConfig struct { FileGroups map[string]*FileSinkConfig `yaml:"file-groups,omitempty"` // FluentServer represents the list of configured fluent sinks. FluentServers map[string]*FluentSinkConfig `yaml:"fluent-servers,omitempty"` + // HTTPServers represents the list of configured http sinks. + HTTPServers map[string]*HTTPSinkConfig `yaml:"http-servers,omitempty"` // Stderr represents the configuration for the stderr sink. Stderr StderrSinkConfig `yaml:",omitempty"` - - // sortedFileGroupNames and sortedServerNames are used internally to - // make the Export() function deterministic. - sortedFileGroupNames []string - sortedServerNames []string } // StderrSinkConfig represents the configuration for the stderr sink. @@ -233,7 +241,7 @@ type FluentDefaults struct { // User-facing documentation follows. // TITLE: Output to Fluentd-compatible log collectors // -// This sink type causes logging data to be sent over the network, to +// This sink type causes logging data to be sent over the network to // a log collector that can ingest log data in a // [Fluentd](https://www.fluentd.org)-compatible protocol. // @@ -395,6 +403,77 @@ type FileSinkConfig struct { prefix string } +// HTTPDefaults refresents the configuration defaults for HTTP sinks. +type HTTPDefaults struct { + // Address is the network address of the http server. The + // host/address and port parts are separated with a colon. IPv6 + // numeric addresses should be included within square brackets, + // e.g.: [::1]:1234. + Address *string `yaml:",omitempty"` + + // Method is the HTTP method to be used. POST and GET are + // supported; defaults to POST. + Method *HTTPSinkMethod `yaml:",omitempty"` + + // UnsafeTLS enables certificate authentication to be bypassed. + // Defaults to false. + UnsafeTLS *bool `yaml:"unsafe-tls,omitempty"` + + // Timeout is the HTTP timeout. + // Defaults to 0 for no timeout. + Timeout *time.Duration `yaml:",omitempty"` + + CommonSinkConfig `yaml:",inline"` +} + +// HTTPSinkConfig represents the configuration for one http sink. +// +// User-facing documentation follows. +// TITLE: Output to HTTP servers. +// +// This sink type causes logging data to be sent over the network +// as requests to an HTTP server. +// +// The configuration key under the `sinks` key in the YAML +// configuration is `http-servers`. Example configuration: +// +// sinks: +// http-servers: +// health: +// channels: HEALTH +// address: http://127.0.0.1 +// +// Every new server sink configured automatically inherits the configuration set in the `http-defaults` section. +// +// For example: +// +// http-defaults: +// redactable: false # default: disable redaction markers +// sinks: +// http-servers: +// health: +// channels: HEALTH +// # This sink has redactable set to false, +// # as the setting is inherited from fluent-defaults +// # unless overridden here. +// +// The default output format for HTTP sinks is +// `json-compact`. [Other supported formats.](log-formats.html) +// +// {{site.data.alerts.callout_info}} +// Run `cockroach debug check-log-config` to verify the effect of defaults inheritance. +// {{site.data.alerts.end}} +// +type HTTPSinkConfig struct { + // Channels is the list of logging channels that use this sink. + Channels ChannelList `yaml:",omitempty,flow"` + + HTTPDefaults `yaml:",inline"` + + // sinkName is populated during validation. + sinkName string +} + // IterateDirectories calls the provided fn on every directory linked to // by the configuration. func (c *Config) IterateDirectories(fn func(d string) error) error { @@ -684,3 +763,61 @@ func (*Holder) Type() string { return "yaml" } func (h *Holder) Set(value string) error { return yaml.UnmarshalStrict([]byte(value), &h.Config) } + +// HTTPSinkMethod is a string restricted to "POST" and "GET" +type HTTPSinkMethod string + +var _ constrainedString = (*HTTPSinkMethod)(nil) + +// Accept implements the constrainedString interface. +func (hsm *HTTPSinkMethod) Accept(s string) { + *hsm = HTTPSinkMethod(s) +} + +// Canonicalize implements the constrainedString interface. +func (HTTPSinkMethod) Canonicalize(s string) string { + return strings.ToUpper(strings.TrimSpace(s)) +} + +// AllowedSet implements the constrainedString interface. +func (HTTPSinkMethod) AllowedSet() []string { + return []string{ + http.MethodGet, + http.MethodPost, + } +} + +// MarshalYAML implements yaml.Marshaler interface. +func (hsm HTTPSinkMethod) MarshalYAML() (interface{}, error) { + return string(hsm), nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (hsm *HTTPSinkMethod) UnmarshalYAML(fn func(interface{}) error) error { + return unmarshalYAMLConstrainedString(hsm, fn) +} + +// constrainedString is an interface to make it easy to unmarshal +// a string constrained to a small set of accepted values. +type constrainedString interface { + Accept(string) + Canonicalize(string) string + AllowedSet() []string +} + +// unmarshalYAMLConstrainedString is a utility function to unmarshal +// a type satisfying the constrainedString interface. +func unmarshalYAMLConstrainedString(cs constrainedString, fn func(interface{}) error) error { + var s string + if err := fn(&s); err != nil { + return err + } + s = cs.Canonicalize(s) + for _, candidate := range cs.AllowedSet() { + if s == candidate { + cs.Accept(s) + return nil + } + } + return errors.Newf("Unexpected value: %v", s) +} diff --git a/pkg/util/log/logconfig/export.go b/pkg/util/log/logconfig/export.go index 3ec48c720c1a..19b67369b007 100644 --- a/pkg/util/log/logconfig/export.go +++ b/pkg/util/log/logconfig/export.go @@ -29,19 +29,6 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { chanSel = onlyChans } - var buf bytes.Buffer - buf.WriteString("@startuml\nleft to right direction\n") - - // Export the channels. - buf.WriteString("component sources {\n") - for _, ch := range chanSel.Channels { - fmt.Fprintf(&buf, "() %s\n", ch) - } - buf.WriteString("cloud stray as \"stray\\nerrors\"\n}\n") - - // The process stderr stream. - buf.WriteString("queue stderr\n") - // links collects the relationships. We need to collect them and // print them at the end because plantUML does not support // interleaving box and arrow declarations. @@ -110,7 +97,16 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { // folders map each directory to a list of files within. folders := map[string][]string{} fileNum := 1 - for _, fn := range c.Sinks.sortedFileGroupNames { + + // Process the file sinks in sorted order, + // so the output order is deteministic. + var sortedNames []string + for prefix := range c.Sinks.FileGroups { + sortedNames = append(sortedNames, prefix) + } + sort.Strings(sortedNames) + + for _, fn := range sortedNames { fc := c.Sinks.FileGroups[fn] if fc.Filter == logpb.Severity_NONE { // This file is not collecting anything. Skip it. @@ -175,12 +171,19 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { // // servers maps each server to its box declaration. servers := map[string]string{} - for _, fn := range c.Sinks.sortedServerNames { + + sortedNames = nil + for serverName := range c.Sinks.FluentServers { + sortedNames = append(sortedNames, serverName) + } + sort.Strings(sortedNames) + + for _, fn := range sortedNames { fc := c.Sinks.FluentServers[fn] if fc.Filter == logpb.Severity_NONE { continue } - skey := fmt.Sprintf("s__%s", fc.serverName) + skey := fmt.Sprintf("s__%s", fn) target, thisprocs, thislinks := process(skey, fc.CommonSinkConfig) hasLink := false for _, ch := range fc.Channels.Channels { @@ -193,11 +196,43 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { if hasLink { processing = append(processing, thisprocs...) links = append(links, thislinks...) - servers[fc.serverName] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"", + servers[fn] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"", skey, fc.Net, fc.Address) } } + // Collect HTTP sinks + // Add the destinations into the same map, for display in the "network server" + // section of the diagram + sortedNames = nil + for sinkName := range c.Sinks.HTTPServers { + sortedNames = append(sortedNames, sinkName) + } + sort.Strings(sortedNames) + + for _, name := range sortedNames { + cfg := c.Sinks.HTTPServers[name] + if cfg.Filter == logpb.Severity_NONE { + continue + } + key := fmt.Sprintf("h__%s", name) + target, thisprocs, thislinks := process(key, cfg.CommonSinkConfig) + hasLink := false + for _, ch := range cfg.Channels.Channels { + if !chanSel.HasChannel(ch) { + continue + } + hasLink = true + links = append(links, fmt.Sprintf("%s --> %s", ch, target)) + } + if hasLink { + processing = append(processing, thisprocs...) + links = append(links, thislinks...) + servers[name] = fmt.Sprintf("queue %s as \"http: %s\"", + key, *cfg.Address) + } + } + // Export the stderr redirects. if c.Sinks.Stderr.Filter != logpb.Severity_NONE { target, thisprocs, thislinks := process("stderr", c.Sinks.Stderr.CommonSinkConfig) @@ -216,11 +251,23 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { } } + + var buf bytes.Buffer + buf.WriteString("@startuml\nleft to right direction\n") + + // Export the channels. + buf.WriteString("component sources {\n") + for _, ch := range chanSel.Channels { + fmt.Fprintf(&buf, "() %s\n", ch) + } + buf.WriteString("cloud stray as \"stray\\nerrors\"\n}\n") + + // The process stderr stream. + buf.WriteString("queue stderr\n") + // Represent the processing stages, if any. - if len(processing) > 0 { - for _, p := range processing { - fmt.Fprintf(&buf, "%s\n", p) - } + for _, p := range processing { + fmt.Fprintf(&buf, "%s\n", p) } // Represent the files, if any. @@ -238,10 +285,10 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { } // Represent the network servers, if any. - if len(c.Sinks.sortedServerNames) > 0 { + if len(servers) > 0 { buf.WriteString("cloud network {\n") - for _, s := range c.Sinks.sortedServerNames { - fmt.Fprintf(&buf, " %s\n", servers[s]) + for _, s := range servers { + fmt.Fprintf(&buf, " %s\n", s) } buf.WriteString("}\n") } diff --git a/pkg/util/log/logconfig/gen.go b/pkg/util/log/logconfig/gen.go index 10b8ca8d6e9f..40e958be77fa 100644 --- a/pkg/util/log/logconfig/gen.go +++ b/pkg/util/log/logconfig/gen.go @@ -245,11 +245,11 @@ func readInput(infos map[string]*sinkInfo) error { return nil } -var configStructRe = regexp.MustCompile(`^type (?P[A-Z][a-z0-9]*)(SinkConfig|Defaults) struct`) +var configStructRe = regexp.MustCompile(`^type (?P[A-Z]\w*)(SinkConfig|Defaults) struct`) var fieldDefRe = regexp.MustCompile(`^\s*` + // Field name in Go. - `(?P[A-Z][A-Za-z_0-9]*)` + + `(?P[A-Z]\w*)` + // Go type. Empty if embedded type. `(?P(?: [^ ]+)?)` + // Start of YAML annotation. @@ -260,16 +260,27 @@ var fieldDefRe = regexp.MustCompile(`^\s*` + `[^"]*"` + "`.*") func camelToSnake(typeName string) string { + isUpper := func(c byte) bool { + return 'A' <= c && c <= 'Z' + } + toLower := func(c byte) byte { + if !isUpper(c) { + return c + } + return c - 'A' + 'a' + } + var res strings.Builder - res.WriteByte(typeName[0] + 'a' - 'A') - for i := 1; i < len(typeName); i++ { - if typeName[i] >= 'A' && typeName[i] <= 'Z' { + res.WriteByte(toLower(typeName[0])) + for i := 1; i < len(typeName)-1; i++ { + // put a word break at transitions likeTHIS and LIKEThis + if isUpper(typeName[i]) && (!isUpper(typeName[i-1]) || !isUpper(typeName[i+1])) { res.WriteByte('-') - res.WriteByte(typeName[i] + 'a' - 'A') - } else { - res.WriteByte(typeName[i]) } + res.WriteByte(toLower(typeName[i])) } + // assume the last character isn't a one-letter word + res.WriteByte(toLower(typeName[len(typeName)-1])) return res.String() } diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 5bba86a15907..53c8efc3de31 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -1,24 +1,6 @@ # Empty configuration: use and propagate defaults. yaml ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -51,24 +33,6 @@ sinks: custom: channels: DEV ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -103,24 +67,6 @@ sinks: custom: channels: DEV ---- -file-defaults: - dir: /custom - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -150,30 +96,12 @@ capture-stray-errors: # Check that default severity propagates. yaml file-defaults: - filter: WARNING + filter: WARNING sinks: - file-groups: - custom: - channels: DEV + file-groups: + custom: + channels: DEV ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: WARNING - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -207,24 +135,6 @@ sinks: address: "127.0.0.1:5170" channels: DEV ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -267,24 +177,6 @@ sinks: custom: channels: all ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -318,24 +210,6 @@ sinks: channels: DEV auditable: true ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: custom: @@ -370,24 +244,6 @@ sinks: address: localhost:5170 auditable: true ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -431,24 +287,6 @@ sinks: exit-on-error: false auditable: true ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: INFO - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: file-groups: default: @@ -478,24 +316,6 @@ capture-stray-errors: yaml file-defaults: {filter: NONE} ---- -file-defaults: - dir: /default-dir - max-file-size: 10MiB - max-group-size: 100MiB - buffered-writes: true - filter: NONE - format: crdb-v2 - redact: false - redactable: true - exit-on-error: true - auditable: false -fluent-defaults: - filter: INFO - format: json-fluent-compact - redact: false - redactable: true - exit-on-error: false - auditable: false sinks: stderr: channels: all diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 1cbc94643859..c5318a58ac3d 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -13,10 +13,11 @@ package logconfig import ( "bytes" "fmt" + "net/http" "path/filepath" "reflect" - "sort" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" @@ -56,12 +57,22 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Format: func() *string { s := DefaultFluentFormat; return &s }(), }, } + baseHTTPDefaults := HTTPDefaults{ + CommonSinkConfig: CommonSinkConfig{ + Format: func() *string { s := DefaultHTTPFormat; return &s }(), + }, + UnsafeTLS: &bf, + Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(), + Timeout: func() *time.Duration { d := time.Duration(0); return &d }(), + } - progagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) - progagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig) + propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) + propagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig) + propagateCommonDefaults(&baseHTTPDefaults.CommonSinkConfig, baseCommonSinkConfig) - progagateFileDefaults(&c.FileDefaults, baseFileDefaults) - progagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults) + propagateFileDefaults(&c.FileDefaults, baseFileDefaults) + propagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults) + propagateHTTPDefaults(&c.HTTPDefaults, baseHTTPDefaults) // Normalize the directory. if err := normalizeDir(&c.FileDefaults.Dir); err != nil { @@ -92,11 +103,22 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } } + for sinkName, fc := range c.Sinks.HTTPServers { + if fc == nil { + fc = &HTTPSinkConfig{} + c.Sinks.HTTPServers[sinkName] = fc + } + fc.sinkName = sinkName + if err := c.validateHTTPSinkConfig(fc); err != nil { + fmt.Fprintf(&errBuf, "http server %q: %v\n", sinkName, err) + } + } + // Defaults for stderr. if c.Sinks.Stderr.Filter == logpb.Severity_UNKNOWN { c.Sinks.Stderr.Filter = logpb.Severity_NONE } - progagateCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, c.FileDefaults.CommonSinkConfig) + propagateCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, c.FileDefaults.CommonSinkConfig) if c.Sinks.Stderr.Auditable != nil && *c.Sinks.Stderr.Auditable { if *c.Sinks.Stderr.Format == "crdb-v1-tty" { f := "crdb-v1-tty-count" @@ -108,10 +130,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { c.Sinks.Stderr.Channels.Sort() - // fileSinks maps channels to files. fileSinks := make(map[logpb.Channel]*FileSinkConfig) - // fluentSinks maps channels to fluent servers. fluentSinks := make(map[logpb.Channel]*FluentSinkConfig) + httpSinks := make(map[logpb.Channel]*HTTPSinkConfig) // Check that no channel is listed by more than one file sink, // and every file has at least one channel. @@ -136,21 +157,36 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { // Check that no channel is listed by more than one fluent sink, and // every sink has at least one channel. - for _, fc := range c.Sinks.FluentServers { + for serverName, fc := range c.Sinks.FluentServers { if len(fc.Channels.Channels) == 0 { - fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", fc.serverName) + fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", serverName) } fc.Channels.Sort() for _, ch := range fc.Channels.Channels { if prev := fluentSinks[ch]; prev != nil { fmt.Fprintf(&errBuf, "fluent server %q: channel %s already captured by server %q\n", - fc.serverName, ch, prev.serverName) + serverName, ch, prev.serverName) } else { fluentSinks[ch] = fc } } } + for sinkName, fc := range c.Sinks.HTTPServers { + if len(fc.Channels.Channels) == 0 { + fmt.Fprintf(&errBuf, "http server %q: no channel selected\n", sinkName) + } + fc.Channels.Sort() + for _, ch := range fc.Channels.Channels { + if prev := httpSinks[ch]; prev != nil { + fmt.Fprintf(&errBuf, "http server %q: channel %s already captured by server %q\n", + sinkName, ch, prev.sinkName) + } else { + httpSinks[ch] = fc + } + } + } + // If capture-stray-errors was enabled, then perform some additional // validation on it. if c.CaptureFd2.Enable { @@ -188,7 +224,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { Channels: ChannelList{Channels: []logpb.Channel{devch}}, } fc.prefix = "default" - progagateFileDefaults(&fc.FileDefaults, c.FileDefaults) + propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) if err := c.validateFileSinkConfig(fc, defaultLogDir); err != nil { fmt.Fprintln(&errBuf, err) } @@ -208,44 +244,27 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } devFile.Channels.Sort() - // fileGroupNames collects the names of file groups. We need this to - // store this sorted in c.Sinks.sortedFileGroupNames later. - fileGroupNames := make([]string, 0, len(c.Sinks.FileGroups)) // Elide all the file sinks without a directory or with severity set - // to NONE. Also collect the remaining names for sorting below. + // to NONE. for prefix, fc := range c.Sinks.FileGroups { if fc.Dir == nil || fc.Filter == logpb.Severity_NONE { delete(c.Sinks.FileGroups, prefix) - } else { - fileGroupNames = append(fileGroupNames, prefix) } } - // serverNames collects the names of the servers. We need this to - // store this sorted in c.Sinks.sortedServerNames later. - serverNames := make([]string, 0, len(c.Sinks.FluentServers)) // Elide all the file sinks without a directory or with severity set - // to NONE. Also collect the remaining names for sorting below. + // to NONE. for serverName, fc := range c.Sinks.FluentServers { if fc.Filter == logpb.Severity_NONE { delete(c.Sinks.FluentServers, serverName) - } else { - serverNames = append(serverNames, serverName) } } - // Remember the sorted names, so we get deterministic output in - // export. - sort.Strings(fileGroupNames) - c.Sinks.sortedFileGroupNames = fileGroupNames - sort.Strings(serverNames) - c.Sinks.sortedServerNames = serverNames - return nil } func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *string) error { - progagateFileDefaults(&fc.FileDefaults, c.FileDefaults) + propagateFileDefaults(&fc.FileDefaults, c.FileDefaults) if fc.Dir != c.FileDefaults.Dir { // A directory was specified explicitly. Normalize it. if err := normalizeDir(&fc.Dir); err != nil { @@ -275,7 +294,7 @@ func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *strin } func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error { - progagateFluentDefaults(&fc.FluentDefaults, c.FluentDefaults) + propagateFluentDefaults(&fc.FluentDefaults, c.FluentDefaults) fc.Net = strings.ToLower(strings.TrimSpace(fc.Net)) switch fc.Net { case "tcp", "tcp4", "tcp6": @@ -301,6 +320,14 @@ func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error { return nil } +func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error { + propagateHTTPDefaults(&hsc.HTTPDefaults, c.HTTPDefaults) + if hsc.Address == nil || len(*hsc.Address) == 0 { + return errors.New("address cannot be empty") + } + return nil +} + func normalizeDir(dir **string) error { if *dir == nil { return nil @@ -319,25 +346,29 @@ func normalizeDir(dir **string) error { return nil } -func progagateCommonDefaults(target *CommonSinkConfig, source CommonSinkConfig) { - progagateDefaults(target, source) +func propagateCommonDefaults(target *CommonSinkConfig, source CommonSinkConfig) { + propagateDefaults(target, source) +} + +func propagateFileDefaults(target *FileDefaults, source FileDefaults) { + propagateDefaults(target, source) } -func progagateFileDefaults(target *FileDefaults, source FileDefaults) { - progagateDefaults(target, source) +func propagateFluentDefaults(target *FluentDefaults, source FluentDefaults) { + propagateDefaults(target, source) } -func progagateFluentDefaults(target *FluentDefaults, source FluentDefaults) { - progagateDefaults(target, source) +func propagateHTTPDefaults(target *HTTPDefaults, source HTTPDefaults) { + propagateDefaults(target, source) } -// progagateDefaults takes (target *T, source T) where T is a struct +// propagateDefaults takes (target *T, source T) where T is a struct // and sets zero-valued exported fields in target to the values // from source (recursively for struct-valued fields). // Wrap for static type-checking, as unexpected types will panic. // // (Consider making this a common utility if it gets some maturity here.) -func progagateDefaults(target, source interface{}) { +func propagateDefaults(target, source interface{}) { s := reflect.ValueOf(source) t := reflect.Indirect(reflect.ValueOf(target)) // *target @@ -345,7 +376,7 @@ func progagateDefaults(target, source interface{}) { tf := t.Field(i) sf := s.Field(i) if tf.Kind() == reflect.Struct { - progagateDefaults(tf.Addr().Interface(), sf.Interface()) + propagateDefaults(tf.Addr().Interface(), sf.Interface()) } else if tf.CanSet() && tf.IsZero() { tf.Set(s.Field(i)) } diff --git a/pkg/util/log/logconfig/validate_test.go b/pkg/util/log/logconfig/validate_test.go index 0ec5f17cad89..5d7c3c518ae2 100644 --- a/pkg/util/log/logconfig/validate_test.go +++ b/pkg/util/log/logconfig/validate_test.go @@ -31,7 +31,7 @@ func TestValidate(t *testing.T) { if err != nil { t.Fatal(err) } - fmt.Fprintf(&buf, "%s", string(b)) + buf.Write(b) t.Logf("%s", buf.String()) buf.Reset() @@ -39,11 +39,16 @@ func TestValidate(t *testing.T) { if err := c.Validate(&defaultDir); err != nil { fmt.Fprintf(&buf, "ERROR: %v\n", err) } else { + // clear the default fields to reduce test over-specification + c.FileDefaults = FileDefaults{} + c.FluentDefaults = FluentDefaults{} + c.HTTPDefaults = HTTPDefaults{} + b, err := yaml.Marshal(&c) if err != nil { t.Fatal(err) } - fmt.Fprintf(&buf, "%s", string(b)) + buf.Write(b) } return buf.String() }) diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index 7a2e5bdd486c..20f073530388 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -50,3 +50,4 @@ type logSink interface { var _ logSink = (*stderrSink)(nil) var _ logSink = (*fileSink)(nil) var _ logSink = (*fluentSink)(nil) +var _ logSink = (*httpSink)(nil)