diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go index 508a04749deb..34a426c68c82 100644 --- a/pkg/util/log/clog.go +++ b/pkg/util/log/clog.go @@ -401,6 +401,7 @@ func (l *loggerT) outputLogEntry(entry logEntry) { // The sink was not accepting entries at this level. Nothing to do. continue } + if err := s.sink.output(extraFlush, bufs.b[i].Bytes()); err != nil { if !s.criticality { // An error on this sink is not critical. Just report diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 86c55ae4265e..720e6efbdbbf 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -371,6 +371,7 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) { func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) { info := &sinkInfo{} + if err := info.applyConfig(c.CommonSinkConfig); err != nil { return nil, err } diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go index 138cefac66fe..57141e26c7ad 100644 --- a/pkg/util/log/http_sink.go +++ b/pkg/util/log/http_sink.go @@ -32,6 +32,15 @@ type httpSinkOptions struct { disableKeepAlives bool } +// formatToContentType map contains a mapping from the log format +// to the content type header that should be set for that format +// for the HTTP POST method. The content type header defaults +// to `text/plain` when the http sink is configured with a format +// not included in this map. +var formatToContentType = map[string]string{ + "json": "application/json", +} + func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) { transport, ok := http.DefaultTransport.(*http.Transport) if !ok { @@ -62,7 +71,7 @@ func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) { type httpSink struct { client http.Client address string - doRequest func(*httpSink, []byte) (*http.Response, error) + doRequest func(sink *httpSink, logEntry []byte) (*http.Response, error) } // output emits some formatted bytes to this sink. @@ -73,8 +82,10 @@ type httpSink struct { // The parent logger's outputMu is held during this operation: log // sinks must not recursively call into logging when implementing // this method. + func (hs *httpSink) output(extraSync bool, b []byte) (err error) { resp, err := hs.doRequest(hs, b) + if err != nil { return err } diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go index 8eef4bfb7593..b5f5f5f8d71a 100644 --- a/pkg/util/log/http_sink_test.go +++ b/pkg/util/log/http_sink_test.go @@ -12,7 +12,6 @@ package log import ( "context" - "errors" "io" "net" "net/http" @@ -26,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -37,7 +37,7 @@ import ( func testBase( t *testing.T, defaults logconfig.HTTPDefaults, - fn func(body string) error, + fn func(header http.Header, body string) error, hangServer bool, deadline time.Duration, ) { @@ -68,7 +68,7 @@ func testBase( <-cancelCh } else { // The test is expecting some message via a predicate. - if err := fn(string(buf)); err != nil { + if err := fn(r.Header, string(buf)); err != nil { // non-failing, in case there are extra log messages generated t.Log(err) } else { @@ -175,7 +175,7 @@ func TestMessageReceived(t *testing.T) { DisableKeepAlives: &tb, } - testFn := func(body string) error { + testFn := func(_ http.Header, body string) error { t.Log(body) if !strings.Contains(body, `"message":"hello world"`) { return errors.New("Log message not found in request") @@ -205,3 +205,71 @@ func TestHTTPSinkTimeout(t *testing.T) { testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond) } + +// TestHTTPSinkContentTypeJSON verifies that the HTTP sink content type +// header is set to `application/json` when the format is json. +func TestHTTPSinkContentTypeJSON(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + format := "json" + expectedContentType := "application/json" + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Format: &format, + }, + } + + testFn := func(header http.Header, body string) error { + t.Log(body) + contentType := header.Get("Content-Type") + if contentType != expectedContentType { + return errors.Newf("mismatched content type: expected %s, got %s") + } + return nil + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +} + +// TestHTTPSinkContentTypePlainText verifies that the HTTP sink content type +// header is set to `text/plain` when the format is json. +func TestHTTPSinkContentTypePlainText(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + format := "crdb-v1" + expectedContentType := "text/plain" + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Format: &format, + }, + } + + testFn := func(header http.Header, body string) error { + t.Log(body) + contentType := header.Get("Content-Type") + if contentType != expectedContentType { + return errors.Newf("mismatched content type: expected %s, got %s") + } + return nil + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +}