release-21.2: log: add support for decoding JSON logs #83147

Merged · 1 commit · Jul 7, 2022
@@ -2,4 +2,4 @@
{"header":1,"timestamp":"1631568932.530923000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"running on machine: "}
{"header":1,"timestamp":"1631568932.530929000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"binary: CockroachDB CCL v21.2.0-alpha.00000000-4019-g6d1becda18-dirty (x86_64-apple-darwin20.6.0, built , go1.16.6)"}
{"header":1,"timestamp":"1631568932.530934000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"arguments: []"}
{"header":1,"timestamp":"1631568932.530945000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"log format (utf8=✓): json"}
{"header":1,"timestamp":"1631568932.530945000","goroutine":1,"file":"util/log/file_sync_buffer.go","line":238,"redactable":1,"tags":{"config":""},"message":"log format (utf8=✓): json-unsupported"}
pkg/util/log/format_json.go (192 additions, 0 deletions)
@@ -12,12 +12,16 @@ package log

import (
    "bytes"
    "encoding/json"
    "fmt"
    "sort"
    "strconv"
    "strings"

    "github.com/cockroachdb/cockroach/pkg/util/jsonbytes"
    "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
    "github.com/cockroachdb/errors"
    "github.com/cockroachdb/logtags"
    "github.com/cockroachdb/redact"
)

@@ -413,3 +417,191 @@ func escapeString(buf *buffer, s string) {
    b = jsonbytes.EncodeString(b, s)
    buf.Buffer = *bytes.NewBuffer(b)
}

type entryDecoderJSON struct {
    decoder         *json.Decoder
    sensitiveEditor redactEditor
    compact         bool
}

type jsonCommon struct {
    Header  int                    `json:"header,omitempty"`
    Message string                 `json:"message"`
    Stacks  string                 `json:"stacks"`
    Tags    map[string]interface{} `json:"tags"`
    Event   map[string]interface{} `json:"event"`
}

// JSONEntry represents a JSON log entry.
type JSONEntry struct {
    jsonCommon

    //Channel Channel `json:"channel,omitempty"`
    ChannelNumeric int64  `json:"channel_numeric,omitempty"`
    Timestamp      string `json:"timestamp,omitempty"`
    //Severity Severity `json:"severity,omitempty"`
    SeverityNumeric int64  `json:"severity_numeric,omitempty"`
    Goroutine       int64  `json:"goroutine,omitempty"`
    File            string `json:"file,omitempty"`
    Line            int64  `json:"line,omitempty"`
    EntryCounter    uint64 `json:"entry_counter,omitempty"`
    Redactable      int    `json:"redactable,omitempty"`
    NodeID          int64  `json:"node_id,omitempty"`
    ClusterID       string `json:"cluster_id,omitempty"`
    Version         string `json:"version,omitempty"`
    InstanceID      int64  `json:"instance_id,omitempty"`
    TenantID        int64  `json:"tenant_id,omitempty"`
}

// JSONCompactEntry represents a JSON log entry in the compact format.
type JSONCompactEntry struct {
    jsonCommon

    //Channel Channel `json:"C,omitempty"`
    ChannelNumeric int64  `json:"c,omitempty"`
    Timestamp      string `json:"t,omitempty"`
    //Severity Severity `json:"sev,omitempty"`
    SeverityNumeric int64  `json:"s,omitempty"`
    Goroutine       int64  `json:"g,omitempty"`
    File            string `json:"f,omitempty"`
    Line            int64  `json:"l,omitempty"`
    EntryCounter    uint64 `json:"n,omitempty"`
    Redactable      int    `json:"r,omitempty"`
    NodeID          int64  `json:"N,omitempty"`
    ClusterID       string `json:"x,omitempty"`
    Version         string `json:"v,omitempty"`
    InstanceID      int64  `json:"q,omitempty"`
    TenantID        int64  `json:"T,omitempty"`
}
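For illustration (hypothetical values, not from this diff), the same record under the expanded and compact key schemes defined by the struct tags above; note that "message" and "tags" come from the shared jsonCommon and keep their long names in both formats:

{"channel_numeric":0,"timestamp":"1136214245.654321000","severity_numeric":1,"goroutine":5,"file":"server/server.go","line":42,"entry_counter":7,"redactable":1,"message":"hello"}
{"c":0,"t":"1136214245.654321000","s":1,"g":5,"f":"server/server.go","l":42,"n":7,"r":1,"message":"hello"}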

// populate is a method that populates fields from the source JSONEntry
// into the `logpb.Entry`. Redactability is applied to the tags,
// message, stacks, and event fields if it's missing.
func (e *JSONEntry) populate(entry *logpb.Entry, d *entryDecoderJSON) (*redactablePackage, error) {
    ts, err := fromFluent(e.Timestamp)
    if err != nil {
        return nil, err
    }
    entry.Time = ts

    entry.Goroutine = e.Goroutine
    entry.File = e.File
    entry.Line = e.Line
    entry.Redactable = e.Redactable == 1

    if e.Header == 0 {
        entry.Severity = Severity(e.SeverityNumeric)
        entry.Channel = Channel(e.ChannelNumeric)
        entry.Counter = e.EntryCounter
    }

    var entryMsg bytes.Buffer
    if e.Event != nil {
        by, err := json.Marshal(e.Event)
        if err != nil {
            return nil, err
        }
        entryMsg.Write(by)
        entry.StructuredStart = 0
        entry.StructuredEnd = uint32(entryMsg.Len())
    } else {
        entryMsg.Write([]byte(e.Message))
    }

    if e.Tags != nil {
        var t *logtags.Buffer
        for k, v := range e.Tags {
            t = t.Add(k, v)
        }
        s := &strings.Builder{}
        t.FormatToString(s)
        tagStrings := strings.Split(s.String(), ",")
        sort.Strings(tagStrings)
        r := redactablePackage{
            msg:        []byte(strings.Join(tagStrings, ",")),
            redactable: entry.Redactable,
        }
        r = d.sensitiveEditor(r)
        entry.Tags = string(r.msg)
    }

    if e.Stacks != "" {
        entry.StackTraceStart = uint32(entryMsg.Len()) + 1
        entryMsg.Write([]byte("\nstack trace:\n"))
        entryMsg.Write([]byte(e.Stacks))
    }

    return &redactablePackage{
        msg:        entryMsg.Bytes(),
        redactable: entry.Redactable,
    }, nil
}

func (e *JSONCompactEntry) toEntry(entry *JSONEntry) {
    entry.jsonCommon = e.jsonCommon
    entry.ChannelNumeric = e.ChannelNumeric
    entry.Timestamp = e.Timestamp
    entry.SeverityNumeric = e.SeverityNumeric
    entry.Goroutine = e.Goroutine
    entry.File = e.File
    entry.Line = e.Line
    entry.EntryCounter = e.EntryCounter
    entry.Redactable = e.Redactable
    entry.NodeID = e.NodeID
    entry.ClusterID = e.ClusterID
    entry.Version = e.Version
    entry.InstanceID = e.InstanceID
    entry.TenantID = e.TenantID
}

// Decode decodes the next log entry into the provided protobuf message.
func (d *entryDecoderJSON) Decode(entry *logpb.Entry) error {
    var rp *redactablePackage
    var e JSONEntry
    if d.compact {
        var compact JSONCompactEntry
        err := d.decoder.Decode(&compact)
        if err != nil {
            return err
        }
        compact.toEntry(&e)
    } else {
        err := d.decoder.Decode(&e)
        if err != nil {
            return err
        }
    }
    rp, err := e.populate(entry, d)
    if err != nil {
        return err
    }

    r := d.sensitiveEditor(*rp)
    entry.Message = string(r.msg)
    entry.Redactable = r.redactable

    return nil
}

// fromFluent parses a fluentbit timestamp format into nanoseconds since
// the epoch. The fluentbit format is a string consisting of two
// concatenated integers joined by a `.`. The left-hand side is the
// number of seconds since the epoch, the right-hand side is the
// additional number of nanoseconds of precision.
//
// For example: `"1136214245.654321000"` parses into `1136214245654321000`.
func fromFluent(timestamp string) (int64, error) {
    parts := strings.Split(timestamp, ".")
    if len(parts) != 2 {
        return 0, errors.New("bad timestamp format")
    }
    left, err := strconv.ParseInt(parts[0], 10, 64)
    if err != nil {
        return 0, err
    }
    right, err := strconv.ParseInt(parts[1], 10, 64)
    if err != nil {
        return 0, err
    }
    return left*1000000000 + right, nil
}
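One subtlety in the arithmetic above (an observation about the code, not from the PR text): the fractional part is added as a raw nanosecond count, so it is expected to be zero-padded to nine digits, as fluentbit-style timestamps are. Two worked examples, the second input hypothetical:

`"1136214245.654321000"` parses to 1136214245*1e9 + 654321000 = 1136214245654321000
`"1136214245.5"` would parse to 1136214245*1e9 + 5 = 1136214245000000005 (5 nanoseconds, not half a second)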
pkg/util/log/format_json_test.go (35 additions, 0 deletions)
@@ -14,15 +14,19 @@ import (
"bytes"
"context"
"fmt"
"io"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/logtags"
"github.com/kr/pretty"
)

func TestJSONFormats(t *testing.T) {
@@ -91,3 +95,34 @@ func TestJSONFormats(t *testing.T) {
    })

}

func TestJsonDecode(t *testing.T) {
    datadriven.RunTest(t, "testdata/parse_json",
        func(t *testing.T, td *datadriven.TestData) string {
            switch td.Cmd {
            case "log":
                var out strings.Builder

                d, err := NewEntryDecoderWithFormat(strings.NewReader(td.Input), WithMarkedSensitiveData, td.CmdArgs[0].Vals[0])
                if err != nil {
                    td.Fatalf(t, "error while constructing decoder: %v", err)
                }

                for {
                    var e logpb.Entry
                    if err := d.Decode(&e); err != nil {
                        if err == io.EOF {
                            break
                        }
                        td.Fatalf(t, "error while decoding: %v", err)
                    }
                    fmt.Fprintf(&out, "%# v\n", pretty.Formatter(e))
                }
                return out.String()
            default:
                t.Fatalf("unknown directive: %q", td.Cmd)
            }
            // unreachable
            return ""
        })
}
pkg/util/log/formats.go (2 additions, 2 deletions)
@@ -31,9 +31,9 @@ var formatParsers = map[string]string{
"crdb-v2": "v2",
"crdb-v2-tty": "v2",
"json": "json",
"json-compact": "json",
"json-compact": "json-compact",
"json-fluent": "json",
"json-fluent-compact": "json",
"json-fluent-compact": "json-compact",
}
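(Previously all four JSON flavors mapped to the expanded "json" parser; with this fix the compact variants are routed to the new "json-compact" decoder, which understands the single-letter keys shown in JSONCompactEntry above.)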

var formatters = func() map[string]logFormatter {
pkg/util/log/log_decoder.go (37 additions, 4 deletions)
@@ -13,6 +13,7 @@ package log
import (
    "bufio"
    "bytes"
    "encoding/json"
    "io"
    "regexp"

@@ -27,13 +28,22 @@ var (
        /* Prefix */ `(?:.*\[config\][ ]+log format \(utf8=.+\): )` +
        /* Format */ `(.*)$`,
    )
-    v1IndicatorRE = regexp.MustCompile(
+    v2IndicatorRE = regexp.MustCompile(
         `(?m)^` +
-            /* crdb-v1 Indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid file:line.*)$`,
+            /* crdb-v2 indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid \[chan@\]file:line.*)$`,
     )
-    v2IndicatorRE = regexp.MustCompile(
+    v1IndicatorRE = regexp.MustCompile(
         `(?m)^` +
-            /* crdb-v2 Indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid \[chan@\]file:line.*)$`)
+            /* crdb-v1 indicator */ `(?:.*line format: \[IWEF\]yymmdd hh:mm:ss.uuuuuu goid file:line.*)$`,
+    )
    jsonIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json\".+$`)
    jsonCompactIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json-compact\".+$`)
    jsonFluentIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json-fluent\".+$`)
    jsonFluentCompactIndicatorRE = regexp.MustCompile(
        `(?m)^` + `(?:.*\"config\".+log format \(utf8=.+\): )json-fluent-compact\".+$`)
)

// EntryDecoder is used to decode log entries.
@@ -83,6 +93,17 @@ func NewEntryDecoderWithFormat(
        }
        decoder.scanner.Split(decoder.split)
        d = decoder
    case "json":
        d = &entryDecoderJSON{
            decoder:         json.NewDecoder(in),
            sensitiveEditor: getEditor(editMode),
        }
    case "json-compact":
        d = &entryDecoderJSON{
            decoder:         json.NewDecoder(in),
            sensitiveEditor: getEditor(editMode),
            compact:         true,
        }
    default:
        // The unimplemented.WithIssue function is not used here because it results in circular dependency issues.
        return nil, errors.WithTelemetry(
@@ -131,5 +152,17 @@ func getLogFormat(data []byte) (string, error) {
return "crdb-v2", nil
}

if jsonIndicatorRE.Match(data) {
return "json", nil
}
if jsonCompactIndicatorRE.Match(data) {
return "json-compact", nil
}
if jsonFluentIndicatorRE.Match(data) {
return "json-fluent", nil
}
if jsonFluentCompactIndicatorRE.Match(data) {
return "json-fluent-compact", nil
}
return "", errors.New("failed to extract log file format from the log")
}
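For context, a minimal usage sketch of the new decoding path (my sketch, not part of this PR; the input values are made up), mirroring TestJsonDecode above:

package main

import (
    "fmt"
    "io"
    "strings"

    "github.com/cockroachdb/cockroach/pkg/util/log"
    "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
)

func main() {
    // One entry in the expanded "json" format (hypothetical values).
    input := `{"timestamp":"1631568932.530923000","goroutine":1,` +
        `"file":"server/server.go","line":42,"redactable":1,` +
        `"severity_numeric":1,"channel_numeric":0,"message":"hello"}`

    // Request the "json" decoder added by this PR; WithMarkedSensitiveData
    // keeps sensitive data but wraps it in redaction markers.
    d, err := log.NewEntryDecoderWithFormat(
        strings.NewReader(input), log.WithMarkedSensitiveData, "json")
    if err != nil {
        panic(err)
    }
    for {
        var e logpb.Entry
        if err := d.Decode(&e); err != nil {
            if err == io.EOF {
                break // no more entries
            }
            panic(err)
        }
        fmt.Println(e.Time, e.Severity, e.Message)
    }
}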