Skip to content

Commit

Permalink
util/log: new experimental integration with Fluentd
Browse files Browse the repository at this point in the history
Release note (cli change): It is now possible to redirect logging to
[Fluentd](https://www.fluentd.org)-compatible network collectors. See
the documentation for details. This is an alpha-quality feature.
  • Loading branch information
knz committed Dec 22, 2020
1 parent b837842 commit eb01d89
Show file tree
Hide file tree
Showing 12 changed files with 623 additions and 7 deletions.
4 changes: 4 additions & 0 deletions pkg/cli/exit/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func FatalError() Code { return Code{7} }
// in the logging system.
func TimeoutAfterFatalError() Code { return Code{8} }

// LoggingNetCollectorUnavailable (9) indicates that an error occurred
// during a logging operation to a network collector.
func LoggingNetCollectorUnavailable() Code { return Code{9} }

// Codes that are specific to client commands follow. It's possible
// for codes to be reused across separate client or server commands.
// Command-specific exit codes should be allocated down from 125.
Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"file_log_gc.go",
"file_sync_buffer.go",
"flags.go",
"fluent_client.go",
"format_crdb_v1.go",
"format_json.go",
"formats.go",
Expand Down
57 changes: 57 additions & 0 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func init() {
// we cannot keep redaction markers there.
*defaultConfig.Sinks.Stderr.Redactable = false
// Remove all sinks other than stderr.
defaultConfig.Sinks.FluentServers = nil
defaultConfig.Sinks.FileGroups = nil

if _, err := ApplyConfig(defaultConfig); err != nil {
Expand Down Expand Up @@ -279,6 +280,26 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}

// Create the fluent sinks.
for _, fc := range config.Sinks.FluentServers {
if fc.Filter == severity.NONE {
continue
}
fluentSinkInfo, err := newFluentSinkInfo(*fc)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fluentSinkInfo)
allSinkInfos.put(fluentSinkInfo)

// Connect the channels for this sink.
for _, ch := range fc.Channels.Channels {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
}
}

logging.setChannelLoggers(chans, &stderrSinkInfo)
setActive()

Expand All @@ -303,6 +324,18 @@ func newFileSinkInfo(fileNamePrefix string, c logconfig.FileConfig) (*sinkInfo,
return info, fileSink, nil
}

// newFluentSinkInfo creates a new fluentSink and its accompanying sinkInfo
// from the provided configuration.
func newFluentSinkInfo(c logconfig.FluentConfig) (*sinkInfo, error) {
info := &sinkInfo{}
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
fluentSink := newFluentSink(c.Net, c.Address)
info.sink = fluentSink
return info, nil
}

// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
Expand Down Expand Up @@ -426,6 +459,30 @@ func DescribeAppliedConfig() string {
return nil
})

// Describe the fluent sinks.
config.Sinks.FluentServers = make(map[string]*logconfig.FluentConfig)
sIdx := 1
_ = allSinkInfos.iter(func(l *sinkInfo) error {
fluentSink, ok := l.sink.(*fluentSink)
if !ok {
return nil
}

fc := &logconfig.FluentConfig{}
fc.CommonSinkConfig = l.describeAppliedConfig()
fc.Net = fluentSink.network
fc.Address = fluentSink.addr

// Describe the connections to this fluent sink.
for ch, logger := range chans {
describeConnections(logger, ch, l, &fc.Channels)
}
skey := fmt.Sprintf("s%d", sIdx)
sIdx++
config.Sinks.FluentServers[skey] = fc
return nil
})

// Note: we cannot return 'config' directly, because this captures
// certain variables from the loggers by reference and thus could be
// invalidated by concurrent uses of ApplyConfig().
Expand Down
122 changes: 122 additions & 0 deletions pkg/util/log/fluent_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package log

import (
"fmt"
"net"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/errors"
)

// fluentSink represents a Fluentd-compatible network collector.
type fluentSink struct {
// The network address of the fluentd collector.
network string
addr string

// good indicates that the connection can be used.
good bool
conn net.Conn
}

func newFluentSink(network, addr string) *fluentSink {
f := &fluentSink{
addr: addr,
network: network,
}
return f
}

func (l *fluentSink) String() string {
return fmt.Sprintf("fluent:%s://%s", l.network, l.addr)
}

// activeAtSeverity implements the logSink interface.
func (l *fluentSink) active() bool { return true }

// attachHints implements the logSink interface.
func (l *fluentSink) attachHints(stacks []byte) []byte {
return stacks
}

// exitCode implements the logSink interface.
func (l *fluentSink) exitCode() exit.Code {
return exit.LoggingNetCollectorUnavailable()
}

// output implements the logSink interface.
func (l *fluentSink) output(extraSync bool, b []byte) error {
// Try to write and reconnect immediately if the first write fails.
//
// TODO(knz): Add some net socket write deadlines here.
_ = l.tryWrite(b)
if l.good {
return nil
}

if err := l.ensureConn(b); err != nil {
return err
}
return l.tryWrite(b)
}

// emergencyOutput implements the logSink interface.
func (l *fluentSink) emergencyOutput(b []byte) {
// TODO(knz): Add some net socket write deadlines here.
_ = l.tryWrite(b)
if !l.good {
_ = l.ensureConn(b)
_ = l.tryWrite(b)
}
}

func (l *fluentSink) close() {
l.good = false
if l.conn != nil {
if err := l.conn.Close(); err != nil {
fmt.Fprintf(OrigStderr, "error closing network logger: %v\n", err)
}
l.conn = nil
}
}

