Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
knz committed Jul 30, 2021
1 parent ad23285 commit 78db932
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 76 deletions.
1 change: 1 addition & 0 deletions docs/generated/logsinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ Type-specific configuration options:
| `method` | the HTTP method to be used. POST and GET are supported; defaults to POST. Inherited from `http-defaults.method` if not specified. |
| `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. |
| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. |
| `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. |


Configuration options shared across all sink types:
Expand Down
12 changes: 8 additions & 4 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,15 @@ func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
httpSink := newHTTPSink(*c.Address, httpSinkOptions{
method: string(*c.Method),
unsafeTLS: *c.UnsafeTLS,
timeout: *c.Timeout,
httpSink, err := newHTTPSink(*c.Address, httpSinkOptions{
method: string(*c.Method),
unsafeTLS: *c.UnsafeTLS,
timeout: *c.Timeout,
disableKeepAlives: *c.DisableKeepAlives,
})
if err != nil {
return nil, err
}
info.sink = httpSink
return info, nil
}
Expand Down
45 changes: 19 additions & 26 deletions pkg/util/log/http_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,49 @@ import (
"bytes"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"time"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/errors"
)

// TODO: HTTP requests should be bound to context via http.NewRequestWithContext
// Proper logging context to be decided/designed.

var insecureTransport http.RoundTripper = &http.Transport{
// Same as DefaultTransport...
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// ...except insecure TLS.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

type httpSinkOptions struct {
unsafeTLS bool
timeout time.Duration
method string
unsafeTLS bool
timeout time.Duration
method string
disableKeepAlives bool
}

func newHTTPSink(url string, opt httpSinkOptions) *httpSink {
func newHTTPSink(url string, opt httpSinkOptions) (*httpSink, error) {
transport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
return nil, errors.AssertionFailedf("http.DefaultTransport is not a http.Transport: %T", http.DefaultTransport)
}
transport = transport.Clone()
transport.DisableKeepAlives = opt.disableKeepAlives
hs := &httpSink{
client: http.Client{
Transport: http.DefaultTransport,
Timeout: opt.timeout},
Transport: transport,
Timeout: opt.timeout,
},
address: url,
doRequest: doPost}
doRequest: doPost,
}

if opt.unsafeTLS {
hs.client.Transport = insecureTransport
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}

if opt.method == http.MethodGet {
hs.doRequest = doGet
}

return hs
return hs, nil
}

