Skip to content

Commit

Permalink
Merge pull request #21 from G-Research/SREWRK-1440-tracing-log-traceid
Browse files Browse the repository at this point in the history
Add structured logger (and traceID to logging)
  • Loading branch information
kradalby authored Feb 15, 2021
2 parents f1c0e1f + 74a4f70 commit fa3ec26
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 21 deletions.
95 changes: 75 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"sort"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -35,6 +37,8 @@ var (
flagListen = flag.String("listen", ":10080", "[ip]:port to serve HTTP on")
flagStore = flag.String("store", "localhost:10901", "Thanos Store API gRPC endpoint")
flagIgnoreWarnings = flag.Bool("ignore-warnings", false, "Ignore warnings from Thanos")
flagLogFormat = flag.String("log.format", "logfmt", "Log format. One of [logfmt, json]")
flagLogLevel = flag.String("log.level", "info", "Log filtering level. One of [debug, info, warn, error]")
)

var (
Expand All @@ -50,7 +54,7 @@ func init() {
prometheus.MustRegister(httpRequests)
}

func initTracer() func() {
func initTracer(logger log.Logger) func() {
flush, err := jaegerExporter.InstallNewPipeline(
jaegerExporter.WithCollectorEndpoint(""),
jaegerExporter.WithProcess(jaegerExporter.Process{
Expand All @@ -60,7 +64,8 @@ func initTracer() func() {
jaegerExporter.WithDisabledFromEnv(),
)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}

otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
Expand All @@ -72,11 +77,46 @@ func initTracer() func() {
return flush
}

func NewConfiguredLogger(format string, logLevel string) (log.Logger, error) {
var logger log.Logger
switch format {
case "logfmt":
logger = log.NewLogfmtLogger(os.Stdout)
case "json":
logger = log.NewJSONLogger(os.Stdout)
default:
return nil, fmt.Errorf("%s is not a valid log format", format)
}

var filterOption level.Option
switch logLevel {
case "debug":
filterOption = level.AllowDebug()
case "info":
filterOption = level.AllowInfo()
case "warn":
filterOption = level.AllowWarn()
case "error":
filterOption = level.AllowError()
default:
return nil, fmt.Errorf("%s is not a valid log level", logLevel)
}
logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5))
logger = level.NewFilter(logger, filterOption)
return logger, nil
}

func main() {
log.Printf("info: starting up thanos-remote-read...")
fmt.Println("info: starting up thanos-remote-read...")
flag.Parse()

flush := initTracer()
logger, err := NewConfiguredLogger(*flagLogFormat, *flagLogLevel)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not initialize logger: %s", err)
os.Exit(1)
}

flush := initTracer(logger)
defer flush()

conn, err := grpc.Dial(*flagStore, grpc.WithInsecure(),
Expand All @@ -85,13 +125,18 @@ func main() {
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
setup(conn, logger)
err = (http.ListenAndServe(*flagListen, nil))
if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
setup(conn)
log.Fatal(http.ListenAndServe(*flagListen, nil))
}

func setup(conn *grpc.ClientConn) {
func setup(conn *grpc.ClientConn, logger log.Logger) {
api := &API{
client: storepb.NewStoreClient(conn),
}
Expand All @@ -104,7 +149,7 @@ func setup(conn *grpc.ClientConn) {
}
handler("/", "root", root)
handler("/-/healthy", "health", ok)
handler("/api/v1/read", "read", errorWrap(api.remoteRead))
handler("/api/v1/read", "read", errorWrap(loggerWrap(api.remoteRead, logger)))

http.Handle("/metrics", promhttp.Handler())
}
Expand All @@ -113,6 +158,12 @@ type API struct {
client storepb.StoreClient
}

func loggerWrap(f func(w http.ResponseWriter, r *http.Request, logger log.Logger) error, logger log.Logger) func(w http.ResponseWriter, r *http.Request) error {
return func(w http.ResponseWriter, r *http.Request) error {
return f(w, r, logger)
}
}

func errorWrap(f func(w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
err := f(w, r)
Expand All @@ -132,7 +183,7 @@ type HTTPError struct {
Status int
}

func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error {
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request, logger log.Logger) error {
ctx := r.Context()
tracer := otel.Tracer("")
var span trace.Span
Expand Down Expand Up @@ -165,7 +216,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error {
// This does not do streaming, at the time of writing Prometheus doesn't ask
// for it anyway: https://github.com/prometheus/prometheus/issues/5926

resp, err := api.doStoreRequest(r.Context(), &req, ignoredSelector)
resp, err := api.doStoreRequest(r.Context(), &req, ignoredSelector, logger)
if err != nil {
return err
}
Expand All @@ -180,7 +231,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) error {

compressed = snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
log.Printf("Error writing response: %v", err)
level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID)
}
return nil
}
Expand All @@ -198,7 +249,7 @@ func (c AggrChunkByTimestamp) Len() int { return len(c) }
func (c AggrChunkByTimestamp) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c AggrChunkByTimestamp) Less(i, j int) bool { return c[i].MinTime < c[j].MinTime }

func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ignoredSelector map[string]struct{}) (*prompb.ReadResponse, error) {
func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ignoredSelector map[string]struct{}, logger log.Logger) (*prompb.ReadResponse, error) {
tracer := otel.Tracer("")
var span trace.Span
ctx, span = tracer.Start(ctx, "doStoreRequest")
Expand All @@ -224,7 +275,11 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign
Value: matcher.Value})
}

log.Printf("Thanos request: %v", storeReq)
level.Info(logger).Log(
"traceID", span.SpanContext().TraceID,
"msg", "thanos request",
"request", fmt.Sprintf("%v", storeReq),
)
storeRes, err := api.client.Series(ctx, storeReq)
if err != nil {
return nil, err
Expand All @@ -239,7 +294,7 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign
break
}
if err != nil {
log.Printf("Error in recv from thanos: %v", err)
level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID, "msg", "Error in recv from thanos")
return nil, err
}

Expand All @@ -258,19 +313,19 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign
if chunk.Raw == nil {
// We only ask for and handle RAW
err := fmt.Errorf("unexpectedly missing raw chunk data")
log.Print(err)
level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID)
return nil, err
}
if chunk.Raw.Type != storepb.Chunk_XOR {
err := fmt.Errorf("unexpected encoding type: %v", chunk.Raw.Type)
log.Print(err)
level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID)
return nil, err
}

raw, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Raw.Data)
if err != nil {
err := fmt.Errorf("reading chunk: %w", err)
log.Print("Error ", err)
level.Error(logger).Log("err", err, "traceID", span.SpanContext().TraceID)
return nil, err
}

Expand All @@ -288,7 +343,7 @@ func (api *API) doStoreRequest(ctx context.Context, req *prompb.ReadRequest, ign

case *storepb.SeriesResponse_Warning:
if *flagIgnoreWarnings {
log.Printf("Warning from thanos: %v", r)
level.Warn(logger).Log("result", fmt.Sprintf("%v", r), "traceID", span.SpanContext().TraceID)
} else {
return nil, HTTPError{fmt.Errorf("%v", r), http.StatusInternalServerError}
}
Expand Down
4 changes: 3 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ func TestMain(m *testing.M) {
var logOutput bytes.Buffer
log.SetOutput(&logOutput)

logger, err := NewConfiguredLogger("logfmt", "error")

ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to dial bufnet: %v", err)
}

defer conn.Close()
setup(conn)
setup(conn, logger)

status := m.Run()
if status != 0 {
Expand Down

0 comments on commit fa3ec26

Please sign in to comment.