Skip to content

Commit

Permalink
util/log: add http log sink
Browse files Browse the repository at this point in the history
Release note (cli change): Added a new HTTP sink to the logging system.
This can be configured similarly to other log sinks with the new 'http-servers'
and 'http-defaults' sections of the logging config passed via the "--log" or
"--log-config-file" command line flags.
  • Loading branch information
rauchenstein committed Jul 19, 2021
1 parent 2ed3c02 commit a80f0a3
Show file tree
Hide file tree
Showing 9 changed files with 624 additions and 47 deletions.
68 changes: 67 additions & 1 deletion docs/generated/logsinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The supported log output sink types are documented below.

- [Output to Fluentd-compatible log collectors](#output-to-fluentd-compatible-log-collectors)

- [Output to HTTP servers.](#output-to-http-servers.)

- [Standard error stream](#standard-error-stream)


Expand Down Expand Up @@ -91,7 +93,7 @@ Configuration options shared across all sink types:
## Sink type: Output to Fluentd-compatible log collectors


This sink type causes logging data to be sent over the network, to
This sink type causes logging data to be sent over the network to
a log collector that can ingest log data in a
[Fluentd](https://www.fluentd.org)-compatible protocol.

Expand Down Expand Up @@ -168,6 +170,70 @@ Configuration options shared across all sink types:



<a name="output-to-http-servers.">

## Sink type: Output to HTTP servers.


This sink type causes logging data to be sent over the network
as requests to an HTTP server.

The configuration key under the `sinks` key in the YAML
configuration is `http-servers`. Example configuration:

sinks:
http-servers:
health:
channels: HEALTH
address: http://127.0.0.1

Every new server sink configured automatically inherits the configuration set in the `http-defaults` section.

For example:

http-defaults:
redactable: false # default: disable redaction markers
sinks:
http-servers:
health:
channels: HEALTH
# This sink has redactable set to false,
# as the setting is inherited from fluent-defaults
# unless overridden here.

The default output format for HTTP sinks is
`json-compact`. [Other supported formats.](log-formats.html)

{{site.data.alerts.callout_info}}
Run `cockroach debug check-log-config` to verify the effect of defaults inheritance.
{{site.data.alerts.end}}



Type-specific configuration options:

| Field | Description |
|--|--|
| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
| `address` | the network address of the http server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. Inherited from `http-defaults.address` if not specified. |
| `method` | the HTTP method to be used. POST and GET are supported; defaults to POST. Inherited from `http-defaults.method` if not specified. |
| `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. |
| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. |


Configuration options shared across all sink types:

| Field | Description |
|--|--|
| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. |
| `format` | the entry format to use. |
| `redact` | whether to strip sensitive information before log events are emitted to this sink. |
| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. |
| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. |
| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. |



<a name="standard-error-stream">

## Sink type: Standard error stream
Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"format_json.go",
"formats.go",
"get_stacks.go",
"http_sink.go",
"intercept.go",
"log.go",
"log_bridge.go",
Expand Down
64 changes: 44 additions & 20 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}

// Call the final value of cleanupFn immediately if returning with error.
defer func() {
if err != nil {
cleanupFn()
}
}()

// If capture of internal fd2 writes is enabled, set it up here.
if config.CaptureFd2.Enable {
if logging.testingFd2CaptureLogger != nil {
cleanupFn()
return nil, errors.New("fd2 capture already set up. Maybe use TestLogScope?")
}
// We use a secondary logger, even though no logging *event* will ever
Expand Down Expand Up @@ -160,7 +166,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
fileSinkInfo, fileSink, err := newFileSinkInfo("stderr", fakeConfig)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fileSinkInfo)
Expand Down Expand Up @@ -190,7 +195,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
if err := fileSink.takeOverInternalStderr(secLogger); err != nil {
// Oof, it turns out we can't use this logger after all. Give up
// on everything we did.
cleanupFn()
return nil, err
}

Expand Down Expand Up @@ -218,7 +222,6 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
// Apply the stderr sink configuration.
logging.stderrSink.noColor.Set(config.Sinks.Stderr.NoColor)
if err := logging.stderrSinkInfoTemplate.applyConfig(config.Sinks.Stderr.CommonSinkConfig); err != nil {
cleanupFn()
return nil, err
}

Expand All @@ -244,6 +247,17 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
l.sinkInfos = append(l.sinkInfos, &stderrSinkInfo)
}

attachSinkInfo := func(si *sinkInfo, chs []logpb.Channel) {
sinkInfos = append(sinkInfos, si)
allSinkInfos.put(si)

// Connect the channels for this sink.
for _, ch := range chs {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, si)
}
}

// Create the file sinks.
for prefix, fc := range config.Sinks.FileGroups {
if fc.Filter == severity.NONE || fc.Dir == nil {
Expand All @@ -254,17 +268,9 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
fileSinkInfo, _, err := newFileSinkInfo(prefix, *fc)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fileSinkInfo)
allSinkInfos.put(fileSinkInfo)

// Connect the channels for this sink.
for _, ch := range fc.Channels.Channels {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, fileSinkInfo)
}
attachSinkInfo(fileSinkInfo, fc.Channels.Channels)
}

// Create the fluent sinks.
Expand All @@ -274,17 +280,21 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
fluentSinkInfo, err := newFluentSinkInfo(*fc)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fluentSinkInfo)
allSinkInfos.put(fluentSinkInfo)
attachSinkInfo(fluentSinkInfo, fc.Channels.Channels)
}

// Connect the channels for this sink.
for _, ch := range fc.Channels.Channels {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
// Create the HTTP sinks.
for _, fc := range config.Sinks.HTTPServers {
if fc.Filter == severity.NONE {
continue
}
httpSinkInfo, err := newHTTPSinkInfo(*fc)
if err != nil {
return nil, err
}
attachSinkInfo(httpSinkInfo, fc.Channels.Channels)
}

// Prepend the interceptor sink to all channels.
Expand Down Expand Up @@ -333,6 +343,20 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {
return info, nil
}

func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
info := &sinkInfo{}
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
httpSink := newHTTPSink(*c.Address, httpSinkOptions{
method: string(*c.Method),
unsafeTLS: *c.UnsafeTLS,
timeout: *c.Timeout,
})
info.sink = httpSink
return info, nil
}

// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
Expand Down
149 changes: 149 additions & 0 deletions pkg/util/log/http_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package log

import (
"bytes"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"time"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
)

var insecureTransport http.RoundTripper = &http.Transport{
// Same as DefaultTransport...
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// ...except insecure TLS.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

type httpSinkOptions struct {
unsafeTLS bool
timeout time.Duration
method string
}

func newHTTPSink(url string, opt httpSinkOptions) *httpSink {
hs := &httpSink{
client: http.Client{
Transport: http.DefaultTransport,
Timeout: opt.timeout},
address: url,
doRequest: doPost}

if opt.unsafeTLS {
hs.client.Transport = insecureTransport
}

if opt.method == http.MethodGet {
hs.doRequest = doGet
}

return hs
}

type httpSink struct {
client http.Client
address string
doRequest func(*httpSink, []byte) (*http.Response, error)
}

// output emits some formatted bytes to this sink.
// the sink is invited to perform an extra flush if indicated
// by the argument. This is set to true for e.g. Fatal
// entries.
//
// The parent logger's outputMu is held during this operation: log
// sinks must not recursively call into logging when implementing
// this method.
func (hs *httpSink) output(extraSync bool, b []byte) (err error) {
resp, err := hs.doRequest(hs, b)
if err != nil {
return err
}

if resp.StatusCode >= 400 {
return HTTPLogError{
StatusCode: resp.StatusCode,
Address: hs.address}
}
return nil
}

// emergencyOutput attempts to emit some formatted bytes, and
// ignores any errors.
//
// The parent logger's outputMu is held during this operation: log
// sinks must not recursively call into logging when implementing
// this method.
func (hs *httpSink) emergencyOutput(b []byte) {
hs.doRequest(hs, b)
}

func doPost(hs *httpSink, b []byte) (*http.Response, error) {
resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b))
if err != nil {
return nil, err
}
resp.Body.Close() // don't care about content
return resp, nil
}

func doGet(hs *httpSink, b []byte) (*http.Response, error) {
resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b)))
if err != nil {
return nil, err
}
resp.Body.Close() // don't care about content
return resp, nil
}

// active returns true if this sink is currently active.
func (*httpSink) active() bool {
return true
}

// attachHints attaches some hints about the location of the message
// to the stack message.
func (*httpSink) attachHints(stacks []byte) []byte {
return stacks
}

// exitCode returns the exit code to use if the logger decides
// to terminate because of an error in output().
func (*httpSink) exitCode() exit.Code {
return exit.LoggingNetCollectorUnavailable()
}

// HTTPLogError represents an HTTP error status code from a logging request.
type HTTPLogError struct {
StatusCode int
Address string
}

func (e HTTPLogError) Error() string {
return fmt.Sprintf(
"received %v response attempting to log to [%v]",
e.StatusCode, e.Address)
}
Loading

0 comments on commit a80f0a3

Please sign in to comment.