func (l *fluentSink) ensureConn(b []byte) error {
if l.good {
return nil
}
l.close()
var err error
l.conn, err = net.Dial(l.network, l.addr)
if err != nil {
fmt.Fprintf(OrigStderr, "%s: error dialing network logger: %v\n%s", l, err, b)
return err
}
fmt.Fprintf(OrigStderr, "%s: connection to network logger resumed\n", l)
l.good = true
return nil
}

var errNoConn = errors.New("no connection opened")

func (l *fluentSink) tryWrite(b []byte) error {
if !l.good {
return errNoConn
}
n, err := l.conn.Write(b)
if err != nil || n < len(b) {
fmt.Fprintf(OrigStderr, "%s: logging error: %v or short write (%d/%d)\n%s",
l, err, n, len(b), b)
l.good = false
}
return err
}
44 changes: 43 additions & 1 deletion pkg/util/log/logconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const DefaultFileFormat = `crdb-v1`
// when not specified in a configuration.
const DefaultStderrFormat = `crdb-v1-tty`

// DefaultFluentFormat is the entry format for fluent sinks
// when not specified in a configuration.
const DefaultFluentFormat = `json-fluent-compact`

// DefaultConfig returns a suitable default configuration when logging
// is meant to primarily go to files.
func DefaultConfig() (c Config) {
Expand All @@ -43,6 +47,11 @@ file-defaults:
max-file-size: 10mib
max-group-size: 100mib
exit-on-error: true
fluent-defaults:
filter: INFO
format: ` + DefaultFluentFormat + `
redactable: true
exit-on-error: false
sinks:
stderr:
filter: NONE
Expand Down Expand Up @@ -86,6 +95,11 @@ type Config struct {
// configuration value.
FileDefaults FileDefaults `yaml:"file-defaults,omitempty"`

// FluentDefaults represents the default configuration for fluent sinks,
// inherited when a specific fluent sink config does not provide a
// configuration value.
FluentDefaults FluentDefaults `yaml:"fluent-defaults,omitempty"`

// Sinks represents the sink configurations.
Sinks SinkConfig `yaml:",omitempty"`

Expand Down Expand Up @@ -115,12 +129,15 @@ type CaptureFd2Config struct {
type SinkConfig struct {
// FileGroups represents the list of configured file sinks.
FileGroups map[string]*FileConfig `yaml:"file-groups,omitempty"`
// FluentServer represents the list of configured fluent sinks.
FluentServers map[string]*FluentConfig `yaml:"fluent-servers,omitempty"`
// Stderr represents the configuration for the stderr sink.
Stderr StderrConfig `yaml:",omitempty"`

// sortedFileGroupNames is used internally to
// sortedFileGroupNames and sortedServerNames are used internally to
// make the Export() function deterministic.
sortedFileGroupNames []string
sortedServerNames []string
}

// StderrConfig represents the configuration for the stderr sink.
Expand Down Expand Up @@ -171,6 +188,26 @@ type CommonSinkConfig struct {
Auditable *bool `yaml:",omitempty"`
}

// FluentConfig represents the configuration for one fluentd sink.
type FluentConfig struct {
// Channels is the list of logging channels that use this sink.
Channels ChannelList `yaml:",omitempty"`

// Net is the protocol for the fluent server. Can be "tcp", "udp",
// "tcp4", etc.
Net string `yaml:",omitempty"`
// Address is the network address of the fluent server. The
// host/address and port parts are separated with a colon. IPv6
// numeric addresses should be included within square brackets,
// e.g.: [::1]:1234.
Address string

CommonSinkConfig `yaml:",inline"`

// used during validation.
serverName string
}

// FileDefaults represent configuration defaults for file sinks.
type FileDefaults struct {
// Dir stores the default output directory for file sinks.
Expand Down Expand Up @@ -200,6 +237,11 @@ type FileDefaults struct {
CommonSinkConfig `yaml:",inline"`
}

// FluentDefaults represent configuration defaults for fluent sinks.
type FluentDefaults struct {
CommonSinkConfig `yaml:",inline"`
}

// FileConfig represents the configuration for one file sink.
type FileConfig struct {
// Channels is the list of logging channels that use this sink.
Expand Down
36 changes: 36 additions & 0 deletions pkg/util/log/logconfig/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,33 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
links = append(links, "stray --> stderr")
}

// Collect the network servers.
//
// servers maps each server to its box declaration.
servers := map[string]string{}
for _, fn := range c.Sinks.sortedServerNames {
fc := c.Sinks.FluentServers[fn]
if fc.Filter == logpb.Severity_NONE {
continue
}
skey := fmt.Sprintf("s__%s", fc.serverName)
target, thisprocs, thislinks := process(skey, fc.CommonSinkConfig)
hasLink := false
for _, ch := range fc.Channels.Channels {
if !chanSel.HasChannel(ch) {
continue
}
hasLink = true
links = append(links, fmt.Sprintf("%s --> %s", ch, target))
}
if hasLink {
processing = append(processing, thisprocs...)
links = append(links, thislinks...)
servers[fc.serverName] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"",
skey, fc.Net, fc.Address)
}
}

// Export the stderr redirects.
if c.Sinks.Stderr.Filter != logpb.Severity_NONE {
target, thisprocs, thislinks := process("stderr", c.Sinks.Stderr.CommonSinkConfig)
Expand Down Expand Up @@ -210,6 +237,15 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
buf.WriteString("}\n")
}

// Represent the network servers, if any.
if len(c.Sinks.sortedServerNames) > 0 {
buf.WriteString("cloud network {\n")
for _, s := range c.Sinks.sortedServerNames {
fmt.Fprintf(&buf, " %s\n", servers[s])
}
buf.WriteString("}\n")
}

// Export the relationships.
for _, l := range links {
fmt.Fprintf(&buf, "%s\n", l)
Expand Down
Loading

0 comments on commit eb01d89

Please sign in to comment.