diff --git a/CHANGELOG.md b/CHANGELOG.md index 18465f9b069..46919a7647c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ 1. [16430](https://github.com/influxdata/influxdb/pull/16430): Added toggle to table thresholds to allow users to choose between setting threshold colors to text or background 1. [16418](https://github.com/influxdata/influxdb/pull/16418): Add Developer Documentation 1. [16260](https://github.com/influxdata/influxdb/pull/16260): Capture User-Agent header as query source for logging purposes +1. [16469](https://github.com/influxdata/influxdb/pull/16469): Add support for configurable max batch size in points write handler ### Bug Fixes diff --git a/errors.go b/errors.go index c584ad6d47a..8b7566cc958 100644 --- a/errors.go +++ b/errors.go @@ -24,6 +24,7 @@ const ( ETooManyRequests = "too many requests" EUnauthorized = "unauthorized" EMethodNotAllowed = "method not allowed" + ETooLarge = "request too large" ) // Error is the error struct of platform. diff --git a/http/errors.go b/http/errors.go index 102eea48684..d2323ac7728 100644 --- a/http/errors.go +++ b/http/errors.go @@ -157,4 +157,5 @@ var statusCodePlatformError = map[string]int{ platform.ETooManyRequests: http.StatusTooManyRequests, platform.EUnauthorized: http.StatusUnauthorized, platform.EMethodNotAllowed: http.StatusMethodNotAllowed, + platform.ETooLarge: http.StatusRequestEntityTooLarge, } diff --git a/http/write_handler.go b/http/write_handler.go index 334f8c4a487..534c2f343fc 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -3,6 +3,7 @@ package http import ( "compress/gzip" "context" + "errors" "fmt" "io" "io/ioutil" @@ -21,6 +22,13 @@ import ( "go.uber.org/zap" ) +var ( + // ErrMaxBatchSizeExceeded is returned when a points batch exceeds + // the defined upper limit in bytes. This pertains to the size of the + // batch after inflation from any compression (i.e. ungzipped). 
+ ErrMaxBatchSizeExceeded = errors.New("points batch is too large") +) + // WriteBackend is all services and associated parameters required to construct // the WriteHandler. type WriteBackend struct { @@ -58,6 +66,19 @@ type WriteHandler struct { PointsWriter storage.PointsWriter EventRecorder metric.EventRecorder + + maxBatchSizeBytes int64 +} + +// WriteHandlerOption is a functional option for a *WriteHandler +type WriteHandlerOption func(*WriteHandler) + +// WithMaxBatchSizeBytes configures the maximum size for a +// (decompressed) points batch allowed by the write handler +func WithMaxBatchSizeBytes(n int64) WriteHandlerOption { + return func(w *WriteHandler) { + w.maxBatchSizeBytes = n + } } // Prefix provides the route prefix. @@ -72,7 +93,7 @@ const ( ) // NewWriteHandler creates a new handler at /api/v2/write to receive line protocol. -func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler { +func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler { h := &WriteHandler{ Router: NewRouter(b.HTTPErrorHandler), HTTPErrorHandler: b.HTTPErrorHandler, @@ -84,6 +105,10 @@ func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler { EventRecorder: b.WriteEventRecorder, } + for _, opt := range opts { + opt(h) + } + h.HandlerFunc("POST", prefixWrite, h.handleWrite) return h } @@ -97,9 +122,19 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { // TODO(desa): I really don't like how we're recording the usage metrics here // Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403 - var orgID influxdb.ID - var requestBytes int - sw := kithttp.NewStatusResponseWriter(w) + var ( + orgID influxdb.ID + requestBytes int + sw = kithttp.NewStatusResponseWriter(w) + handleError = func(err error, code, message string) { + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: code, + Op: "http/handleWrite", + Msg: message, + Err: err, + }, w) + } + ) w = sw 
defer func() { h.EventRecorder.Record(ctx, metric.Event{ @@ -111,20 +146,22 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { }) }() - in := r.Body + var in io.ReadCloser = r.Body + defer in.Close() + if r.Header.Get("Content-Encoding") == "gzip" { var err error in, err = gzip.NewReader(r.Body) if err != nil { - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EInvalid, - Op: "http/handleWrite", - Msg: errInvalidGzipHeader, - Err: err, - }, w) + handleError(err, influxdb.EInvalid, errInvalidGzipHeader) return } - defer in.Close() + } + + // given a limit is configured on the number of bytes in a + // batch then wrap the reader in a limited reader + if h.maxBatchSizeBytes > 0 { + in = newLimitedReadCloser(in, h.maxBatchSizeBytes) } a, err := pcontext.GetAuthorizer(ctx) @@ -183,21 +220,12 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID) if err != nil { - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EInternal, - Op: "http/handleWrite", - Msg: fmt.Sprintf("unable to create permission for bucket: %v", err), - Err: err, - }, w) + handleError(err, influxdb.EInternal, fmt.Sprintf("unable to create permission for bucket: %v", err)) return } if !a.Allowed(*p) { - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EForbidden, - Op: "http/handleWrite", - Msg: "insufficient permissions for write", - }, w) + handleError(err, influxdb.EForbidden, "insufficient permissions for write") return } @@ -210,22 +238,28 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { span.Finish() if err != nil { log.Error("Error reading body", zap.Error(err)) - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EInternal, - Op: "http/handleWrite", - Msg: fmt.Sprintf("unable to read data: %v", err), - Err: err, - }, w) + handleError(err, influxdb.EInternal, "unable to read 
data") + return + } + + // close the reader now that all bytes have been consumed + // this will return non-nil in the case of a configured limit + // being exceeded + if err := in.Close(); err != nil { + log.Error("Error reading body", zap.Error(err)) + + code := influxdb.EInternal + if errors.Is(err, ErrMaxBatchSizeExceeded) { + code = influxdb.ETooLarge + } + + handleError(err, code, "unable to read data") return } requestBytes = len(data) if requestBytes == 0 { - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EInvalid, - Op: "http/handleWrite", - Msg: "writing requires points", - }, w) + handleError(err, influxdb.EInvalid, "writing requires points") return } @@ -237,21 +271,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { span.Finish() if err != nil { log.Error("Error parsing points", zap.Error(err)) - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: err.Error(), - }, w) + handleError(err, influxdb.EInvalid, "") return } if err := h.PointsWriter.WritePoints(ctx, points); err != nil { log.Error("Error writing points", zap.Error(err)) - h.HandleHTTPError(ctx, &influxdb.Error{ - Code: influxdb.EInternal, - Op: "http/handleWrite", - Msg: "unexpected error writing points to database", - Err: err, - }, w) + handleError(err, influxdb.EInternal, "unexpected error writing points to database") return } @@ -369,3 +395,39 @@ func compressWithGzip(data io.Reader) (io.Reader, error) { return pr, err } + +type limitedReader struct { + *io.LimitedReader + err error + close func() error +} + +func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReader { + // read up to max + 1 as limited reader just returns EOF when the limit is reached + // or when there is nothing left to read. If we exceed the max batch size by one + // then we know the limit has been passed. 
+ return &limitedReader{ + LimitedReader: &io.LimitedReader{R: r, N: n + 1}, + close: r.Close, + } +} + +// Close returns an ErrMaxBatchSizeExceeded when the wrapped reader +// exceeds the set limit for number of bytes. +// This is safe to call more than once but not concurrently. +func (l *limitedReader) Close() (err error) { + defer func() { + if cerr := l.close(); cerr != nil && err == nil { + err = cerr + } + + // only call close once + l.close = func() error { return nil } + }() + + if l.N < 1 { + l.err = ErrMaxBatchSizeExceeded + } + + return l.err +} diff --git a/http/write_handler_test.go b/http/write_handler_test.go index cae33ed7a10..ab2a116bded 100644 --- a/http/write_handler_test.go +++ b/http/write_handler_test.go @@ -84,6 +84,7 @@ func TestWriteHandler_handleWrite(t *testing.T) { bucket *influxdb.Bucket // bucket to return in bucket service bucketErr error // err to return in bucket service writeErr error // err to return from the points writer + opts []WriteHandlerOption // write handler configured options } // want is the expected output of the HTTP endpoint @@ -255,6 +256,24 @@ func TestWriteHandler_handleWrite(t *testing.T) { body: `{"code":"internal error","message":"authorizer not found on context"}`, }, }, + { + name: "large requests rejected", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + opts: []WriteHandlerOption{WithMaxBatchSizeBytes(5)}, + }, + wants: wants{ + code: 413, + body: `{"code":"request too large","message":"unable to read data: points batch is too large"}`, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -275,7 +294,7 @@ func TestWriteHandler_handleWrite(t *testing.T) { PointsWriter: &mock.PointsWriter{Err: tt.state.writeErr}, WriteEventRecorder: 
&metric.NopEventRecorder{}, } - writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b)) + writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b), tt.state.opts...) handler := httpmock.NewAuthMiddlewareHandler(writeHandler, tt.request.auth) r := httptest.NewRequest(