From 8815bce6b2439ff3fb6b22e94a3fea43f4f19aac Mon Sep 17 00:00:00 2001 From: Mike Ball Date: Tue, 10 Aug 2021 08:44:18 -0400 Subject: [PATCH] feat: add support for Graphite metrics provider Address issue #1403 by adding a Graphite metrics provider. Signed-off-by: Mike Ball --- metricproviders/graphite/api.go | 156 +++++++++++ metricproviders/graphite/api_test.go | 270 +++++++++++++++++++ metricproviders/graphite/graphite.go | 104 +++++++ metricproviders/graphite/graphite_test.go | 131 +++++++++ metricproviders/graphite/mock_test.go | 13 + metricproviders/metricproviders.go | 7 + pkg/apis/rollouts/v1alpha1/analysis_types.go | 10 + 7 files changed, 691 insertions(+) create mode 100644 metricproviders/graphite/api.go create mode 100644 metricproviders/graphite/api_test.go create mode 100644 metricproviders/graphite/graphite.go create mode 100644 metricproviders/graphite/graphite_test.go create mode 100644 metricproviders/graphite/mock_test.go diff --git a/metricproviders/graphite/api.go b/metricproviders/graphite/api.go new file mode 100644 index 0000000000..7c35d416e1 --- /dev/null +++ b/metricproviders/graphite/api.go @@ -0,0 +1,156 @@ +package graphite + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "math" + "net/http" + "net/url" + "path" + "regexp" + "strconv" + "time" + + log "github.com/sirupsen/logrus" +) + +// API represents a Graphite API client +type API interface { + Query(query string) (*float64, error) +} + +// APIClient is a Graphite API client +type APIClient struct { + url url.URL + client *http.Client + timeout time.Duration + logCTX log.Entry +} + +// Query performs a Graphite API query with the query it's passed +func (api APIClient) Query(quer string) (*float64, error) { + query := api.trimQuery(quer) + u, err := url.Parse(fmt.Sprintf("./render?%s", query)) + if err != nil { + return nil, err + } + + q := u.Query() + q.Set("format", "json") + u.RawQuery = q.Encode() + + u.Path = path.Join(api.url.Path, u.Path) + u = 
api.url.ResolveReference(u) + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(req.Context(), api.timeout) + defer cancel() + + r, err := api.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + + if 400 <= r.StatusCode { + return nil, fmt.Errorf("error response: %s", string(b)) + } + + var result graphiteResponse + err = json.Unmarshal(b, &result) + if err != nil { + return nil, err + } + + var value *float64 + for _, tr := range result { + for _, dp := range tr.DataPoints { + if dp.Value != nil { + value = dp.Value + } + } + } + + return value, nil +} + +func (api APIClient) trimQuery(q string) string { + space := regexp.MustCompile(`\s+`) + return space.ReplaceAllString(q, " ") +} + +type graphiteDataPoint struct { + Value *float64 + TimeStamp time.Time +} + +func (gdp *graphiteDataPoint) UnmarshalJSON(data []byte) error { + var v []interface{} + if err := json.Unmarshal(data, &v); err != nil { + return err + } + + if len(v) != 2 { + return fmt.Errorf("error unmarshaling data point: %v", v) + } + + switch v[0].(type) { + case nil: + // no value + case float64: + f, _ := v[0].(float64) + gdp.Value = &f + case string: + f, err := strconv.ParseFloat(v[0].(string), 64) + if err != nil { + return err + } + gdp.Value = &f + default: + f, ok := v[0].(float64) + if !ok { + return fmt.Errorf("error unmarshaling value: %v", v[0]) + } + gdp.Value = &f + } + + switch v[1].(type) { + case nil: + // no value + case float64: + ts := int64(math.Round(v[1].(float64))) + gdp.TimeStamp = time.Unix(ts, 0) + case string: + ts, err := strconv.ParseInt(v[1].(string), 10, 64) + if err != nil { + return err + } + gdp.TimeStamp = time.Unix(ts, 0) + default: + ts, ok := v[1].(int64) + if !ok { + return fmt.Errorf("error unmarshaling timestamp: %v", v[0]) + } + gdp.TimeStamp = 
time.Unix(ts, 0) + } + + return nil +} + +type graphiteTargetResp struct { + Target string `json:"target"` + DataPoints []graphiteDataPoint `json:"datapoints"` +} + +type graphiteResponse []graphiteTargetResp diff --git a/metricproviders/graphite/api_test.go b/metricproviders/graphite/api_test.go new file mode 100644 index 0000000000..ade46d733f --- /dev/null +++ b/metricproviders/graphite/api_test.go @@ -0,0 +1,270 @@ +package graphite + +import ( + "errors" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +func testGraphiteMetric(addr string) v1alpha1.Metric { + return v1alpha1.Metric{ + Provider: v1alpha1.MetricProvider{ + Graphite: &v1alpha1.GraphiteMetric{ + Address: addr, + }, + }, + } +} + +func TestNewAPIClientWithValidURL(t *testing.T) { + e := log.Entry{} + _, err := NewAPIClient(testGraphiteMetric("http://some-graphite.foo"), e) + + assert.NoError(t, err) +} + +func TestNewAPIWithInvalidURL(t *testing.T) { + addr := ":::" + e := log.Entry{} + g, err := NewAPIClient(testGraphiteMetric(addr), e) + + assert.Equal(t, err.Error(), fmt.Sprintf("Graphite address %s is not a valid URL", addr)) + assert.Nil(t, g) +} + +func TestQuery(t *testing.T) { + goodResult := float64(100) + tests := []struct { + name string + query string + expectedTarget string + expectedFrom string + expectedResult *float64 + expectedErr error + body string + responseCode int + }{{ + "ok", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + &goodResult, + nil, + `[ + { + "datapoints": [ + [ + 10, + 1621348400 + ], + [ + 75, + 1621348410 + ], + [ + 25, + 1621348420 + ], + [ + 100, + 1621348430 + ] + ], + "target": "sumSeries(app.http.*.*.count)", + "tags": { + "aggregatedBy": "sum", + "name": "sumSeries(app.http.*.*.count)" + } + } + ]`, + 200, + }, { + "graphite response body with invalid 
JSON", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("invalid character 'i' looking for beginning of value"), + "invalid JSON", + 200, + }, { + "400 graphite response status", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("error response: foo"), + "foo", + 400, + }, { + "500 graphite response status", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("error response: bar"), + "bar", + 500, + }, { + "invalid query", + "target=#$%^&*(proper$#$%%^(password&from=-2min", + "#$%^&*(proper$#$%%^(password", + "-2min", + nil, + errors.New("parse \"./render?target=#$%^&*(proper$#$%%^(password&from=-2min\": invalid URL escape \"%^&\""), + "", + 200, + }, { + "graphite response data point JSON with only one item", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("error unmarshaling data point: [10]"), + `[ + { + "datapoints": [ + [ + 10 + ] + ], + "target": "sumSeries(app.http.*.*.count)", + "tags": { + "aggregatedBy": "sum", + "name": "sumSeries(app.http.*.*.count)" + } + } + ]`, + 200, + }, { + "graphite response data point JSON with an invalid timestamp", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("strconv.ParseInt: parsing \"f\": invalid syntax"), + `[ + { + "datapoints": [ + [ + 100, + "f" + ] + ], + "target": "sumSeries(app.http.*.*.count)" + } + ]`, + 200, + }, { + "graphite response data point JSON with a string value", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + &goodResult, + nil, + `[ + { + "datapoints": [ + [ + "100", + 1621348420 + ] + ], + "target": "sumSeries(app.http.*.*.count)" + } + ]`, + 200, + }, { + "graphite response data point JSON triggers 
unmarshaling error", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("error unmarshaling value: []"), + `[ + { + "datapoints": [ + [ + [], + 1621348420 + ] + ], + "target": "sumSeries(app.http.*.*.count)" + } + ]`, + 200, + }, { + "graphite response data point JSON with a string timestamp", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + &goodResult, + nil, + `[ + { + "datapoints": [ + [ + 100, + "1621348420" + ] + ], + "target": "sumSeries(app.http.*.*.count)" + } + ]`, + 200, + }, { + "graphite response data point timestamp JSON triggers unmarshaling error", + "target=sumSeries(app.http.*.*.count)&from=-2min", + "sumSeries(app.http.*.*.count)", + "-2min", + nil, + errors.New("error unmarshaling timestamp: 100"), + `[ + { + "datapoints": [ + [ + 100, + [] + ] + ], + "target": "sumSeries(app.http.*.*.count)" + } + ]`, + 200, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + target := r.URL.Query().Get("target") + assert.Equal(t, test.expectedTarget, target) + + from := r.URL.Query().Get("from") + assert.Equal(t, test.expectedFrom, from) + + w.WriteHeader(test.responseCode) + + json := test.body + w.Write([]byte(json)) + })) + defer ts.Close() + + g, err := NewAPIClient(testGraphiteMetric(ts.URL), log.Entry{}) + assert.Nil(t, err) + + val, err := g.Query(test.query) + if test.expectedErr != nil { + assert.Equal(t, err.Error(), test.expectedErr.Error()) + } else { + assert.Nil(t, err) + } + assert.Equal(t, test.expectedResult, val) + }) + } +} diff --git a/metricproviders/graphite/graphite.go b/metricproviders/graphite/graphite.go new file mode 100644 index 0000000000..9a191e123d --- /dev/null +++ b/metricproviders/graphite/graphite.go @@ -0,0 +1,104 @@ +package graphite + +import ( + "errors" + "fmt" + "net/http" + "net/url" 
+ "time" + + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + "github.com/argoproj/argo-rollouts/utils/evaluate" + metricutil "github.com/argoproj/argo-rollouts/utils/metric" +) + +const ( + // ProviderType indicates the provider is Graphite. + ProviderType = "Graphite" +) + +// NewAPIClient generates an APIClient from the metric configuration +func NewAPIClient(metric v1alpha1.Metric, logCTX log.Entry) (*APIClient, error) { + addr := metric.Provider.Graphite.Address + graphiteURL, err := url.Parse(addr) + if addr == "" || err != nil { + return nil, fmt.Errorf("%s address %s is not a valid URL", ProviderType, addr) + } + + return &APIClient{ + logCTX: logCTX, + client: http.DefaultClient, + url: *graphiteURL, + timeout: 5 * time.Second, + }, nil +} + +// Provider contains the required components to run a Graphite query. +// TODO: add support for username/password authentication. +type Provider struct { + api API + logCtx log.Entry +} + +// Type indicates provider is a Graphite provider. +func (p *Provider) Type() string { + return ProviderType +} + +// Run queries Graphite for the metric. 
+func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alpha1.Measurement { + startTime := metav1.Now() + newMeasurement := v1alpha1.Measurement{ + StartedAt: &startTime, + } + + value, err := p.api.Query(metric.Provider.Graphite.Query) + if err != nil { + return metricutil.MarkMeasurementError(newMeasurement, err) + } + + if value == nil { + return metricutil.MarkMeasurementError(newMeasurement, errors.New("no values found")) + } + + newMeasurement.Value = fmt.Sprintf("%.3f", *value) + + newStatus, err := evaluate.EvaluateResult(*value, metric, p.logCtx) + if err != nil { + return metricutil.MarkMeasurementError(newMeasurement, err) + } + + newMeasurement.Phase = newStatus + finishedTime := metav1.Now() + newMeasurement.FinishedAt = &finishedTime + + return newMeasurement +} + +// Resume should not be used with the Graphite provider since all the work should occur in the Run method +func (p *Provider) Resume(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement { + p.logCtx.Warn("Graphite provider should not execute the Resume method") + return measurement +} + +// Terminate should not be used with the Graphite provider since all the work should occur in the Run method +func (p *Provider) Terminate(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement { + p.logCtx.Warn("Graphite provider should not execute the Terminate method") + return measurement +} + +// GarbageCollect is a no-op for the Graphite provider +func (p *Provider) GarbageCollect(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, limit int) error { + return nil +} + +// NewGraphiteProvider returns a new Graphite provider +func NewGraphiteProvider(api API, logCtx log.Entry) *Provider { + return &Provider{ + logCtx: logCtx, + api: api, + } +} diff --git a/metricproviders/graphite/graphite_test.go b/metricproviders/graphite/graphite_test.go new file mode 100644 index 
0000000000..9130c679e6 --- /dev/null +++ b/metricproviders/graphite/graphite_test.go @@ -0,0 +1,131 @@ +package graphite + +import ( + "errors" + "testing" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" +) + +func newMockAPI(response *float64, err error) mockAPI { + return mockAPI{ + response: response, + err: err, + } +} + +func newTestingMetric() v1alpha1.Metric { + return v1alpha1.Metric{ + Name: "foo", + SuccessCondition: "result == 10.000", + FailureCondition: "result != 10.000", + Provider: v1alpha1.MetricProvider{ + Graphite: &v1alpha1.GraphiteMetric{ + Address: "http://some-graphite.foo", + Query: "foo=1", + }, + }, + } +} + +func TestType(t *testing.T) { + response := 10.000 + g := NewGraphiteProvider(newMockAPI(&response, nil), log.Entry{}) + assert.Equal(t, ProviderType, g.Type()) +} + +func TestRunSuccessfulEvaluation(t *testing.T) { + response := 10.000 + g := NewGraphiteProvider(newMockAPI(&response, nil), log.Entry{}) + measurement := g.Run(&v1alpha1.AnalysisRun{}, newTestingMetric()) + assert.NotNil(t, measurement.StartedAt) + assert.Equal(t, "10.000", measurement.Value) + assert.NotNil(t, measurement.FinishedAt) + assert.Equal(t, v1alpha1.AnalysisPhaseSuccessful, measurement.Phase) +} + +func TestRunFailedEvaluation(t *testing.T) { + response := 5.000 + g := NewGraphiteProvider(newMockAPI(&response, nil), log.Entry{}) + measurement := g.Run(&v1alpha1.AnalysisRun{}, newTestingMetric()) + assert.NotNil(t, measurement.StartedAt) + assert.Equal(t, "5.000", measurement.Value) + assert.NotNil(t, measurement.FinishedAt) + assert.Equal(t, v1alpha1.AnalysisPhaseFailed, measurement.Phase) +} + +func TestRunMeasurementError(t *testing.T) { + metric := v1alpha1.Metric{ + Name: "foo", + // Malformed Success and Failure Conditions + SuccessCondition: "result 10.000", + FailureCondition: "result 10.000", + Provider: v1alpha1.MetricProvider{ + Graphite: 
&v1alpha1.GraphiteMetric{ + Address: "http://some-graphite.foo", + Query: "foo=1", + }, + }, + } + response := 10.000 + g := NewGraphiteProvider(newMockAPI(&response, nil), log.Entry{}) + measurement := g.Run(&v1alpha1.AnalysisRun{}, metric) + assert.NotNil(t, measurement.StartedAt) + assert.Equal(t, "10.000", measurement.Value) + assert.NotNil(t, measurement.FinishedAt) + assert.Equal(t, v1alpha1.AnalysisPhaseError, measurement.Phase) + assert.Equal(t, "unexpected token Number(\"10.000\")", measurement.Message) +} + +func TestRunErrorEvaluationFromNilQueryResponse(t *testing.T) { + g := NewGraphiteProvider(newMockAPI(nil, nil), log.Entry{}) + measurement := g.Run(&v1alpha1.AnalysisRun{}, newTestingMetric()) + assert.NotNil(t, measurement.StartedAt) + assert.Equal(t, "", measurement.Value) + assert.NotNil(t, measurement.FinishedAt) + assert.Equal(t, v1alpha1.AnalysisPhaseError, measurement.Phase) + assert.Equal(t, "no values found", measurement.Message) +} + +func TestRunErrorEvaluationFromErrorQueryResponse(t *testing.T) { + response := 10.000 + g := NewGraphiteProvider(newMockAPI(&response, errors.New("some err")), log.Entry{}) + measurement := g.Run(&v1alpha1.AnalysisRun{}, newTestingMetric()) + assert.NotNil(t, measurement.StartedAt) + assert.Equal(t, "", measurement.Value) + assert.NotNil(t, measurement.FinishedAt) + assert.Equal(t, v1alpha1.AnalysisPhaseError, measurement.Phase) + assert.Equal(t, "some err", measurement.Message) +} + +func TestResume(t *testing.T) { + response := 1.000 + e := log.NewEntry(log.New()) + g := NewGraphiteProvider(newMockAPI(&response, nil), *e) + metric := newTestingMetric() + analysisRun := &v1alpha1.AnalysisRun{} + measurement := g.Run(analysisRun, metric) + m := g.Resume(nil, metric, measurement) + assert.Equal(t, m, measurement) +} + +func TestTerminate(t *testing.T) { + response := 1.000 + e := log.NewEntry(log.New()) + g := NewGraphiteProvider(newMockAPI(&response, nil), *e) + metric := newTestingMetric() + analysisRun := 
&v1alpha1.AnalysisRun{} + measurement := g.Run(analysisRun, metric) + m := g.Terminate(nil, metric, measurement) + assert.Equal(t, m, measurement) +} + +func TestGarbageCollect(t *testing.T) { + response := 1.000 + g := NewGraphiteProvider(newMockAPI(&response, nil), log.Entry{}) + err := g.GarbageCollect(nil, v1alpha1.Metric{}, 0) + assert.NoError(t, err) +} diff --git a/metricproviders/graphite/mock_test.go b/metricproviders/graphite/mock_test.go new file mode 100644 index 0000000000..b0b1953144 --- /dev/null +++ b/metricproviders/graphite/mock_test.go @@ -0,0 +1,13 @@ +package graphite + +type mockAPI struct { + response *float64 + err error +} + +func (m mockAPI) Query(query string) (*float64, error) { + if m.err != nil { + return nil, m.err + } + return m.response, nil +} diff --git a/metricproviders/metricproviders.go b/metricproviders/metricproviders.go index 11df714b23..279dd7e839 100644 --- a/metricproviders/metricproviders.go +++ b/metricproviders/metricproviders.go @@ -3,6 +3,7 @@ package metricproviders import ( "fmt" + "github.com/argoproj/argo-rollouts/metricproviders/graphite" "github.com/argoproj/argo-rollouts/metricproviders/newrelic" "github.com/argoproj/argo-rollouts/metricproviders/wavefront" @@ -77,6 +78,12 @@ func (f *ProviderFactory) NewProvider(logCtx log.Entry, metric v1alpha1.Metric) return nil, err } return newrelic.NewNewRelicProvider(client, logCtx), nil + case graphite.ProviderType: + client, err := graphite.NewAPIClient(metric, logCtx) + if err != nil { + return nil, err + } + return graphite.NewGraphiteProvider(client, logCtx), nil default: return nil, fmt.Errorf("no valid provider in metric '%s'", metric.Name) } diff --git a/pkg/apis/rollouts/v1alpha1/analysis_types.go b/pkg/apis/rollouts/v1alpha1/analysis_types.go index 7144824155..aa3d893030 100644 --- a/pkg/apis/rollouts/v1alpha1/analysis_types.go +++ b/pkg/apis/rollouts/v1alpha1/analysis_types.go @@ -139,6 +139,8 @@ type MetricProvider struct { NewRelic *NewRelicMetric 
`json:"newRelic,omitempty" protobuf:"bytes,6,opt,name=newRelic"` // Job specifies the job metric run Job *JobMetric `json:"job,omitempty" protobuf:"bytes,7,opt,name=job"` + // Graphite specifies the Graphite metric to query + Graphite *GraphiteMetric `json:"graphite,omitempty" protobuf:"bytes,1,opt,name=graphite"` } // AnalysisPhase is the overall phase of an AnalysisRun, MetricResult, or Measurement @@ -193,6 +195,14 @@ type JobMetric struct { Spec batchv1.JobSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"` } +// GraphiteMetric defines the Graphite query to perform canary analysis +type GraphiteMetric struct { + // Address is the HTTP address and port of the Graphite server + Address string `json:"address,omitempty" protobuf:"bytes,1,opt,name=address"` + // Query is a raw Graphite query to perform + Query string `json:"query,omitempty" protobuf:"bytes,2,opt,name=query"` +} + // AnalysisRun is an instantiation of an AnalysisTemplate // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object