Skip to content

Commit

Permalink
add gzip
Browse files Browse the repository at this point in the history
  • Loading branch information
Erez Levi committed Jun 18, 2024
1 parent 0253a22 commit 009279f
Showing 1 changed file with 32 additions and 1 deletion.
33 changes: 32 additions & 1 deletion pkg/services/ngalert/state/historian/otel_loki_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package historian

import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/unknwon/log"
"github.com/valyala/bytebufferpool"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
Expand All @@ -23,7 +25,10 @@ import (
"google.golang.org/grpc/metadata"
)

var _ remoteLokiClient = (*otelLokiClient)(nil)
var (
_ remoteLokiClient = (*otelLokiClient)(nil)
payloadsPool = bytebufferpool.Pool{}
)

type OtelConfig struct {
Enabled bool
Expand Down Expand Up @@ -155,6 +160,8 @@ func (p *otelLokiClient) pushHttp(ctx context.Context, req *plogotlp.ExportReque
contentTypeHeader = "Content-Type"
apiKeyHeader = "apikey"
protobufContentType = "application/x-protobuf"
contentEncoding = "Content-Encoding"
contentEncodingGzip = "gzip"
)

err = p.initClient()
Expand All @@ -167,11 +174,18 @@ func (p *otelLokiClient) pushHttp(ctx context.Context, req *plogotlp.ExportReque
return "", fmt.Errorf("failed to marshal logs: %w", err)
}

buff := payloadsPool.Get()
err = gzipRequestIntoBuffer(protoBody, buff)
if err != nil {
return "", fmt.Errorf("failed to gzip request: %w", err)
}

httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Endpoint, bytes.NewReader(protoBody))
if err != nil {
return "", fmt.Errorf("failed to create http request: %w", err)
}

httpReq.Header.Set(contentEncoding, contentEncodingGzip)
httpReq.Header.Set(contentTypeHeader, protobufContentType)
if p.cfg.ApiKey != "" {
httpReq.Header.Set(apiKeyHeader, p.cfg.ApiKey)
Expand Down Expand Up @@ -321,3 +335,20 @@ func getOTLPHTTPConnectionTransport(otelConfig OtelConfig) *http.Transport {

return &http.Transport{}
}

func gzipRequestIntoBuffer(request []byte, gzippedBuffer *bytebufferpool.ByteBuffer) (err error) {
gzipWriter := gzip.NewWriter(gzippedBuffer)
defer func() {
closingError := gzipWriter.Close()
if err == nil && closingError != nil {
err = fmt.Errorf("failed to close the gzip writer: %w", closingError)
}
}()

_, err = gzipWriter.Write(request)
if err != nil {
return fmt.Errorf("failed to write to the gzip writer: %w", err)
}

return nil
}

0 comments on commit 009279f

Please sign in to comment.