Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http): add configurable limit to points batch size on write endpoint #16469

Merged
merged 1 commit into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
1. [16430](https://github.com/influxdata/influxdb/pull/16430): Added toggle to table thresholds to allow users to choose between setting threshold colors to text or background
1. [16418](https://github.com/influxdata/influxdb/pull/16418): Add Developer Documentation
1. [16260](https://github.com/influxdata/influxdb/pull/16260): Capture User-Agent header as query source for logging purposes
1. [16469](https://github.com/influxdata/influxdb/pull/16469): Add support for configurable max batch size in points write handler

### Bug Fixes

Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
ETooManyRequests = "too many requests"
EUnauthorized = "unauthorized"
EMethodNotAllowed = "method not allowed"
ETooLarge = "request too large"
)

// Error is the error struct of platform.
Expand Down
1 change: 1 addition & 0 deletions http/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,5 @@ var statusCodePlatformError = map[string]int{
platform.ETooManyRequests: http.StatusTooManyRequests,
platform.EUnauthorized: http.StatusUnauthorized,
platform.EMethodNotAllowed: http.StatusMethodNotAllowed,
platform.ETooLarge: http.StatusRequestEntityTooLarge,
}
150 changes: 106 additions & 44 deletions http/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -21,6 +22,13 @@ import (
"go.uber.org/zap"
)

var (
// ErrMaxBatchSizeExceeded is returned when a points batch exceeds
// the defined upper limit in bytes. This pertains to the size of the
// batch after inflation from any compression (i.e. ungzipped).
ErrMaxBatchSizeExceeded = errors.New("points batch is too large")
)

// WriteBackend is all services and associated parameters required to construct
// the WriteHandler.
type WriteBackend struct {
Expand Down Expand Up @@ -58,6 +66,19 @@ type WriteHandler struct {
PointsWriter storage.PointsWriter

EventRecorder metric.EventRecorder

maxBatchSizeBytes int64
}

// WriteHandlerOption is a functional option for a *WriteHandler
type WriteHandlerOption func(*WriteHandler)

// WithMaxBatchSizeBytes configures the maximum size for a
// (decompressed) points batch allowed by the write handler
func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
return func(w *WriteHandler) {
w.maxBatchSizeBytes = n
}
}

// Prefix provides the route prefix.
Expand All @@ -72,7 +93,7 @@ const (
)

// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler {
func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ the use of functional options

h := &WriteHandler{
Router: NewRouter(b.HTTPErrorHandler),
HTTPErrorHandler: b.HTTPErrorHandler,
Expand All @@ -84,6 +105,10 @@ func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler {
EventRecorder: b.WriteEventRecorder,
}

for _, opt := range opts {
opt(h)
}

h.HandlerFunc("POST", prefixWrite, h.handleWrite)
return h
}
Expand All @@ -97,9 +122,19 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

// TODO(desa): I really don't like how we're recording the usage metrics here
// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
var orgID influxdb.ID
var requestBytes int
sw := kithttp.NewStatusResponseWriter(w)
var (
orgID influxdb.ID
requestBytes int
sw = kithttp.NewStatusResponseWriter(w)
handleError = func(err error, code, message string) {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: code,
Op: "http/handleWrite",
Msg: message,
Err: err,
}, w)
}
Comment on lines +129 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for refactoring this to a function

)
w = sw
defer func() {
h.EventRecorder.Record(ctx, metric.Event{
Expand All @@ -111,20 +146,22 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
})
}()

in := r.Body
var in io.ReadCloser = r.Body
defer in.Close()

if r.Header.Get("Content-Encoding") == "gzip" {
var err error
in, err = gzip.NewReader(r.Body)
if err != nil {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: errInvalidGzipHeader,
Err: err,
}, w)
handleError(err, influxdb.EInvalid, errInvalidGzipHeader)
return
}
defer in.Close()
}

// given a limit is configured on the number of bytes in a
// batch then wrap the reader in a limited reader
if h.maxBatchSizeBytes > 0 {
in = newLimitedReadCloser(in, h.maxBatchSizeBytes)
}

