Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
Apply timeout to each worker in metrics_proto export (#222)
Browse files Browse the repository at this point in the history
* Apply timeout to each worker in metrics_proto export

* Apply timeout to createMetricDescriptor
  • Loading branch information
songy23 authored Sep 17, 2019
1 parent 633d9ea commit 2798eee
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 15 deletions.
22 changes: 16 additions & 6 deletions metrics_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strings"
"sync"
"time"

monitoring "cloud.google.com/go/monitoring/apiv3"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
Expand All @@ -44,7 +45,7 @@ type metricsBatcher struct {
wg *sync.WaitGroup
}

func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient) *metricsBatcher {
func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient, timeout time.Duration) *metricsBatcher {
if numWorkers < minNumWorkers {
numWorkers = minNumWorkers
}
Expand All @@ -58,7 +59,7 @@ func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
w := newWorker(ctx, mc, reqsChan, respsChan, &wg)
w := newWorker(ctx, mc, reqsChan, respsChan, &wg, timeout)
workers = append(workers, w)
go w.start()
}
Expand Down Expand Up @@ -143,8 +144,9 @@ func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.
}

type worker struct {
ctx context.Context
mc *monitoring.MetricClient
ctx context.Context
timeout time.Duration
mc *monitoring.MetricClient

resp *response

Expand All @@ -159,7 +161,8 @@ func newWorker(
mc *monitoring.MetricClient,
reqsChan chan *monitoringpb.CreateTimeSeriesRequest,
respsChan chan *response,
wg *sync.WaitGroup) *worker {
wg *sync.WaitGroup,
timeout time.Duration) *worker {
return &worker{
ctx: ctx,
mc: mc,
Expand All @@ -172,12 +175,19 @@ func newWorker(

func (w *worker) start() {
for req := range w.reqsChan {
w.recordDroppedTimeseries(sendReq(w.ctx, w.mc, req))
w.sendReqWithTimeout(req)
}
w.respsChan <- w.resp
w.wg.Done()
}

// sendReqWithTimeout sends a single CreateTimeSeriesRequest through sendReq
// using a context obtained from newContextWithTimeout(w.ctx, w.timeout), and
// then hands sendReq's results to recordDroppedTimeseries. The cancel function
// returned alongside that context is deferred until this method returns.
func (w *worker) sendReqWithTimeout(req *monitoringpb.CreateTimeSeriesRequest) {
	reqCtx, cancel := newContextWithTimeout(w.ctx, w.timeout)
	defer cancel()

	dropped, err := sendReq(reqCtx, w.mc, req)
	w.recordDroppedTimeseries(dropped, err)
}

func (w *worker) recordDroppedTimeseries(numTimeSeries int, err error) {
w.resp.droppedTimeSeries += numTimeSeries
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions metrics_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func TestWorkers(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create metric client %v", err)
}
m1 := newMetricsBatcher(ctx, "test", 1, c1) // batcher with 1 worker
m1 := newMetricsBatcher(ctx, "test", 1, c1, defaultTimeout) // batcher with 1 worker

c2, err := makeClient(addr)
if err != nil {
t.Fatalf("Failed to create metric client %v", err)
}
m2 := newMetricsBatcher(ctx, "test", 2, c2) // batcher with 2 workers
m2 := newMetricsBatcher(ctx, "test", 2, c2, defaultTimeout) // batcher with 2 workers

tss := make([]*monitoringpb.TimeSeries, 0, 500) // make 500 time series, should be split to 3 reqs
for i := 0; i < 500; i++ {
Expand Down
16 changes: 10 additions & 6 deletions metrics_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@ func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.No
return 0, errNilMetricOrMetricDescriptor
}

ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
defer cancel()

// Caches the resources seen so far
seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c)
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout)
for _, metric := range metrics {
if len(metric.GetTimeseries()) == 0 {
// No TimeSeries to export, skip this metric.
Expand All @@ -70,14 +67,14 @@ func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.No
if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
summaryMtcs := se.convertSummaryMetrics(metric)
for _, summaryMtc := range summaryMtcs {
if err := se.protoCreateMetricDescriptor(ctx, summaryMtc); err != nil {
if err := se.createMetricDescriptorWithTimeout(ctx, summaryMtc); err != nil {
mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err)
continue
}
se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb)
}
} else {
if err := se.protoCreateMetricDescriptor(ctx, metric); err != nil {
if err := se.createMetricDescriptorWithTimeout(ctx, metric); err != nil {
mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err)
continue
}
Expand Down Expand Up @@ -298,6 +295,13 @@ func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, lab
return labels, nil
}

// createMetricDescriptorWithTimeout calls protoCreateMetricDescriptor for the
// given metric with a context obtained from
// newContextWithTimeout(ctx, se.o.Timeout) and returns that call's error
// unchanged. The cancel function returned with the context is deferred until
// this method returns.
func (se *statsExporter) createMetricDescriptorWithTimeout(ctx context.Context, metric *metricspb.Metric) error {
	timeoutCtx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
	defer cancel()

	err := se.protoCreateMetricDescriptor(timeoutCtx, metric)
	return err
}

// protoCreateMetricDescriptor creates a metric descriptor from the OpenCensus proto metric
// and then creates it remotely using Stackdriver's API.
func (se *statsExporter) protoCreateMetricDescriptor(ctx context.Context, metric *metricspb.Metric) error {
Expand Down
2 changes: 1 addition & 1 deletion metrics_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ func makePercentileValue(val, percentile float64) *metricspb.SummaryValue_Snapsh
}

func protoMetricToTimeSeries(ctx context.Context, se *statsExporter, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric) ([]*monitoringpb.TimeSeries, error) {
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c)
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, defaultTimeout)
se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb)
return mb.allTss, mb.close(ctx)
}

0 comments on commit 2798eee

Please sign in to comment.