Skip to content

Commit

Permalink
Merge pull request #334 from karl-nilsson/fixes
Browse files Browse the repository at this point in the history
fix: batch BQ writes by entire PRW request
  • Loading branch information
vinny-sabatini authored Mar 8, 2024
2 parents 35d47e2 + e9ae4ce commit 227500d
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions bigquerydb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,20 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error {
inserter := c.client.Dataset(c.datasetID).Table(c.tableID).Inserter()
inserter.SkipInvalidRows = true
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
batch := make([]*Item, 0, len(timeseries))

for i := range timeseries {
ts := timeseries[i]
samples := ts.Samples
batch := make([]*Item, 0, len(samples))
c.recordsFetched.Add(float64(len(samples)))
metric := make(model.Metric, len(ts.Labels))
for _, l := range ts.Labels {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}

t := tagsFromMetric(metric)

for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
Expand All @@ -185,27 +189,25 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error {
value: v,
metricname: string(metric[model.MetricNameLabel]),
timestamp: model.Time(s.Timestamp).Unix(),
tags: tagsFromMetric(metric),
tags: t,
})

}
}

begin := time.Now()
if err := inserter.Put(ctx, batch); err != nil {
if multiError, ok := err.(bigquery.PutMultiError); ok {
for _, err1 := range multiError {
for _, err2 := range err1.Errors {
fmt.Println(err2)
}
begin := time.Now()
if err := inserter.Put(ctx, batch); err != nil {
if multiError, ok := err.(bigquery.PutMultiError); ok {
for _, err1 := range multiError {
for _, err2 := range err1.Errors {
fmt.Println(err2)
}
}
defer cancel()
return err
}
duration := time.Since(begin).Seconds()
c.batchWriteDuration.Observe(duration)
return err
}
defer cancel()
duration := time.Since(begin).Seconds()
c.batchWriteDuration.Observe(duration)

return nil
}

Expand Down

0 comments on commit 227500d

Please sign in to comment.