log: add support for decoding JSON logs
Previously, our logging code could not recognize and decode
JSON-formatted log files. This led to problems when retrieving logs via
API endpoints and when running commands such as `merge-logs` to process
logs from debug.zip files.

This commit adds support for the json, json-compact, json-fluent, and
json-fluent-compact formats via one generalized JSON decoder.

Resolves #66684

Release note (ops change, cli change): the `debug zip` and `merge-logs`
commands now work with JSON-formatted logs.
dhartunian committed Jul 6, 2022
1 parent 8a5fa1e commit 29d22b7
Showing 7 changed files with 477 additions and 7 deletions.
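For context, here is a minimal sketch of how the new decoder is driven from calling code, in the style of the test added below. The input line and its field values are hypothetical; NewEntryDecoderWithFormat, WithMarkedSensitiveData, and logpb.Entry are the pieces this commit touches:

    package main

    import (
        "fmt"
        "io"
        "strings"

        "github.com/cockroachdb/cockroach/pkg/util/log"
        "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
    )

    func main() {
        // One json-format log line; the field values are made up.
        input := `{"timestamp":"1136214245.654321000","severity_numeric":2,` +
            `"goroutine":1,"file":"server/server.go","line":42,"redactable":1,` +
            `"message":"node starting"}`

        // "json-compact" works the same way; the fluent variants map onto
        // these two decoders (see the formats.go change below).
        d, err := log.NewEntryDecoderWithFormat(
            strings.NewReader(input), log.WithMarkedSensitiveData, "json")
        if err != nil {
            panic(err)
        }
        for {
            var e logpb.Entry
            if err := d.Decode(&e); err != nil {
                if err == io.EOF {
                    break // end of input
                }
                panic(err)
            }
            fmt.Printf("%s %s\n", e.Severity, e.Message)
        }
    }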
@@ -2,4 +2,4 @@
{"header":1,"timestamp":"1631568932.530923000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"running on machine: "}
{"header":1,"timestamp":"1631568932.530929000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"binary: CockroachDB CCL v21.2.0-alpha.00000000-4019-g6d1becda18-dirty (x86_64-apple-darwin20.6.0, built , go1.16.6)"}
{"header":1,"timestamp":"1631568932.530934000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"arguments: []"}
{"header":1,"timestamp":"1631568932.530945000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"log format (utf8=✓): json"}
{"header":1,"timestamp":"1631568932.530945000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"log format (utf8=✓): json-unsupported"}
192 changes: 192 additions & 0 deletions pkg/util/log/format_json.go
@@ -12,12 +12,16 @@ package log

import (
    "bytes"
    "encoding/json"
    "fmt"
    "sort"
    "strconv"
    "strings"

    "github.com/cockroachdb/cockroach/pkg/util/jsonbytes"
    "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
    "github.com/cockroachdb/errors"
    "github.com/cockroachdb/logtags"
    "github.com/cockroachdb/redact"
)

@@ -413,3 +417,191 @@ func escapeString(buf *buffer, s string) {
    b = jsonbytes.EncodeString(b, s)
    buf.Buffer = *bytes.NewBuffer(b)
}

type entryDecoderJSON struct {
    decoder         *json.Decoder
    sensitiveEditor redactEditor
    compact         bool
}

type jsonCommon struct {
    Header  int                    `json:"header,omitempty"`
    Message string                 `json:"message"`
    Stacks  string                 `json:"stacks"`
    Tags    map[string]interface{} `json:"tags"`
    Event   map[string]interface{} `json:"event"`
}

// JSONEntry represents a JSON log entry.
type JSONEntry struct {
    jsonCommon

    //Channel Channel `json:"channel,omitempty"`
    ChannelNumeric int64 `json:"channel_numeric,omitempty"`
    Timestamp string `json:"timestamp,omitempty"`
    //Severity Severity `json:"severity,omitempty"`
    SeverityNumeric int64 `json:"severity_numeric,omitempty"`
    Goroutine int64 `json:"goroutine,omitempty"`
    File string `json:"file,omitempty"`
    Line int64 `json:"line,omitempty"`
    EntryCounter uint64 `json:"entry_counter,omitempty"`
    Redactable int `json:"redactable,omitempty"`
    NodeID int64 `json:"node_id,omitempty"`
    ClusterID string `json:"cluster_id,omitempty"`
    Version string `json:"version,omitempty"`
    InstanceID int64 `json:"instance_id,omitempty"`
    TenantID int64 `json:"tenant_id,omitempty"`
}

// JSONCompactEntry represents a JSON log entry in the compact format.
type JSONCompactEntry struct {
    jsonCommon

    //Channel Channel `json:"C,omitempty"`
    ChannelNumeric int64 `json:"c,omitempty"`
    Timestamp string `json:"t,omitempty"`
    //Severity Severity `json:"sev,omitempty"`
    SeverityNumeric int64 `json:"s,omitempty"`
    Goroutine int64 `json:"g,omitempty"`
    File string `json:"f,omitempty"`
    Line int64 `json:"l,omitempty"`
    EntryCounter uint64 `json:"n,omitempty"`
    Redactable int `json:"r,omitempty"`
    NodeID int64 `json:"N,omitempty"`
    ClusterID string `json:"x,omitempty"`
    Version string `json:"v,omitempty"`
    InstanceID int64 `json:"q,omitempty"`
    TenantID int64 `json:"T,omitempty"`
}
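To make the key mapping concrete, here is one hypothetical record rendered in both shapes; the fields inherited from jsonCommon (header, message, tags, stacks, event) keep their long names in both formats:

    {"channel_numeric":1,"timestamp":"1136214245.654321000","severity_numeric":2,"goroutine":5,"file":"server/node.go","line":150,"entry_counter":7,"redactable":1,"node_id":1,"message":"node started"}
    {"c":1,"t":"1136214245.654321000","s":2,"g":5,"f":"server/node.go","l":150,"n":7,"r":1,"N":1,"message":"node started"}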

// populate fills fields from the source JSONEntry into the given
// `logpb.Entry`. Redactability is applied to the tags, message, stacks,
// and event fields if it is missing.
func (e *JSONEntry) populate(entry *logpb.Entry, d *entryDecoderJSON) (*redactablePackage, error) {
    ts, err := fromFluent(e.Timestamp)
    if err != nil {
        return nil, err
    }
    entry.Time = ts

    entry.Goroutine = e.Goroutine
    entry.File = e.File
    entry.Line = e.Line
    entry.Redactable = e.Redactable == 1

    if e.Header == 0 {
        entry.Severity = Severity(e.SeverityNumeric)
        entry.Channel = Channel(e.ChannelNumeric)
        entry.Counter = e.EntryCounter
    }

    var entryMsg bytes.Buffer
    if e.Event != nil {
        by, err := json.Marshal(e.Event)
        if err != nil {
            return nil, err
        }
        entryMsg.Write(by)
        entry.StructuredStart = 0
        entry.StructuredEnd = uint32(entryMsg.Len())
    } else {
        entryMsg.Write([]byte(e.Message))
    }

    if e.Tags != nil {
        var t *logtags.Buffer
        for k, v := range e.Tags {
            t = t.Add(k, v)
        }
        s := &strings.Builder{}
        t.FormatToString(s)
        tagStrings := strings.Split(s.String(), ",")
        sort.Strings(tagStrings)
        r := redactablePackage{
            msg:        []byte(strings.Join(tagStrings, ",")),
            redactable: entry.Redactable,
        }
        r = d.sensitiveEditor(r)
        entry.Tags = string(r.msg)
    }

    if e.Stacks != "" {
        entry.StackTraceStart = uint32(entryMsg.Len()) + 1
        entryMsg.Write([]byte("\nstack trace:\n"))
        entryMsg.Write([]byte(e.Stacks))
    }

    return &redactablePackage{
        msg:        entryMsg.Bytes(),
        redactable: entry.Redactable,
    }, nil
}
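Two behaviors of populate are easy to miss; a rough sketch of the result for hypothetical inputs (the values below are illustrative, not from the diff):

    // Structured events: the "event" object is re-marshaled into the message
    // buffer and bracketed by the Structured offsets. json.Marshal emits map
    // keys in sorted order, so for
    //   e.Event = map[string]interface{}{"Timestamp": 123, "EventType": "restart"}
    // the message body becomes `{"EventType":"restart","Timestamp":123}` with
    // StructuredStart == 0 and StructuredEnd == uint32(len(message)).
    //
    // Tags: Go map iteration order is randomized, hence the split-and-sort
    // above to keep the output deterministic. For
    //   e.Tags = map[string]interface{}{"n": 1, "config": ""}
    // entry.Tags comes out as "config,n1" (modulo redaction markers).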

func (e *JSONCompactEntry) toEntry(entry *JSONEntry) {
    entry.jsonCommon = e.jsonCommon
    entry.ChannelNumeric = e.ChannelNumeric
    entry.Timestamp = e.Timestamp
    entry.SeverityNumeric = e.SeverityNumeric
    entry.Goroutine = e.Goroutine
    entry.File = e.File
    entry.Line = e.Line
    entry.EntryCounter = e.EntryCounter
    entry.Redactable = e.Redactable
    entry.NodeID = e.NodeID
    entry.ClusterID = e.ClusterID
    entry.Version = e.Version
    entry.InstanceID = e.InstanceID
    entry.TenantID = e.TenantID
}

// Decode decodes the next log entry into the provided protobuf message.
func (d *entryDecoderJSON) Decode(entry *logpb.Entry) error {
    var rp *redactablePackage
    var e JSONEntry
    if d.compact {
        var compact JSONCompactEntry
        err := d.decoder.Decode(&compact)
        if err != nil {
            return err
        }
        compact.toEntry(&e)
    } else {
        err := d.decoder.Decode(&e)
        if err != nil {
            return err
        }
    }
    rp, err := e.populate(entry, d)
    if err != nil {
        return err
    }

    r := d.sensitiveEditor(*rp)
    entry.Message = string(r.msg)
    entry.Redactable = r.redactable

    return nil
}

// fromFluent parses a fluentbit timestamp into nanoseconds since the
// epoch. The fluentbit format is a string consisting of two
// concatenated integers joined by a `.`: the left-hand side is the
// number of seconds since the epoch, and the right-hand side is the
// additional number of nanoseconds of precision.
//
// For example: `"1136214245.654321000"` parses into `1136214245654321000`.
func fromFluent(timestamp string) (int64, error) {
    parts := strings.Split(timestamp, ".")
    if len(parts) != 2 {
        return 0, errors.New("bad timestamp format")
    }
    left, err := strconv.ParseInt(parts[0], 10, 64)
    if err != nil {
        return 0, err
    }
    right, err := strconv.ParseInt(parts[1], 10, 64)
    if err != nil {
        return 0, err
    }
    return left*1000000000 + right, nil
}
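A quick sanity check of the arithmetic, written as a hypothetical table-driven test (not part of this diff). Note that the fractional part is treated as a nanosecond count, so the parser relies on fluentbit always emitting nine fractional digits:

    func TestFromFluent(t *testing.T) {
        for _, tc := range []struct {
            in   string
            want int64
        }{
            {"1136214245.654321000", 1136214245654321000},
            {"0.000000001", 1},
        } {
            got, err := fromFluent(tc.in)
            if err != nil {
                t.Fatalf("fromFluent(%q): %v", tc.in, err)
            }
            if got != tc.want {
                t.Errorf("fromFluent(%q) = %d, want %d", tc.in, got, tc.want)
            }
        }
    }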
35 changes: 35 additions & 0 deletions pkg/util/log/format_json_test.go
@@ -14,15 +14,19 @@ import (
    "bytes"
    "context"
    "fmt"
    "io"
    "strings"
    "testing"
    "time"

    "github.com/cockroachdb/cockroach/pkg/build"
    "github.com/cockroachdb/cockroach/pkg/util/log/channel"
    "github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
    "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
    "github.com/cockroachdb/cockroach/pkg/util/log/severity"
    "github.com/cockroachdb/datadriven"
    "github.com/cockroachdb/logtags"
    "github.com/kr/pretty"
)

func TestJSONFormats(t *testing.T) {
@@ -91,3 +95,34 @@ func TestJSONFormats(t *testing.T) {
    })

}

func TestJsonDecode(t *testing.T) {
    datadriven.RunTest(t, "testdata/parse_json",
        func(t *testing.T, td *datadriven.TestData) string {
            switch td.Cmd {
            case "log":
                var out strings.Builder

                d, err := NewEntryDecoderWithFormat(strings.NewReader(td.Input), WithMarkedSensitiveData, td.CmdArgs[0].Vals[0])
                if err != nil {
                    td.Fatalf(t, "error while constructing decoder: %v", err)
                }

                for {
                    var e logpb.Entry
                    if err := d.Decode(&e); err != nil {
                        if err == io.EOF {
                            break
                        }
                        td.Fatalf(t, "error while decoding: %v", err)
                    }
                    fmt.Fprintf(&out, "%# v\n", pretty.Formatter(e))
                }
                return out.String()
            default:
                t.Fatalf("unknown directive: %q", td.Cmd)
            }
            // unreachable
            return ""
        })
}
4 changes: 2 additions & 2 deletions pkg/util/log/formats.go
@@ -31,9 +31,9 @@ var formatParsers = map[string]string{
    "crdb-v2": "v2",
    "crdb-v2-tty": "v2",
    "json": "json",
    "json-compact": "json",
    "json-compact": "json-compact",
    "json-fluent": "json",
    "json-fluent-compact": "json",
    "json-fluent-compact": "json-compact",
}

var formatters = func() map[string]logFormatter {
41 changes: 37 additions & 4 deletions pkg/util/log/log_decoder.go
@@ -13,6 +13,7 @@ package log
import (
    "bufio"
    "bytes"
    "encoding/json"
    "io"
    "regexp"

@@ -27,13 +28,22 @@ var (
        /* Prefix */ `(?:.*\[config\][ ]+log format \(utf8=.+\): )` +
        /* Format */ `(.*)$`,
    )
    v1IndicatorRE = regexp.MustCompile(
    v2IndicatorRE = regexp.MustCompile(
        `(?m)^` +
            /* crdb-v1 Indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid file:line.*)$`,
            /* crdb-v2 indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid \[chan@\]file:line.*)$`,
    )
    v2IndicatorRE = regexp.MustCompile(
    v1IndicatorRE = regexp.MustCompile(
        `(?m)^` +
            /* crdb-v2 Indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid \[chan@\]file:line.*)$`)
            /* crdb-v1 indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid file:line.*)$`,
    )
    jsonIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json\".+$`)
    jsonCompactIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json-compact\".+$`)
    jsonFluentIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json-fluent\".+$`)
    jsonFluentCompactIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json-fluent-compact\".+$`)
)
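For reference, the header line these indicator patterns sniff for is the one visible in the testdata diff above; in a json-format file it reads:

    {"header":1,"timestamp":"1631568932.530945000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"log format (utf8=✓): json"}

Each pattern requires the format name to be immediately followed by the closing quote of the message string, which is what keeps the plain json pattern from also matching the longer json-compact and json-fluent format names.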

// EntryDecoder is used to decode log entries.
@@ -83,6 +93,17 @@ func NewEntryDecoderWithFormat(
        }
        decoder.scanner.Split(decoder.split)
        d = decoder
    case "json":
        d = &entryDecoderJSON{
            decoder:         json.NewDecoder(in),
            sensitiveEditor: getEditor(editMode),
        }
    case "json-compact":
        d = &entryDecoderJSON{
            decoder:         json.NewDecoder(in),
            sensitiveEditor: getEditor(editMode),
            compact:         true,
        }
    default:
        // The unimplemented.WithIssue function is not used here because it
        // results in circular dependency issues.
        return nil, errors.WithTelemetry(
@@ -131,5 +152,17 @@ func getLogFormat(data []byte) (string, error) {
        return "crdb-v2", nil
    }

    if jsonIndicatorRE.Match(data) {
        return "json", nil
    }
    if jsonCompactIndicatorRE.Match(data) {
        return "json-compact", nil
    }
    if jsonFluentIndicatorRE.Match(data) {
        return "json-fluent", nil
    }
    if jsonFluentCompactIndicatorRE.Match(data) {
        return "json-fluent-compact", nil
    }
    return "", errors.New("failed to extract log file format from the log")
}