Skip to content

Commit

Permalink
log: Set content type header for http sink
Browse files Browse the repository at this point in the history
The content type header for the output of HTTP log sink
is always set to text/plain irrespective of the log format.
If the log format is JSON, we should set the content
type to be application/json.

Release note (bug fix): The content type header for the
HTTP log sink is set to application/json if the format of
the log output is JSON.
  • Loading branch information
rimadeodhar committed Feb 24, 2022
1 parent 920464f commit 65c59a6
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 11 deletions.
3 changes: 2 additions & 1 deletion pkg/util/log/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ func (l *loggerT) outputLogEntry(entry logEntry) {
// The sink was not accepting entries at this level. Nothing to do.
continue
}
if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal}); err != nil {
format := formatParsers[s.formatter.formatterName()]
if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal, format: format}); err != nil {
if !s.criticality {
// An error on this sink is not critical. Just report
// the error and move on.
Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {

func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
info := &sinkInfo{}

if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
Expand Down
25 changes: 19 additions & 6 deletions pkg/util/log/http_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ type httpSinkOptions struct {
disableKeepAlives bool
}

// formatToContentType map contains a mapping from the log format
// to the content type header that should be set for that format
// for the HTTP POST method. The content type header defaults
// to `text/plain` when the http sink is configured with a format
// not included in this map.
var formatToContentType = map[string]string{
"json": "application/json",
}

func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
transport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
Expand Down Expand Up @@ -62,7 +71,7 @@ func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
type httpSink struct {
client http.Client
address string
doRequest func(*httpSink, []byte) (*http.Response, error)
doRequest func(sink *httpSink, logEntry []byte, format string) (*http.Response, error)
}

// output emits some formatted bytes to this sink.
Expand All @@ -73,8 +82,8 @@ type httpSink struct {
// The parent logger's outputMu is held during this operation: log
// sinks must not recursively call into logging when implementing
// this method.
func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) {
resp, err := hs.doRequest(hs, b)
func (hs *httpSink) output(b []byte, opt sinkOutputOptions) (err error) {
resp, err := hs.doRequest(hs, b, opt.format)
if err != nil {
return err
}
Expand All @@ -87,16 +96,20 @@ func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) {
return nil
}

func doPost(hs *httpSink, b []byte) (*http.Response, error) {
resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b))
func doPost(hs *httpSink, b []byte, format string) (*http.Response, error) {
contentType, ok := formatToContentType[format]
if !ok {
contentType = "text/plain"
}
resp, err := hs.client.Post(hs.address, contentType, bytes.NewReader(b))
if err != nil {
return nil, err
}
resp.Body.Close() // don't care about content
return resp, nil
}

func doGet(hs *httpSink, b []byte) (*http.Response, error) {
func doGet(hs *httpSink, b []byte, _ string) (*http.Response, error) {
resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b)))
if err != nil {
return nil, err
Expand Down
76 changes: 72 additions & 4 deletions pkg/util/log/http_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package log

import (
"context"
"errors"
"io"
"net"
"net/http"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand All @@ -37,7 +37,7 @@ import (
func testBase(
t *testing.T,
defaults logconfig.HTTPDefaults,
fn func(body string) error,
fn func(header http.Header, body string) error,
hangServer bool,
deadline time.Duration,
) {
Expand Down Expand Up @@ -68,7 +68,7 @@ func testBase(
<-cancelCh
} else {
// The test is expecting some message via a predicate.
if err := fn(string(buf)); err != nil {
if err := fn(r.Header, string(buf)); err != nil {
// non-failing, in case there are extra log messages generated
t.Log(err)
} else {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestMessageReceived(t *testing.T) {
DisableKeepAlives: &tb,
}

testFn := func(body string) error {
testFn := func(_ http.Header, body string) error {
t.Log(body)
if !strings.Contains(body, `"message":"hello world"`) {
return errors.New("Log message not found in request")
Expand Down Expand Up @@ -205,3 +205,71 @@ func TestHTTPSinkTimeout(t *testing.T) {

testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond)
}

// TestHTTPSinkContentTypeJSON verifies that the HTTP sink content type
// header is set to `application/json` when the format is json.
func TestHTTPSinkContentTypeJSON(t *testing.T) {
defer leaktest.AfterTest(t)()

address := "http://localhost" // testBase appends the port
timeout := 5 * time.Second
tb := true
format := "json"
expectedContentType := "application/json"
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,

// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Format: &format,
},
}

testFn := func(header http.Header, body string) error {
t.Log(body)
contentType := header.Get("Content-Type")
if contentType != expectedContentType {
return errors.Newf("mismatched content type: expected %s, got %s")
}
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
}

// TestHTTPSinkContentTypePlainText verifies that the HTTP sink content type
// header is set to `text/plain` when the format is json.
func TestHTTPSinkContentTypePlainText(t *testing.T) {
defer leaktest.AfterTest(t)()

address := "http://localhost" // testBase appends the port
timeout := 5 * time.Second
tb := true
format := "crdb-v1"
expectedContentType := "text/plain"
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,

// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Format: &format,
},
}

testFn := func(header http.Header, body string) error {
t.Log(body)
contentType := header.Get("Content-Type")
if contentType != expectedContentType {
return errors.Newf("mismatched content type: expected %s, got %s")
}
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
}
2 changes: 2 additions & 0 deletions pkg/util/log/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type sinkOutputOptions struct {
// forceSync forces synchronous operation of this output operation.
// That is, it will block until the output has been handled.
forceSync bool
// format specifies the log entry format (e.g. json etc.).
format string
}

// logSink abstracts the destination of logging events, after all
Expand Down

0 comments on commit 65c59a6

Please sign in to comment.