a, err := pcontext.GetAuthorizer(ctx)
Expand Down Expand Up @@ -183,21 +220,12 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
if err != nil {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to create permission for bucket: %v", err),
Err: err,
}, w)
handleError(err, influxdb.EInternal, fmt.Sprintf("unable to create permission for bucket: %v", err))
return
}

if !a.Allowed(*p) {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Op: "http/handleWrite",
Msg: "insufficient permissions for write",
}, w)
handleError(err, influxdb.EForbidden, "insufficient permissions for write")
return
}

Expand All @@ -210,22 +238,28 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
span.Finish()
if err != nil {
log.Error("Error reading body", zap.Error(err))
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to read data: %v", err),
Err: err,
}, w)
handleError(err, influxdb.EInternal, "unable to read data")
return
}

// close the reader now that all bytes have been consumed
// this will return non-nil in the case of a configured limit
// being exceeded
if err := in.Close(); err != nil {
log.Error("Error reading body", zap.Error(err))

code := influxdb.EInternal
if errors.Is(err, ErrMaxBatchSizeExceeded) {
code = influxdb.ETooLarge
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to see the use of proper HTTP status codes 🎉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can tell what wikipedia page I spent some time on for this PR 😂

}

handleError(err, code, "unable to read data")
return
}

requestBytes = len(data)
if requestBytes == 0 {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: "writing requires points",
}, w)
handleError(err, influxdb.EInvalid, "writing requires points")
return
}

Expand All @@ -237,21 +271,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
span.Finish()
if err != nil {
log.Error("Error parsing points", zap.Error(err))
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: err.Error(),
}, w)
handleError(err, influxdb.EInvalid, "")
return
}

if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
log.Error("Error writing points", zap.Error(err))
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: "unexpected error writing points to database",
Err: err,
}, w)
handleError(err, influxdb.EInternal, "unexpected error writing points to database")
return
}

Expand Down Expand Up @@ -369,3 +395,39 @@ func compressWithGzip(data io.Reader) (io.Reader, error) {

return pr, err
}

type limitedReader struct {
*io.LimitedReader
err error
close func() error
}

func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReader {
// read up to max + 1 as limited reader just returns EOF when the limit is reached
// or when there is nothing left to read. If we exceed the max batch size by one
// then we know the limit has been passed.
return &limitedReader{
LimitedReader: &io.LimitedReader{R: r, N: n + 1},
close: r.Close,
}
}

// Close returns an ErrMaxBatchSizeExceeded when the wrapped reader
// exceeds the set limit for number of bytes.
// This is safe to call more than once but not concurrently.
func (l *limitedReader) Close() (err error) {
defer func() {
if cerr := l.close(); cerr != nil && err == nil {
err = cerr
}
GeorgeMac marked this conversation as resolved.
Show resolved Hide resolved

// only call close once
l.close = func() error { return nil }
}()

if l.N < 1 {
l.err = ErrMaxBatchSizeExceeded
}

return l.err
}
21 changes: 20 additions & 1 deletion http/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
bucket *influxdb.Bucket // bucket to return in bucket service
bucketErr error // err to return in bucket service
writeErr error // err to return from the points writer
opts []WriteHandlerOption // write handle configured options
}

// want is the expected output of the HTTP endpoint
Expand Down Expand Up @@ -255,6 +256,24 @@ func TestWriteHandler_handleWrite(t *testing.T) {
body: `{"code":"internal error","message":"authorizer not found on context"}`,
},
},
{
name: "large requests rejected",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
opts: []WriteHandlerOption{WithMaxBatchSizeBytes(5)},
},
wants: wants{
code: 413,
body: `{"code":"request too large","message":"unable to read data: points batch is too large"}`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -275,7 +294,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
PointsWriter: &mock.PointsWriter{Err: tt.state.writeErr},
WriteEventRecorder: &metric.NopEventRecorder{},
}
writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b))
writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b), tt.state.opts...)
handler := httpmock.NewAuthMiddlewareHandler(writeHandler, tt.request.auth)

r := httptest.NewRequest(
Expand Down