Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: Set content type header for http sink #77014

Merged
merged 1 commit into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/util/log/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ func (l *loggerT) outputLogEntry(entry logEntry) {
// The sink was not accepting entries at this level. Nothing to do.
continue
}
if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal}); err != nil {
format := formatParsers[s.formatter.formatterName()]
if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal, format: format}); err != nil {
if !s.criticality {
// An error on this sink is not critical. Just report
// the error and move on.
Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {

func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
info := &sinkInfo{}

if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
Expand Down
25 changes: 19 additions & 6 deletions pkg/util/log/http_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ type httpSinkOptions struct {
disableKeepAlives bool
}

// formatToContentType map contains a mapping from the log format
// to the content type header that should be set for that format
// for the HTTP POST method. The content type header defaults
// to `text/plain` when the http sink is configured with a format
// not included in this map.
var formatToContentType = map[string]string{
"json": "application/json",
}

func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
transport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
Expand Down Expand Up @@ -62,7 +71,7 @@ func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
type httpSink struct {
client http.Client
address string
doRequest func(*httpSink, []byte) (*http.Response, error)
doRequest func(sink *httpSink, logEntry []byte, format string) (*http.Response, error)
}

// output emits some formatted bytes to this sink.
Expand All @@ -73,8 +82,8 @@ type httpSink struct {
// The parent logger's outputMu is held during this operation: log
// sinks must not recursively call into logging when implementing
// this method.
func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) {
resp, err := hs.doRequest(hs, b)
func (hs *httpSink) output(b []byte, opt sinkOutputOptions) (err error) {
resp, err := hs.doRequest(hs, b, opt.format)
if err != nil {
return err
}
Expand All @@ -87,16 +96,20 @@ func (hs *httpSink) output(b []byte, _ sinkOutputOptions) (err error) {
return nil
}

func doPost(hs *httpSink, b []byte) (*http.Response, error) {
resp, err := hs.client.Post(hs.address, "text/plain", bytes.NewReader(b))
func doPost(hs *httpSink, b []byte, format string) (*http.Response, error) {
contentType, ok := formatToContentType[format]
if !ok {
contentType = "text/plain"
}
resp, err := hs.client.Post(hs.address, contentType, bytes.NewReader(b))
if err != nil {
return nil, err
}
resp.Body.Close() // don't care about content
return resp, nil
}

func doGet(hs *httpSink, b []byte) (*http.Response, error) {
func doGet(hs *httpSink, b []byte, _ string) (*http.Response, error) {
resp, err := hs.client.Get(hs.address + "?" + url.QueryEscape(string(b)))
if err != nil {
return nil, err
Expand Down
76 changes: 72 additions & 4 deletions pkg/util/log/http_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package log

import (
"context"
"errors"
"io"
"net"
"net/http"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand All @@ -37,7 +37,7 @@ import (
func testBase(
t *testing.T,
defaults logconfig.HTTPDefaults,
fn func(body string) error,
fn func(header http.Header, body string) error,
hangServer bool,
deadline time.Duration,
) {
Expand Down Expand Up @@ -68,7 +68,7 @@ func testBase(
<-cancelCh
} else {
// The test is expecting some message via a predicate.
if err := fn(string(buf)); err != nil {
if err := fn(r.Header, string(buf)); err != nil {
// non-failing, in case there are extra log messages generated
t.Log(err)
} else {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestMessageReceived(t *testing.T) {
DisableKeepAlives: &tb,
}

testFn := func(body string) error {
testFn := func(_ http.Header, body string) error {
t.Log(body)
if !strings.Contains(body, `"message":"hello world"`) {
return errors.New("Log message not found in request")
Expand Down Expand Up @@ -205,3 +205,71 @@ func TestHTTPSinkTimeout(t *testing.T) {

testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond)
}

// TestHTTPSinkContentTypeJSON verifies that the HTTP sink content type
// header is set to `application/json` when the format is json.
func TestHTTPSinkContentTypeJSON(t *testing.T) {
defer leaktest.AfterTest(t)()

address := "http://localhost" // testBase appends the port
timeout := 5 * time.Second
tb := true
format := "json"
expectedContentType := "application/json"
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,

// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Format: &format,
},
}

testFn := func(header http.Header, body string) error {
t.Log(body)
contentType := header.Get("Content-Type")
if contentType != expectedContentType {
return errors.Newf("mismatched content type: expected %s, got %s")
}
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
}

// TestHTTPSinkContentTypePlainText verifies that the HTTP sink content type
// header is set to `text/plain` when the format is json.
func TestHTTPSinkContentTypePlainText(t *testing.T) {
defer leaktest.AfterTest(t)()

address := "http://localhost" // testBase appends the port
timeout := 5 * time.Second
tb := true
format := "crdb-v1"
expectedContentType := "text/plain"
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,

// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
CommonSinkConfig: logconfig.CommonSinkConfig{
Format: &format,
},
}

testFn := func(header http.Header, body string) error {
t.Log(body)
contentType := header.Get("Content-Type")
if contentType != expectedContentType {
return errors.Newf("mismatched content type: expected %s, got %s")
}
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
}
2 changes: 2 additions & 0 deletions pkg/util/log/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type sinkOutputOptions struct {
// forceSync forces synchronous operation of this output operation.
// That is, it will block until the output has been handled.
forceSync bool
// format specifies the log entry format (e.g. json etc.).
format string
}

// logSink abstracts the destination of logging events, after all
Expand Down