From e9ae4cea3cac3b081678c201c18ead67f20694fa Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Sat, 2 Mar 2024 15:32:46 +0000 Subject: [PATCH] fix: batch writes on entire PRW request Batch the entire Remote-Write request, rather than per-timeseries. This allows a significantly higher write throughput. Also, move the tagsFromMetric call out of the inner loop. --- bigquerydb/client.go | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/bigquerydb/client.go b/bigquerydb/client.go index 5f8287b4..0fc7feb0 100644 --- a/bigquerydb/client.go +++ b/bigquerydb/client.go @@ -163,16 +163,20 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error { inserter := c.client.Dataset(c.datasetID).Table(c.tableID).Inserter() inserter.SkipInvalidRows = true ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + batch := make([]*Item, 0, len(timeseries)) for i := range timeseries { ts := timeseries[i] samples := ts.Samples - batch := make([]*Item, 0, len(samples)) c.recordsFetched.Add(float64(len(samples))) metric := make(model.Metric, len(ts.Labels)) for _, l := range ts.Labels { metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) } + + t := tagsFromMetric(metric) + for _, s := range samples { v := float64(s.Value) if math.IsNaN(v) || math.IsInf(v, 0) { @@ -185,27 +189,25 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error { value: v, metricname: string(metric[model.MetricNameLabel]), timestamp: model.Time(s.Timestamp).Unix(), - tags: tagsFromMetric(metric), + tags: t, }) - } + } - begin := time.Now() - if err := inserter.Put(ctx, batch); err != nil { - if multiError, ok := err.(bigquery.PutMultiError); ok { - for _, err1 := range multiError { - for _, err2 := range err1.Errors { - fmt.Println(err2) - } + begin := time.Now() + if err := inserter.Put(ctx, batch); err != nil { + if multiError, ok := err.(bigquery.PutMultiError); ok { + for _, err1 := range multiError { + for _, err2 := range err1.Errors { + fmt.Println(err2) } } - defer cancel() - return err } - duration := time.Since(begin).Seconds() - c.batchWriteDuration.Observe(duration) + return err } - defer cancel() + duration := time.Since(begin).Seconds() + c.batchWriteDuration.Observe(duration) + return nil }