Skip to content

Commit

Permalink
fixed remote write regression (prometheus-community#88)
Browse files Browse the repository at this point in the history
* fixed regressions around series leak due to races; refactor for clarity

Signed-off-by: bwplotka <[email protected]>

* fixed remote write regresion.

Fixes prometheus-community#86

Signed-off-by: bwplotka <[email protected]>

---------

Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka authored Sep 26, 2024
1 parent 1b88ab7 commit 5b08980
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
21 changes: 11 additions & 10 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,19 @@ func main() {
}()
}

// First cut: just send the metrics once then exit
if err := metrics.SendRemoteWrite(config, reg); err != nil {
log.Fatal(err)
}
if *remotePprofInterval > 0 {
done <- struct{}{}
wg.Wait()
}
return
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
if err := metrics.SendRemoteWrite(ctx, config, reg); err != nil {
return err
}
if *remotePprofInterval > 0 {
done <- struct{}{}
wg.Wait()
}
return nil // One-off.
}, func(error) { cancel() })
}

// Standard mode for continuous exposure of metrics.
httpSrv := &http.Server{Addr: fmt.Sprintf(":%v", *port)}
g.Add(func() error {
fmt.Printf("Serving your metrics at :%v/metrics\n", *port)
Expand Down
22 changes: 16 additions & 6 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Client struct {

// SendRemoteWrite initializes a http client and
// sends metrics to a prometheus compatible remote endpoint.
func SendRemoteWrite(config *ConfigWrite, gatherer prometheus.Gatherer) error {
func SendRemoteWrite(ctx context.Context, config *ConfigWrite, gatherer prometheus.Gatherer) error {
var rt http.RoundTripper = &http.Transport{
TLSClientConfig: &config.TLSClientConfig,
}
Expand All @@ -79,7 +79,7 @@ func SendRemoteWrite(config *ConfigWrite, gatherer prometheus.Gatherer) error {
config: config,
gatherer: gatherer,
}
return c.write()
return c.write(ctx)
}

// Add the tenant ID header
Expand Down Expand Up @@ -109,7 +109,7 @@ func cloneRequest(r *http.Request) *http.Request {
return r2
}

func (c *Client) write() error {
func (c *Client) write(ctx context.Context) error {
tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder)
if err != nil {
return err
Expand All @@ -125,10 +125,14 @@ func (c *Client) write() error {
merr = &errors.MultiError{}
)

log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
ticker := time.NewTicker(c.config.RequestInterval)
defer ticker.Stop()
for ii := 0; ii < c.config.RequestCount; ii++ {
if ctx.Err() != nil {
return ctx.Err()
}

// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
if len(c.config.PprofURLs) > 0 && ii == c.config.RequestCount/2 {
Expand Down Expand Up @@ -223,6 +227,7 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries {
tss := make([]prompb.TimeSeries, 0, len(metricFamilies)*10)
timestamp := int64(model.Now()) // Not using metric.TimestampMs because it is (always?) nil. Is this right?

skippedSamples := 0
for _, metricFamily := range metricFamilies {
for _, metric := range metricFamily.Metric {
labels := prompbLabels(*metricFamily.Name, metric.Label)
Expand All @@ -235,16 +240,21 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries {
Value: *metric.Counter.Value,
Timestamp: timestamp,
}}
tss = append(tss, ts)
case dto.MetricType_GAUGE:
ts.Samples = []prompb.Sample{{
Value: *metric.Gauge.Value,
Timestamp: timestamp,
}}
tss = append(tss, ts)
default:
skippedSamples++
}
tss = append(tss, ts)
}
}

if skippedSamples > 0 {
log.Printf("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n", skippedSamples, len(tss))
}
return tss
}

Expand Down

0 comments on commit 5b08980

Please sign in to comment.