type httpSink struct {
Expand Down
96 changes: 55 additions & 41 deletions pkg/util/log/http_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ import (
"net"
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

type requestTestFunc func(body string) error

// testBase sets the provided HTTPDefaults, logs "hello World", captures the
// resulting request to the server, and validates the body with the provided
// requestTestFunc.
Expand All @@ -39,40 +37,50 @@ type requestTestFunc func(body string) error
func testBase(
t *testing.T,
defaults logconfig.HTTPDefaults,
fn requestTestFunc,
fn func(body string) error,
hangServer bool,
deadline time.Duration,
) {
var wg sync.WaitGroup
defer wg.Wait()

sc := ScopeWithoutShowLogs(t)
defer sc.Close(t)

bodyCh := make(chan string, 1)
// cancelCh ensures that async goroutines terminate if the test
// goroutine terminates due to a Fatal call or a panic.
cancelCh := make(chan struct{})
defer func() { close(cancelCh) }()

// seenMessage is true after the request predicate
// has seen the expected message from the client.
var seenMessage syncutil.AtomicBool

handler := func(rw http.ResponseWriter, r *http.Request) {
wg.Add(1)
buf := make([]byte, 5000)
if _, err := r.Body.Read(buf); err != nil && err != io.EOF {
nbytes, err := r.Body.Read(buf)
if err != nil && err != io.EOF {
t.Error(err)
return
}
buf = buf[:nbytes]

if hangServer {
// The test is requesting the server to simulate a timeout. Just
// do nothing until the test terminates.
<-cancelCh
} else {
select {
case bodyCh <- string(buf):
case <-cancelCh:
// The test is expecting some message via a predicate.
if err := fn(string(buf)); err != nil {
// non-failing, in case there are extra log messages generated
t.Log(err)
} else {
seenMessage.Set(true)
}
}
wg.Done()
}

serverErrCh := make(chan error)
serverClosedCh := make(chan struct{})
defer func() { <-serverClosedCh }()
{
// Start the HTTP server that receives the logging events from the
// test.

l, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
t.Fatal(err)
Expand All @@ -83,10 +91,12 @@ func testBase(
}
*defaults.Address += ":" + port
s := http.Server{Handler: http.HandlerFunc(handler)}
wg.Add(1)

// serverErrCh collects errors and signals the termination of the
// server async goroutine.
serverErrCh := make(chan error, 1)
go func() {
defer close(serverClosedCh)
defer wg.Done()
defer func() { close(serverErrCh) }()
err := s.Serve(l)
if err != http.ErrServerClosed {
select {
Expand All @@ -95,13 +105,15 @@ func testBase(
}
}
}()

// At the end of this function, close the server
// allowing the above goroutine to finish and close serverClosedCh
// allowing the deferred read to proceed and this function to return.
// (Basically, it's a WaitGroup of one.)
defer func() {
close(cancelCh)
require.NoError(t, s.Close())
serverErr := <-serverErrCh
require.NoError(t, serverErr)
}()
}

Expand All @@ -127,6 +139,10 @@ func testBase(
logStart := timeutil.Now()
Ops.Infof(context.Background(), "hello world")
logDuration := timeutil.Since(logStart)

// Note: deadline is passed by the caller and already contains slack
// to accommodate for the overhead of the logging call compared to
// the timeout in the HTTP request.
if deadline > 0 && logDuration > deadline {
t.Error("Log call exceeded timeout")
}
Expand All @@ -135,23 +151,11 @@ func testBase(
return
}

// Check if any requests received within the timeout satisfy the given predicate.
timer := time.After(10 * time.Second)
outer:
for {
select {
case <-timer:
t.Fatal("timeout")
case body := <-bodyCh:
if err := fn(body); err != nil {
// non-failing, in case there are extra log messages generated
t.Log(err)
} else {
break outer
}
case err := <-serverErrCh:
t.Fatal(err)
}
// If the test was not requiring a timeout, it was requiring some
// logging message to match the predicate. If we don't see the
// predicate match, it is a test failure.
if !seenMessage.Get() {
t.Error("expected message matching predicate, found none")
}
}

Expand All @@ -160,10 +164,15 @@ func TestMessageReceived(t *testing.T) {
defer leaktest.AfterTest(t)()

address := "http://localhost" // testBase appends the port
timeout := 250 * time.Millisecond
timeout := 5 * time.Second
tb := true
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,

// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
}

testFn := func(body string) error {
Expand All @@ -177,16 +186,21 @@ func TestMessageReceived(t *testing.T) {
testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
}

// TestTimeout verifies that a log call to a hanging server doesn't last
// TestHTTPSinkTimeout verifies that a log call to a hanging server doesn't last
// to much longer than the configured timeout.
func TestTimeout(t *testing.T) {
func TestHTTPSinkTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()

address := "http://localhost" // testBase appends the port
timeout := time.Millisecond
tb := true
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,

// We need to disable keepalives otherwise the HTTP server in the
// test will let an async goroutine run waiting for more requests.
DisableKeepAlives: &tb,
}

testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond)
Expand Down
10 changes: 8 additions & 2 deletions pkg/util/log/logconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"gopkg.in/yaml.v2"
humanize "github.com/dustin/go-humanize"
yaml "gopkg.in/yaml.v2"
)

// DefaultFileFormat is the entry format for file sinks when not
Expand Down Expand Up @@ -423,6 +423,12 @@ type HTTPDefaults struct {
// Defaults to 0 for no timeout.
Timeout *time.Duration `yaml:",omitempty"`

// DisableKeepAlives causes the logging sink to re-establish a new
// connection for every outgoing log message. This option is
// intended for testing only and can cause excessive network
// overhead in production systems.
DisableKeepAlives *bool `yaml:",omitempty"`

CommonSinkConfig `yaml:",inline"`
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/util/log/logconfig/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
CommonSinkConfig: CommonSinkConfig{
Format: func() *string { s := DefaultHTTPFormat; return &s }(),
},
UnsafeTLS: &bf,
Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(),
Timeout: func() *time.Duration { d := time.Duration(0); return &d }(),
UnsafeTLS: &bf,
DisableKeepAlives: &bf,
Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(),
Timeout: func() *time.Duration { d := time.Duration(0); return &d }(),
}

propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig)
Expand Down

0 comments on commit 78db932

Please sign in to comment.