-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add util/http #34
Closed
Closed
add util/http #34
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
5665107
Bazel-ify Cortex (#342)
tomwilkie c1bafed
Support raw snappy encoded requests (#412)
tomwilkie 1374f77
We should actually try and decompress the requests if reading the bod…
tomwilkie 1bef0dd
Port Cortex to use Prometheus 2 packages (#583)
juliusv a42693f
Don't pass a http.Request to ParseProtoRequest; pass an io.Reader to …
tomwilkie 87933e5
Update instrumentation calls to remove deprecated interface
bboreham 1405446
Add more tracing detail when parsing incoming requests
bboreham 9fb0b64
Use pools to reduce allocs in distributor write path (#1526)
gouthamve 888d3dd
Pre-allocate buffer for reading protobufs, for performance
bboreham 26f526f
Rename size->expectedSize for clarity
bboreham 57099a9
Add a maximum receive size to distributor
bboreham 0de2892
Update code to fix almost all pre-existing lint errors (#2008)
zendern 52a6776
Support Accept: application/json on common HTTP endpoints (#2673)
joe-elliott be57bd8
Don't return raw buffer from util.ParseProtoReader() (#3328)
bboreham e1f553e
Memberlist: keep a buffer with sent and received messages for trouble…
pstibrany 4a4f89b
feat: Add additional modes for config endpoint (#3645)
simonswine 4f74a0b
Improve decompression within ParseProtoReader. (#3682)
cyriltovena 16c874c
feat: add TLS support for communication from the Ruler-->AM (#3752)
jtlisi fbe8112
Alertmanager Distributor (#3671)
codesome 3a5ba9a
Removed usage of deprecated pkg/ingester/client stuff (#3915)
pracucci 98420fe
add api to list all alertmanager configs and rule groups (#3529)
805f1b2
Fix TestStreamWriteYAMLResponse flakyness (#4122)
pracucci 7a6254f
Distributors rings status (#4151)
ethervoid 0ee8b62
Support ingesting exemplars into TSDB when blocks storage is enabled …
mdisibio 5282724
Make http package generic
55f7f93
Mod tidy
c7a90a3
Add changelog message
72b91d4
Merge remote-tracking branch 'upstream/main' into util-http
a568d0d
Review changes
48e693a
Fix linter error
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,268 @@ | ||
package http | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"flag" | ||
"fmt" | ||
"html/template" | ||
"io" | ||
"net/http" | ||
"strings" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
"github.com/gogo/protobuf/proto" | ||
"github.com/golang/snappy" | ||
"github.com/opentracing/opentracing-go" | ||
otlog "github.com/opentracing/opentracing-go/log" | ||
"gopkg.in/yaml.v2" | ||
) | ||
|
||
const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)" | ||
|
||
// IsRequestBodyTooLarge returns true if the error is "http: request body too large". | ||
func IsRequestBodyTooLarge(err error) bool { | ||
return err != nil && strings.Contains(err.Error(), "http: request body too large") | ||
} | ||
|
||
// BasicAuth configures basic authentication for HTTP clients. | ||
type BasicAuth struct { | ||
Username string `yaml:"basic_auth_username"` | ||
Password string `yaml:"basic_auth_password"` | ||
} | ||
|
||
func (b *BasicAuth) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | ||
f.StringVar(&b.Username, prefix+"basic-auth-username", "", "HTTP Basic authentication username. It overrides the username set in the URL (if any).") | ||
f.StringVar(&b.Password, prefix+"basic-auth-password", "", "HTTP Basic authentication password. It overrides the password set in the URL (if any).") | ||
} | ||
|
||
// IsEnabled returns false if basic authentication isn't enabled. | ||
func (b BasicAuth) IsEnabled() bool { | ||
return b.Username != "" || b.Password != "" | ||
} | ||
|
||
// WriteJSONResponse writes some JSON as a HTTP response. | ||
func WriteJSONResponse(w http.ResponseWriter, v interface{}) { | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
data, err := json.Marshal(v) | ||
if err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
// We ignore errors here, because we cannot do anything about them. | ||
// Write will trigger sending Status code, so we cannot send a different status code afterwards. | ||
// Also this isn't internal error, but error communicating with client. | ||
_, _ = w.Write(data) | ||
} | ||
|
||
// WriteYAMLResponse writes some YAML as a HTTP response. | ||
func WriteYAMLResponse(w http.ResponseWriter, v interface{}) { | ||
// There is not standardised content-type for YAML, text/plain ensures the | ||
// YAML is displayed in the browser instead of offered as a download | ||
w.Header().Set("Content-Type", "text/plain; charset=utf-8") | ||
|
||
data, err := yaml.Marshal(v) | ||
if err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
// We ignore errors here, because we cannot do anything about them. | ||
// Write will trigger sending Status code, so we cannot send a different status code afterwards. | ||
// Also this isn't internal error, but error communicating with client. | ||
_, _ = w.Write(data) | ||
} | ||
|
||
// WriteTextResponse sends a message as text/plain response with 200 status code. | ||
func WriteTextResponse(w http.ResponseWriter, message string) { | ||
w.Header().Set("Content-Type", "text/plain") | ||
|
||
// Ignore inactionable errors. | ||
_, _ = w.Write([]byte(message)) | ||
} | ||
|
||
// WriteHTMLResponse sends a message as text/html response with 200 status code. | ||
func WriteHTMLResponse(w http.ResponseWriter, message string) { | ||
w.Header().Set("Content-Type", "text/html") | ||
|
||
// Ignore inactionable errors. | ||
_, _ = w.Write([]byte(message)) | ||
} | ||
|
||
// RenderHTTPResponse either responds with json or a rendered html page using the passed in template | ||
// by checking the Accepts header | ||
func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request) { | ||
accept := r.Header.Get("Accept") | ||
if strings.Contains(accept, "application/json") { | ||
WriteJSONResponse(w, v) | ||
return | ||
} | ||
|
||
err := t.Execute(w, v) | ||
if err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
} | ||
} | ||
|
||
// StreamWriteYAMLResponse stream writes data as http response | ||
func StreamWriteYAMLResponse(w http.ResponseWriter, iter chan interface{}, logger log.Logger) { | ||
w.Header().Set("Content-Type", "application/yaml") | ||
for v := range iter { | ||
data, err := yaml.Marshal(v) | ||
if err != nil { | ||
level.Error(logger).Log("msg", "yaml marshal failed", "err", err) | ||
continue | ||
} | ||
_, err = w.Write(data) | ||
if err != nil { | ||
level.Error(logger).Log("msg", "write http response failed", "err", err) | ||
return | ||
} | ||
} | ||
} | ||
|
||
// CompressionType for encoding and decoding requests and responses. | ||
type CompressionType int | ||
|
||
// Values for CompressionType | ||
const ( | ||
NoCompression CompressionType = iota | ||
RawSnappy | ||
) | ||
|
||
// ParseProtoReader parses a compressed proto from an io.Reader. | ||
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error { | ||
sp := opentracing.SpanFromContext(ctx) | ||
if sp != nil { | ||
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]")) | ||
} | ||
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if sp != nil { | ||
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"), otlog.Int("size", len(body))) | ||
} | ||
|
||
// We re-implement proto.Unmarshal here as it calls XXX_Unmarshal first, | ||
// which we can't override without upsetting golint. | ||
req.Reset() | ||
if u, ok := req.(proto.Unmarshaler); ok { | ||
err = u.Unmarshal(body) | ||
} else { | ||
err = proto.NewBuffer(body).Unmarshal(req) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) { | ||
defer func() { | ||
if err != nil && len(body) > maxSize { | ||
err = fmt.Errorf(messageSizeLargerErrFmt, len(body), maxSize) | ||
} | ||
}() | ||
if expectedSize > maxSize { | ||
return nil, fmt.Errorf(messageSizeLargerErrFmt, expectedSize, maxSize) | ||
} | ||
buffer, ok := tryBufferFromReader(reader) | ||
if ok { | ||
body, err = decompressFromBuffer(buffer, maxSize, compression, sp) | ||
return | ||
} | ||
body, err = decompressFromReader(reader, expectedSize, maxSize, compression, sp) | ||
return | ||
} | ||
|
||
func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) ([]byte, error) { | ||
var ( | ||
buf bytes.Buffer | ||
body []byte | ||
err error | ||
) | ||
if expectedSize > 0 { | ||
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation | ||
} | ||
// Read from LimitReader with limit max+1. So if the underlying | ||
// reader is over limit, the result will be bigger than max. | ||
reader = io.LimitReader(reader, int64(maxSize)+1) | ||
switch compression { | ||
case NoCompression: | ||
_, err = buf.ReadFrom(reader) | ||
body = buf.Bytes() | ||
case RawSnappy: | ||
_, err = buf.ReadFrom(reader) | ||
if err != nil { | ||
return nil, err | ||
} | ||
body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp) | ||
} | ||
return body, err | ||
} | ||
|
||
func decompressFromBuffer(buffer *bytes.Buffer, maxSize int, compression CompressionType, sp opentracing.Span) ([]byte, error) { | ||
if len(buffer.Bytes()) > maxSize { | ||
return nil, fmt.Errorf(messageSizeLargerErrFmt, len(buffer.Bytes()), maxSize) | ||
} | ||
switch compression { | ||
case NoCompression: | ||
return buffer.Bytes(), nil | ||
case RawSnappy: | ||
if sp != nil { | ||
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"), | ||
otlog.Int("size", len(buffer.Bytes()))) | ||
} | ||
size, err := snappy.DecodedLen(buffer.Bytes()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if size > maxSize { | ||
return nil, fmt.Errorf(messageSizeLargerErrFmt, size, maxSize) | ||
} | ||
body, err := snappy.Decode(nil, buffer.Bytes()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return body, nil | ||
} | ||
return nil, nil | ||
} | ||
|
||
// tryBufferFromReader attempts to cast the reader to a `*bytes.Buffer` this is possible when using httpgrpc. | ||
// If it fails it will return nil and false. | ||
func tryBufferFromReader(reader io.Reader) (*bytes.Buffer, bool) { | ||
if bufReader, ok := reader.(interface { | ||
BytesBuffer() *bytes.Buffer | ||
}); ok && bufReader != nil { | ||
return bufReader.BytesBuffer(), true | ||
} | ||
return nil, false | ||
} | ||
|
||
// SerializeProtoResponse serializes a protobuf response into an HTTP response. | ||
func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error { | ||
data, err := proto.Marshal(resp) | ||
if err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return fmt.Errorf("error marshaling proto response: %v", err) | ||
} | ||
|
||
switch compression { | ||
case NoCompression: | ||
case RawSnappy: | ||
data = snappy.Encode(nil, data) | ||
} | ||
if _, err := w.Write(data); err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return fmt.Errorf("error sending proto response: %v", err) | ||
} | ||
return nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if it might be better to refer to the package name it gets in dskit, as opposed to the one in Cortex (pkg/util/http) to avoid confusion on part of the reader?