diff --git a/pkg/services/ngalert/state/historian/otel_loki_client.go b/pkg/services/ngalert/state/historian/otel_loki_client.go index 7e6211efeebe0..64f5d2f9c241d 100644 --- a/pkg/services/ngalert/state/historian/otel_loki_client.go +++ b/pkg/services/ngalert/state/historian/otel_loki_client.go @@ -2,6 +2,7 @@ package historian import ( "bytes" + "compress/gzip" "context" "crypto/tls" "encoding/json" @@ -14,6 +15,7 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/unknwon/log" + "github.com/valyala/bytebufferpool" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" @@ -23,7 +25,10 @@ import ( "google.golang.org/grpc/metadata" ) -var _ remoteLokiClient = (*otelLokiClient)(nil) +var ( + _ remoteLokiClient = (*otelLokiClient)(nil) + payloadsPool = bytebufferpool.Pool{} +) type OtelConfig struct { Enabled bool @@ -155,6 +160,8 @@ func (p *otelLokiClient) pushHttp(ctx context.Context, req *plogotlp.ExportReque contentTypeHeader = "Content-Type" apiKeyHeader = "apikey" protobufContentType = "application/x-protobuf" + contentEncoding = "Content-Encoding" + contentEncodingGzip = "gzip" ) err = p.initClient() @@ -167,11 +174,18 @@ func (p *otelLokiClient) pushHttp(ctx context.Context, req *plogotlp.ExportReque return "", fmt.Errorf("failed to marshal logs: %w", err) } + buff := payloadsPool.Get() + err = gzipRequestIntoBuffer(protoBody, buff) + if err != nil { + return "", fmt.Errorf("failed to gzip request: %w", err) + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Endpoint, bytes.NewReader(protoBody)) if err != nil { return "", fmt.Errorf("failed to create http request: %w", err) } + httpReq.Header.Set(contentEncoding, contentEncodingGzip) httpReq.Header.Set(contentTypeHeader, protobufContentType) if p.cfg.ApiKey != "" { httpReq.Header.Set(apiKeyHeader, p.cfg.ApiKey) @@ -321,3 +335,20 @@ func getOTLPHTTPConnectionTransport(otelConfig OtelConfig) *http.Transport { return &http.Transport{} } + +func gzipRequestIntoBuffer(request []byte, gzippedBuffer *bytebufferpool.ByteBuffer) (err error) { + gzipWriter := gzip.NewWriter(gzippedBuffer) + defer func() { + closingError := gzipWriter.Close() + if err == nil && closingError != nil { + err = fmt.Errorf("failed to close the gzip writer: %w", closingError) + } + }() + + _, err = gzipWriter.Write(request) + if err != nil { + return fmt.Errorf("failed to write to the gzip writer: %w", err) + } + + return nil +}