Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app/log: integrate optional loki logging spout #1425

Merged
merged 7 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions app/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package log

import (
"context"
"fmt"
"strings"
"sync"
Expand All @@ -29,6 +30,7 @@ import (
"go.uber.org/zap/zapcore"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log/loki"
"github.com/obolnetwork/charon/app/z"
)

Expand All @@ -38,18 +40,30 @@ const (
keyTopic = "topic"
)

// logger is the global logger.
// zapLogger is the minimal levelled logging surface this package needs from a
// zap logger: one method per level, each taking a message and optional fields.
type zapLogger interface {
	Debug(msg string, fields ...zap.Field)
	Info(msg string, fields ...zap.Field)
	Warn(msg string, fields ...zap.Field)
	Error(msg string, fields ...zap.Field)
}

var (
logger = newDefaultLogger()
initMu sync.Mutex
// logger is the global logger.
logger zapLogger = newDefaultLogger()
// stopFuncs are the global logger stop functions.
stopFuncs []func(context.Context) = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to initialize this as nil?

xenowits marked this conversation as resolved.
Show resolved Hide resolved

padding = strings.Repeat(" ", padLength)
)

// Config defines the logging configuration.
type Config struct {
Level string // debug, info, warn or error
Format string // console or json
Level string // debug, info, warn or error
Format string // console or json
LokiAddresses []string // URLs for loki logging spout
LokiService string // Value of the service label pushed with loki logs.
}

// ZapLevel returns the zapcore level.
Expand All @@ -72,6 +86,8 @@ func DefaultConfig() Config {

// InitLogger initialises the global logger based on the provided config.
func InitLogger(config Config) error {
Stop(context.Background()) // Stop previously started loggers.

initMu.Lock()
defer initMu.Unlock()

Expand All @@ -87,12 +103,36 @@ func InitLogger(config Config) error {

if config.Format == "console" {
logger = newConsoleLogger(level, writer)
return nil
} else {
logger, err = newStructuredLogger(config.Format, level, writer)
if err != nil {
return err
}
}

logger, err = newStructuredLogger(config.Format, level, writer)
if err != nil {
return err
if len(config.LokiAddresses) > 0 {
// Wire loki clients internal logger
ctx := WithTopic(context.Background(), "loki")
filter := Filter()
logFunc := func(msg string, err error) {
Warn(ctx, msg, err, filter)
}

// Create a multi logger
loggers := multiLogger{logger}
for _, address := range config.LokiAddresses {
lokiCl := loki.New(address, config.LokiService, logFunc)
lokiLogger, err := newStructuredLogger("logfmt", zapcore.DebugLevel, lokiWriter{cl: lokiCl})
if err != nil {
return err
}

stopFuncs = append(stopFuncs, lokiCl.Stop)
loggers = append(loggers, lokiLogger)
go lokiCl.Run()
}

logger = loggers
}

return nil
Expand Down Expand Up @@ -127,6 +167,18 @@ func InitLogfmtForT(t *testing.T, ws zapcore.WriteSyncer, opts ...func(*zapcore.
require.NoError(t, err)
}

// Stop calls every registered logger stop function with ctx, in the order they
// were registered, and then clears the registry.
// initMu is held for the whole call.
func Stop(ctx context.Context) {
	initMu.Lock()
	defer initMu.Unlock()

	registered := stopFuncs
	for i := 0; i < len(registered); i++ {
		registered[i](ctx)
	}

	stopFuncs = nil
}

// newStructuredLogger returns an opinionated logfmt or json logger.
func newStructuredLogger(format string, level zapcore.Level, ws zapcore.WriteSyncer, opts ...func(*zapcore.EncoderConfig)) (*zap.Logger, error) {
encConfig := zap.NewProductionEncoderConfig()
Expand Down
55 changes: 34 additions & 21 deletions app/log/loki/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/expbackoff"
"github.com/obolnetwork/charon/app/log"
pbv1 "github.com/obolnetwork/charon/app/log/loki/lokipb/v1"
"github.com/obolnetwork/charon/app/z"
)
Expand All @@ -46,28 +45,22 @@ const (
batchMax = 5 * 1 << 20 // 5MB
)

// Client for pushing logs in snappy-compressed protos over HTTP.
type Client struct {
input chan string
quit chan struct{}
done chan struct{}
service string
endpoint string
batchWait time.Duration
batchMax int
}
// logFunc abstracts logging, since this is a logger itself.
corverroos marked this conversation as resolved.
Show resolved Hide resolved
type logFunc func(string, error)

// NewForT returns a new Client for testing.
func NewForT(endpoint string, service string, batchWait time.Duration, batchMax int) *Client {
return newInternal(endpoint, service, batchWait, batchMax)
return newInternal(endpoint, service, batchWait, batchMax, func(string, error) {})
}

// New returns a new Client.
func New(endpoint string, service string) *Client {
return newInternal(endpoint, service, batchWait, batchMax)
func New(endpoint string, service string, logFunc logFunc) *Client {
return newInternal(endpoint, service, batchWait, batchMax, logFunc)
}

func newInternal(endpoint string, service string, batchWait time.Duration, batchMax int) *Client {
func newInternal(endpoint string, service string, batchWait time.Duration, batchMax int,
logFunc logFunc,
) *Client {
return &Client{
endpoint: endpoint,
service: service,
Expand All @@ -76,9 +69,22 @@ func newInternal(endpoint string, service string, batchWait time.Duration, batch
input: make(chan string),
batchMax: batchMax,
batchWait: batchWait,
logFunc: logFunc,
}
}

// Client pushes logs in snappy-compressed protos over HTTP.
type Client struct {
	// input carries log lines awaiting batching.
	// NOTE(review): presumably fed by Add; confirm against the rest of the file.
	input chan string
	// done is closed when Run returns.
	// NOTE(review): quit is presumably the signal Stop uses to end Run; confirm.
	quit, done chan struct{}
	// service is passed to newBatch; endpoint is the address batches are sent to.
	service, endpoint string
	// batchWait is the minimum batch age before Run attempts a send.
	batchWait time.Duration
	// batchMax is the batch size limit.
	// NOTE(review): presumably in bytes (default constant is 5MB); confirm.
	batchMax int
	// logFunc reports the client's own errors, e.g. a failed batch send.
	logFunc logFunc
}

// Run blocks until Stop is called.
// - It batches and sends logs to loki.
// - It sends logs every batchWait time.
Expand All @@ -88,12 +94,12 @@ func newInternal(endpoint string, service string, batchWait time.Duration, batch
func (c *Client) Run() {
var (
client = new(http.Client)
ctx = log.WithTopic(context.Background(), "loki")
ctx = context.Background()
backoffConfig = expbackoff.DefaultConfig
retries int
triedAt time.Time
batch = newBatch(c.service) // New empty batch
ticker = time.NewTicker(c.batchWait / 10)
logFilter = log.Filter()
ticker = time.NewTicker(c.batchWait)
)
defer close(c.done)
defer ticker.Stop()
Expand All @@ -116,15 +122,22 @@ func (c *Client) Run() {
if batch.Age() < c.batchWait {
continue
}

// Do not send if we are backing off
if retries > 0 && expbackoff.Backoff(backoffConfig, retries) > 0 {
continue
if retries > 0 {
nextTry := triedAt.Add(expbackoff.Backoff(backoffConfig, retries))
if time.Until(nextTry) > 0 {
break
}
}

err := send(ctx, client, c.endpoint, batch)
if err != nil {
log.Warn(ctx, "Loki batch send failed", err, logFilter)
// Log async to avoid deadlock by recursive calls to Add.
go c.logFunc("Loki batch send failed", err)

retries++
triedAt = time.Now()

continue
}
Expand Down
63 changes: 63 additions & 0 deletions app/log/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package log

import (
"go.uber.org/zap"

"github.com/obolnetwork/charon/app/log/loki"
)

// multiLogger wraps multiple zap loggers and implements zapLogger.
// Each log call is forwarded to every wrapped logger, in slice order.
type multiLogger []zapLogger

// Debug forwards the debug-level message and fields to every wrapped logger.
func (m multiLogger) Debug(msg string, fields ...zap.Field) {
	for i := 0; i < len(m); i++ {
		m[i].Debug(msg, fields...)
	}
}

// Info forwards the info-level message and fields to every wrapped logger.
func (m multiLogger) Info(msg string, fields ...zap.Field) {
	for i := 0; i < len(m); i++ {
		m[i].Info(msg, fields...)
	}
}

// Warn forwards the warn-level message and fields to every wrapped logger.
func (m multiLogger) Warn(msg string, fields ...zap.Field) {
	for i := 0; i < len(m); i++ {
		m[i].Warn(msg, fields...)
	}
}

// Error forwards the error-level message and fields to every wrapped logger.
func (m multiLogger) Error(msg string, fields ...zap.Field) {
	for i := 0; i < len(m); i++ {
		m[i].Error(msg, fields...)
	}
}

// lokiWriter wraps a loki client and implements zapcore.WriteSyncer,
// forwarding each written log line to the client.
type lokiWriter struct {
cl *loki.Client
}

// Write hands a string copy of line to the loki client and reports the whole
// line as written. It never returns an error.
func (l lokiWriter) Write(line []byte) (int, error) {
	entry := string(line)
	l.cl.Add(entry)

	return len(line), nil
}

// Sync is a no-op that always returns nil; lokiWriter does no buffering of
// its own, it hands each line to the client in Write.
func (lokiWriter) Sync() error {
return nil
}
3 changes: 2 additions & 1 deletion cmd/bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newBootnodeCmd(runFunc func(context.Context, BootnodeConfig) error) *cobra.
Long: `Starts a discv5 bootnode that charon nodes can use to bootstrap their p2p cluster`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if err := initLogger(cmd.Flags()); err != nil {
if err := log.InitLogger(config.LogConfig); err != nil {
return err
}

Expand All @@ -73,6 +73,7 @@ func newBootnodeCmd(runFunc func(context.Context, BootnodeConfig) error) *cobra.
bindBootnodeFlag(cmd.Flags(), &config)
bindP2PFlags(cmd, &config.P2PConfig)
bindLogFlags(cmd.Flags(), &config.LogConfig)
bindLokiFlags(cmd.Flags(), &config.LogConfig)

return cmd
}
Expand Down
18 changes: 0 additions & 18 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,3 @@ func printFlags(ctx context.Context, flags *pflag.FlagSet) {

log.Info(ctx, "Parsed config", zStrs...)
}

// initLogger initialises the global logger from the "log-level" and
// "log-format" flags. If either flag is not registered on flags it does
// nothing and returns nil; otherwise it returns the log.InitLogger result.
func initLogger(flags *pflag.FlagSet) error {
	levelFlag, formatFlag := flags.Lookup("log-level"), flags.Lookup("log-format")
	if levelFlag == nil || formatFlag == nil {
		return nil
	}

	return log.InitLogger(log.Config{
		Level:  levelFlag.Value.String(),
		Format: formatFlag.Value.String(),
	})
}
5 changes: 3 additions & 2 deletions cmd/cmd_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ func TestCmdFlags(t *testing.T) {
},
AppConfig: &app.Config{
Log: log.Config{
Level: "info",
Format: "console",
Level: "info",
Format: "console",
LokiService: "charon",
},
P2P: p2p.Config{
UDPBootnodes: []string{"http://bootnode.lb.gcp.obol.tech:3640/enr"},
Expand Down
3 changes: 2 additions & 1 deletion cmd/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/dkg"
)

Expand All @@ -35,7 +36,7 @@ distributed validator key shares and a final cluster lock configuration. Note th
this command at the same time.`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if err := initLogger(cmd.Flags()); err != nil {
if err := log.InitLogger(config.Log); err != nil {
return err
}

Expand Down
8 changes: 7 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newRunCmd(runFunc func(context.Context, app.Config) error) *cobra.Command {
Short: "Run the charon middleware client",
Long: "Starts the long-running Charon middleware process to perform distributed validator duties.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := initLogger(cmd.Flags()); err != nil {
if err := log.InitLogger(conf.Log); err != nil {
return err
}

Expand All @@ -52,11 +52,17 @@ func newRunCmd(runFunc func(context.Context, app.Config) error) *cobra.Command {
bindNoVerifyFlag(cmd.Flags(), &conf.NoVerify)
bindP2PFlags(cmd, &conf.P2P)
bindLogFlags(cmd.Flags(), &conf.Log)
bindLokiFlags(cmd.Flags(), &conf.Log)
bindFeatureFlags(cmd.Flags(), &conf.Feature)

return cmd
}

// bindLokiFlags registers the --loki-addresses and --loki-service flags on
// flags and binds them to the Loki fields of config.
func bindLokiFlags(flags *pflag.FlagSet, config *log.Config) {
	const (
		addressesUsage = "Enables sending of logfmt structured logs to these Loki log aggregation server addresses. This is in addition to normal stderr logs."
		serviceUsage   = "Service label sent with logs to Loki."
	)

	flags.StringSliceVar(&config.LokiAddresses, "loki-addresses", nil, addressesUsage)
	flags.StringVar(&config.LokiService, "loki-service", "charon", serviceUsage)
}

// bindNoVerifyFlag registers the --no-verify boolean flag (default false) on
// flags and binds it to config.
func bindNoVerifyFlag(flags *pflag.FlagSet, config *bool) {
	const usage = "Disables cluster definition and lock file verification."

	flags.BoolVar(config, "no-verify", false, usage)
}
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ Flags:
--lock-file string The path to the cluster lock file defining distributed validator cluster. (default ".charon/cluster-lock.json")
--log-format string Log format; console, logfmt or json (default "console")
--log-level string Log level; debug, info, warn or error (default "info")
--loki-addresses strings Enables sending of logfmt structured logs to these Loki log aggregation server addresses. This is in addition to normal stderr logs.
--loki-service string Service label sent with logs to Loki. (default "charon")
--monitoring-address string Listening address (ip and port) for the monitoring API (prometheus, pprof). (default "127.0.0.1:3620")
--no-verify Disables cluster definition and lock file verification.
--p2p-allowlist string Comma-separated list of CIDR subnets for allowing only certain peer connections. Example: 192.168.0.0/16 would permit connections to peers on your local network only. The default is to accept all connections.
Expand Down
Loading