From b593bc9188384d32b1c687f8e55ce578be02478f Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 15 Feb 2021 15:32:07 +0000 Subject: [PATCH 1/6] Add structured logger (and traceID to logging) This commit swaps stdlib log for go-kit structured logger and defaults to logfmt format for logging. In addition, it adds traceID to logs to make it play nicely and integrate better between log and tracing systems. --- main.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 12 deletions(-) diff --git a/main.go b/main.go index 7291559..4516ae4 100644 --- a/main.go +++ b/main.go @@ -8,12 +8,15 @@ import ( "fmt" "io" "io/ioutil" - "log" "net/http" + "os" "sort" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/prometheus/prompb" @@ -35,7 +38,10 @@ var ( flagListen = flag.String("listen", ":10080", "[ip]:port to serve HTTP on") flagStore = flag.String("store", "localhost:10901", "Thanos Store API gRPC endpoint") flagIgnoreWarnings = flag.Bool("ignore-warnings", false, "Ignore warnings from Thanos") + flagLogFormat = flag.String("log.format", "logfmt", "Log format. One of [logfmt, json]") + flagLogLevel = flag.String("log.level", "info", "Log filtering level. One of [debug, info, warn, error]") ) +var logger log.Logger var ( httpRequests = prometheus.NewCounterVec( @@ -60,7 +66,8 @@ func initTracer() func() { jaegerExporter.WithDisabledFromEnv(), ) if err != nil { - log.Fatal(err) + level.Error(logger).Log("err", err) + os.Exit(1) } otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( @@ -72,10 +79,44 @@ func initTracer() func() { return flush } +func InitConfiguredLogger(format string, logLevel string) error { + switch format { + case "logfmt": + logger = log.NewLogfmtLogger(os.Stdout) + case "json": + logger = log.NewJSONLogger(os.Stdout) + default: + return errors.Errorf("%s is not a valid log format", format) + } + + var filterOption level.Option + switch logLevel { + case "debug": + filterOption = level.AllowDebug() + case "info": + filterOption = level.AllowInfo() + case "warn": + filterOption = level.AllowWarn() + case "error": + filterOption = level.AllowError() + default: + return errors.Errorf("%s is not a valid log level", logLevel) + } + logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5)) + logger = level.NewFilter(logger, filterOption) + return nil +} + func main() { - log.Printf("info: starting up thanos-remote-read...") + fmt.Println("info: starting up thanos-remote-read...") flag.Parse() + err := InitConfiguredLogger(*flagLogFormat, *flagLogLevel) + if err != nil { + fmt.Fprintf(os.Stderr, "Could not initialize logger: %s", err) + os.Exit(1) + } + flush := initTracer() defer flush() @@ -85,10 +126,15 @@ func main() { grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())) if err != nil { - log.Fatal(err) + level.Error(logger).Log("err", err) + os.Exit(1) } setup(conn) - log.Fatal(http.ListenAndServe(*flagListen, nil)) + err = (http.ListenAndServe(*flagListen, nil)) + if err != nil { + level.Error(logger).Log("err", err) + os.Exit(1) + } } func setup(conn *grpc.ClientConn) { @@ -180,7 +226,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error { compressed = snappy.Encode(nil, data) if _, err := w.Write(compressed); err != nil { - log.Printf("Error writing response: %v", err) + // log.Printf("Error writing response: %v", err) + level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID) } return nil } @@ -224,7 +271,11 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign Value: matcher.Value}) } - log.Printf("Thanos request: %v", storeReq) + level.Info(logger).Log( + "traceID", span.SpanContext().TraceID, + "msg", "thanos request", + "request", fmt.Sprintf("%v", storeReq), + ) storeRes, err := api.client.Series(ctx, storeReq) if err != nil { return nil, err @@ -239,7 +290,7 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign break } if err != nil { - log.Printf("Error in recv from thanos: %v", err) + level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID, "msg", "Error in recv from thanos") return nil, err } @@ -258,19 +309,19 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign if chunk.Raw == nil { // We only ask for and handle RAW err := fmt.Errorf("unexpectedly missing raw chunk data") - log.Print(err) + level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID) return nil, err } if chunk.Raw.Type != storepb.Chunk_XOR { err := fmt.Errorf("unexpected encoding type: %v", chunk.Raw.Type) - log.Print(err) + level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID) return nil, err } raw, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Raw.Data) if err != nil { err := fmt.Errorf("reading chunk: %w", err) - log.Print("Error ", err) + level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID) return nil, err } @@ -288,7 +339,7 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign case *storepb.SeriesResponse_Warning: if *flagIgnoreWarnings { - log.Printf("Warning from thanos: %v", r) + level.Warn(logger).Log("result", fmt.Sprintf("%v", r), "traceID", span.SpanContext().TraceID) } else { return nil, HTTPError{fmt.Errorf("%v", r), http.StatusInternalServerError} } From 2e22f79ed39a34eaaa061120baac5b409fca83e3 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 15 Feb 2021 16:15:15 +0000 Subject: [PATCH 2/6] Pass logger through Context --- main.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/main.go b/main.go index 4516ae4..292318d 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,6 @@ var ( flagLogFormat = flag.String("log.format", "logfmt", "Log format. One of [logfmt, json]") flagLogLevel = flag.String("log.level", "info", "Log filtering level. One of [debug, info, warn, error]") ) -var logger log.Logger var ( httpRequests = prometheus.NewCounterVec( @@ -56,7 +55,7 @@ func init() { prometheus.MustRegister(httpRequests) } -func initTracer() func() { +func initTracer(logger log.Logger) func() { flush, err := jaegerExporter.InstallNewPipeline( jaegerExporter.WithCollectorEndpoint(""), jaegerExporter.WithProcess(jaegerExporter.Process{ @@ -79,14 +78,15 @@ func initTracer() func() { return flush } -func InitConfiguredLogger(format string, logLevel string) error { +func InitConfiguredLogger(format string, logLevel string) (log.Logger, error) { + var logger log.Logger switch format { case "logfmt": logger = log.NewLogfmtLogger(os.Stdout) case "json": logger = log.NewJSONLogger(os.Stdout) default: - return errors.Errorf("%s is not a valid log format", format) + return nil, errors.Errorf("%s is not a valid log format", format) } var filterOption level.Option @@ -100,24 +100,24 @@ func InitConfiguredLogger(format string, logLevel string) error { case "error": filterOption = level.AllowError() default: - return errors.Errorf("%s is not a valid log level", logLevel) + return nil, errors.Errorf("%s is not a valid log level", logLevel) } logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5)) logger = level.NewFilter(logger, filterOption) - return nil + return logger, nil } func main() { fmt.Println("info: starting up thanos-remote-read...") flag.Parse() - err := InitConfiguredLogger(*flagLogFormat, *flagLogLevel) + logger, err := InitConfiguredLogger(*flagLogFormat, *flagLogLevel) if err != nil { fmt.Fprintf(os.Stderr, "Could not initialize logger: %s", err) os.Exit(1) } - flush := initTracer() + flush := initTracer(logger) defer flush() conn, err := grpc.Dial(*flagStore, grpc.WithInsecure(), @@ -129,7 +129,7 @@ func main() { level.Error(logger).Log("err", err) os.Exit(1) } - setup(conn) + setup(conn, logger) err = (http.ListenAndServe(*flagListen, nil)) if err != nil { level.Error(logger).Log("err", err) @@ -137,7 +137,7 @@ func main() { } } -func setup(conn *grpc.ClientConn) { +func setup(conn *grpc.ClientConn, logger log.Logger) { api := &API{ client: storepb.NewStoreClient(conn), } @@ -150,7 +150,7 @@ func setup(conn *grpc.ClientConn) { } handler("/", "root", root) handler("/-/healthy", "health", ok) - handler("/api/v1/read", "read", errorWrap(api.remoteRead)) + handler("/api/v1/read", "read", errorWrap(loggerWrap(api.remoteRead, logger))) http.Handle("/metrics", promhttp.Handler()) } @@ -159,6 +159,14 @@ type API struct { client storepb.StoreClient } +func loggerWrap(f func(w http.ResponseWriter, r *http.Request) error, logger log.Logger) func(w http.ResponseWriter, r *http.Request) error { + return func(w http.ResponseWriter, r *http.Request) error { + ctx := context.WithValue(r.Context(), "logger", logger) + newR := r.WithContext(ctx) + return f(w, newR) + } +} + func errorWrap(f func(w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { err := f(w, r) @@ -180,6 +188,7 @@ type HTTPError struct { func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error { ctx := r.Context() + logger := ctx.Value("logger").(log.Logger) tracer := otel.Tracer("") var span trace.Span ctx, span = tracer.Start(ctx, "remoteRead") @@ -226,7 +235,6 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error { compressed = snappy.Encode(nil, data) if _, err := w.Write(compressed); err != nil { - // log.Printf("Error writing response: %v", err) level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID) } return nil @@ -250,6 +258,7 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign var span trace.Span ctx, span = tracer.Start(ctx, "doStoreRequest") defer span.End() + logger := ctx.Value("logger").(log.Logger) response := &prompb.ReadResponse{} From d7e5b678dbee4007121fd8c61fd7ba5eccf2e1e6 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 15 Feb 2021 16:24:21 +0000 Subject: [PATCH 3/6] Rename logger function to be equal to geras --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 292318d..e291240 100644 --- a/main.go +++ b/main.go @@ -78,7 +78,7 @@ func initTracer(logger log.Logger) func() { return flush } -func InitConfiguredLogger(format string, logLevel string) (log.Logger, error) { +func NewConfiguredLogger(format string, logLevel string) (log.Logger, error) { var logger log.Logger switch format { case "logfmt": @@ -111,7 +111,7 @@ func main() { fmt.Println("info: starting up thanos-remote-read...") flag.Parse() - logger, err := InitConfiguredLogger(*flagLogFormat, *flagLogLevel) + logger, err := NewConfiguredLogger(*flagLogFormat, *flagLogLevel) if err != nil { fmt.Fprintf(os.Stderr, "Could not initialize logger: %s", err) os.Exit(1) From 2881a91b680084acdbccf7cfca46612f4b900258 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 15 Feb 2021 16:24:31 +0000 Subject: [PATCH 4/6] Add logger to tests --- main_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/main_test.go b/main_test.go index 90864e5..6729c7c 100644 --- a/main_test.go +++ b/main_test.go @@ -58,6 +58,8 @@ func TestMain(m *testing.M) { var logOutput bytes.Buffer log.SetOutput(&logOutput) + logger, err := NewConfiguredLogger("logfmt", "error") + ctx := context.Background() conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) if err != nil { @@ -65,7 +67,7 @@ func TestMain(m *testing.M) { } defer conn.Close() - setup(conn) + setup(conn, logger) status := m.Run() if status != 0 { From 01e667b24d3f39c2ed0c3b1bca5d4ffbe0118c3f Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 15 Feb 2021 16:48:22 +0000 Subject: [PATCH 5/6] Simplify log wrapper as per comment --- main.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index e291240..106b3aa 100644 --- a/main.go +++ b/main.go @@ -159,11 +159,9 @@ type API struct { client storepb.StoreClient } -func loggerWrap(f func(w http.ResponseWriter, r *http.Request) error, logger log.Logger) func(w http.ResponseWriter, r *http.Request) error { +func loggerWrap(f func(w http.ResponseWriter, r *http.Request, logger log.Logger) error, logger log.Logger) func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { - ctx := context.WithValue(r.Context(), "logger", logger) - newR := r.WithContext(ctx) - return f(w, newR) + return f(w, r, logger) } } @@ -186,9 +184,8 @@ type HTTPError struct { Status int } -func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error { +func (api *API) remoteRead(w http.ResponseWriter, r *http.Request, logger log.Logger) error { ctx := r.Context() - logger := ctx.Value("logger").(log.Logger) tracer := otel.Tracer("") var span trace.Span ctx, span = tracer.Start(ctx, "remoteRead") @@ -220,7 +217,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error { // This does not do streaming, at the time of writing Prometheus doesn't ask // for it anyway: https://github.com/prometheus/prometheus/issues/5926 - resp, err := api.doStoreRequest(r.Context(), &req, ignoredSelector) + resp, err := api.doStoreRequest(r.Context(), &req, ignoredSelector, logger) if err != nil { return err } @@ -253,12 +250,11 @@ func (c AggrChunkByTimestamp) Len() int { return len(c) } func (c AggrChunkByTimestamp) Swap(i, j int) { c[i], c[j] = c[j], c[i] } func (c AggrChunkByTimestamp) Less(i, j int) bool { return c[i].MinTime < c[j].MinTime } -func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ignoredSelector map[string]struct{}) (*prompb.ReadResponse, error) { +func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ignoredSelector map[string]struct{}, logger log.Logger) (*prompb.ReadResponse, error) { tracer := otel.Tracer("") var span trace.Span ctx, span = tracer.Start(ctx, "doStoreRequest") defer span.End() - logger := ctx.Value("logger").(log.Logger) response := &prompb.ReadResponse{} From 74a4f70ea7f71604b4aa7d36bfc864ce6fdd167a Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 15 Feb 2021 17:32:36 +0000 Subject: [PATCH 6/6] Remove unnecessary errors package --- main.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 106b3aa..171cca6 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/prometheus/prompb" @@ -86,7 +85,7 @@ func NewConfiguredLogger(format string, logLevel string) (log.Logger, error) { case "json": logger = log.NewJSONLogger(os.Stdout) default: - return nil, errors.Errorf("%s is not a valid log format", format) + return nil, fmt.Errorf("%s is not a valid log format", format) } var filterOption level.Option @@ -100,7 +99,7 @@ func NewConfiguredLogger(format string, logLevel string) (log.Logger, error) { case "error": filterOption = level.AllowError() default: - return nil, errors.Errorf("%s is not a valid log level", logLevel) + return nil, fmt.Errorf("%s is not a valid log level", logLevel) } logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5)) logger = level.NewFilter(logger, filterOption)