diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go index 42f2f381b461..83640a9b73bc 100644 --- a/pkg/util/log/clog.go +++ b/pkg/util/log/clog.go @@ -333,7 +333,8 @@ func (l *loggerT) outputLogEntry(entry logEntry) { // The sink was not accepting entries at this level. Nothing to do. continue } - if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal}); err != nil { + format := formatParsers[s.formatter.formatterName()] + if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal, format: format}); err != nil { if !s.criticality { // An error on this sink is not critical. Just report // the error and move on. diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 51fdc77fb4a5..24b4a728bae6 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -375,6 +375,7 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) { func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) { info := &sinkInfo{} + if err := info.applyConfig(c.CommonSinkConfig); err != nil { return nil, err } diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go index 238f6360f938..e3352431dcf8 100644 --- a/pkg/util/log/http_sink.go +++ b/pkg/util/log/http_sink.go @@ -32,6 +32,15 @@ type httpSinkOptions struct { disableKeepAlives bool } +// formatToContentType map contains a mapping from the log format +// to the content type header that should be set for that format +// for the HTTP POST method. The content type header defaults +// to `text/plain` when the http sink is configured with a format +// not included in this map. +var formatToContentType = map[string]string{ + "json": "application/json", +} + func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) { transport, ok := http.DefaultTransport.(*http.Transport) if !ok { @@ -62,7 +71,7 @@ func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) { type httpSink struct { client http.Client address string - doRequest func(*httpSink, []byte) (*http.Response, error) + doRequest func(sink *httpSink, logEntry []byte, format string) (*http.Response, error) } // output emits some formatted bytes to this sink. @@ -73,8 +82,8 @@ type httpSink struct { // The parent logger's outputMu is held during this operation: log // sinks must not recursively call into logging when implementing // this method. -func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) { - resp, err := hs.doRequest(hs, b) +func (hs *httpSink) output(b []byte, opt sinkOutputOptions) (err error) { + resp, err := hs.doRequest(hs, b, opt.format) if err != nil { return err } @@ -87,8 +96,12 @@ func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) { return nil } -func doPost(hs *httpSink, b []byte) (*http.Response, error) { - resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b)) +func doPost(hs *httpSink, b []byte, format string) (*http.Response, error) { + contentType, ok := formatToContentType[format] + if !ok { + contentType = "text/plain" + } + resp, err := hs.client.Post(hs.address, contentType, bytes.NewReader(b)) if err != nil { return nil, err } @@ -96,7 +109,7 @@ func doPost(hs *httpSink, b []byte) (*http.Response, error) { return resp, nil } -func doGet(hs *httpSink, b []byte) (*http.Response, error) { +func doGet(hs *httpSink, b []byte, _ string) (*http.Response, error) { resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b))) if err != nil { return nil, err diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go index 8eef4bfb7593..b5f5f5f8d71a 100644 --- a/pkg/util/log/http_sink_test.go +++ b/pkg/util/log/http_sink_test.go @@ -12,7 +12,6 @@ package log import ( "context" - "errors" "io" "net" "net/http" @@ -26,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -37,7 +37,7 @@ import ( func testBase( t *testing.T, defaults logconfig.HTTPDefaults, - fn func(body string) error, + fn func(header http.Header, body string) error, hangServer bool, deadline time.Duration, ) { @@ -68,7 +68,7 @@ func testBase( <-cancelCh } else { // The test is expecting some message via a predicate. - if err := fn(string(buf)); err != nil { + if err := fn(r.Header, string(buf)); err != nil { // non-failing, in case there are extra log messages generated t.Log(err) } else { @@ -175,7 +175,7 @@ func TestMessageReceived(t *testing.T) { DisableKeepAlives: &tb, } - testFn := func(body string) error { + testFn := func(_ http.Header, body string) error { t.Log(body) if !strings.Contains(body, `"message":"hello world"`) { return errors.New("Log message not found in request") @@ -205,3 +205,71 @@ func TestHTTPSinkTimeout(t *testing.T) { testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond) } + +// TestHTTPSinkContentTypeJSON verifies that the HTTP sink content type +// header is set to `application/json` when the format is json. +func TestHTTPSinkContentTypeJSON(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + format := "json" + expectedContentType := "application/json" + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Format: &format, + }, + } + + testFn := func(header http.Header, body string) error { + t.Log(body) + contentType := header.Get("Content-Type") + if contentType != expectedContentType { + return errors.Newf("mismatched content type: expected %s, got %s") + } + return nil + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +} + +// TestHTTPSinkContentTypePlainText verifies that the HTTP sink content type +// header is set to `text/plain` when the format is json. +func TestHTTPSinkContentTypePlainText(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + format := "crdb-v1" + expectedContentType := "text/plain" + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Format: &format, + }, + } + + testFn := func(header http.Header, body string) error { + t.Log(body) + contentType := header.Get("Content-Type") + if contentType != expectedContentType { + return errors.Newf("mismatched content type: expected %s, got %s") + } + return nil + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +} diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index f265e8ee3aaa..3e248aa617bb 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -31,6 +31,8 @@ type sinkOutputOptions struct { // forceSync forces synchronous operation of this output operation. // That is, it will block until the output has been handled. forceSync bool + // format specifies the log entry format (e.g. json etc.). + format string } // logSink abstracts the destination of logging events, after all