Skip to content

Commit

Permalink
graphite.API#Query returns []dataPoint
Browse files Browse the repository at this point in the history
Previously, `graphite.API#Query` returned the value of
the last item in the `datapoints` array returned by
the Graphite API. However, this somewhat diverged from
the Prometheus metricprovider implementation and fails
to consider a use case for obtaining the first or an
arbitrary element from the datapoints array, as
discussed here:

#1406 (comment)

Signed-off-by: Mike Ball <[email protected]>
  • Loading branch information
mdb committed Aug 27, 2021
1 parent 353150d commit 91e32aa
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 77 deletions.
4 changes: 2 additions & 2 deletions docs/analysis/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ spec:
metrics:
- name: success-rate
interval: 5m
# Note that the Argo Rollouts Graphite metrics provider returns results as float64s with 6 decimal places.
successCondition: result >= 90.000000
# Note that the Argo Rollouts Graphite metrics provider returns results as an array of float64s with 6 decimal places.
successCondition: results[0] >= 90.000000
failureLimit: 3
provider:
graphite:
Expand Down
37 changes: 14 additions & 23 deletions metricproviders/graphite/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ import (

// API represents a Graphite API client
type API interface {
Query(query string) (*float64, error)
Query(query string) ([]dataPoint, error)
}

// GraphiteAPI is a Graphite API client
// APIClient is a Graphite API client
type APIClient struct {
url url.URL
client *http.Client
logCTX log.Entry
}

// Query performs a Graphite API query with the query it's passed
func (api APIClient) Query(quer string) (*float64, error) {
func (api APIClient) Query(quer string) ([]dataPoint, error) {
query := api.trimQuery(quer)
u, err := url.Parse(fmt.Sprintf("./render?%s", query))
if err != nil {
return nil, err
return []dataPoint{}, err
}

q := u.Query()
Expand All @@ -44,53 +44,44 @@ func (api APIClient) Query(quer string) (*float64, error) {

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
return []dataPoint{}, err
}

r, err := api.client.Do(req)
if err != nil {
return nil, err
return []dataPoint{}, err
}
defer r.Body.Close()

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
return []dataPoint{}, err
}

if 400 <= r.StatusCode {
return nil, fmt.Errorf("error response: %s", string(b))
return []dataPoint{}, fmt.Errorf("error response: %s", string(b))
}

var result graphiteResponse
err = json.Unmarshal(b, &result)
if err != nil {
return nil, err
return []dataPoint{}, err
}

var value *float64
for _, tr := range result {
for _, dp := range tr.DataPoints {
if dp.Value != nil {
value = dp.Value
}
}
}

return value, nil
return result[0].DataPoints, nil
}

func (api APIClient) trimQuery(q string) string {
space := regexp.MustCompile(`\s+`)
return space.ReplaceAllString(q, " ")
}

type graphiteDataPoint struct {
type dataPoint struct {
Value *float64
TimeStamp time.Time
}

func (gdp *graphiteDataPoint) UnmarshalJSON(data []byte) error {
func (gdp *dataPoint) UnmarshalJSON(data []byte) error {
var v []interface{}
if err := json.Unmarshal(data, &v); err != nil {
return err
Expand Down Expand Up @@ -144,8 +135,8 @@ func (gdp *graphiteDataPoint) UnmarshalJSON(data []byte) error {
}

type graphiteTargetResp struct {
Target string `json:"target"`
DataPoints []graphiteDataPoint `json:"datapoints"`
Target string `json:"target"`
DataPoints []dataPoint `json:"datapoints"`
}

type graphiteResponse []graphiteTargetResp
69 changes: 32 additions & 37 deletions metricproviders/graphite/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -42,14 +43,19 @@ func TestQuery(t *testing.T) {
query := "target=sumSeries(app.http.*.*.count)&from=-2min"
targetQuery := "sumSeries(app.http.*.*.count)"
fromQuery := "-2min"
goodResult := float64(100)
value := float64(100)
timestamp := int64(1621348430)
goodResult := []dataPoint{{
Value: &value,
TimeStamp: time.Unix(timestamp, 0),
}}

tests := []struct {
name string
query string
expectedTarget string
expectedFrom string
expectedResult *float64
expectedResult []dataPoint
expectedErr error
body string
responseCode int
Expand All @@ -58,26 +64,14 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
&goodResult,
goodResult,
nil,
`[
fmt.Sprintf(`[
{
"datapoints": [
[
10,
1621348400
],
[
75,
1621348410
],
[
25,
1621348420
],
[
100,
1621348430
%f,
%d
]
],
"target": "sumSeries(app.http.*.*.count)",
Expand All @@ -86,14 +80,14 @@ func TestQuery(t *testing.T) {
"name": "sumSeries(app.http.*.*.count)"
}
}
]`,
]`, value, timestamp),
200,
}, {
"graphite response body with invalid JSON",
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("invalid character 'i' looking for beginning of value"),
"invalid JSON",
200,
Expand All @@ -102,7 +96,7 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("error response: foo"),
"foo",
400,
Expand All @@ -111,7 +105,7 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("error response: bar"),
"bar",
500,
Expand All @@ -120,7 +114,7 @@ func TestQuery(t *testing.T) {
"target=#$%^&*(proper$#$%%^(password&from=-2min",
"#$%^&*(proper$#$%%^(password",
"-2min",
nil,
[]dataPoint{},
errors.New("parse \"./render?target=#$%^&*(proper$#$%%^(password&from=-2min\": invalid URL escape \"%^&\""),
"",
200,
Expand All @@ -129,7 +123,7 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("error unmarshaling data point: [10]"),
`[
{
Expand All @@ -151,7 +145,7 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("strconv.ParseInt: parsing \"f\": invalid syntax"),
`[
{
Expand All @@ -170,26 +164,26 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
&goodResult,
goodResult,
nil,
`[
fmt.Sprintf(`[
{
"datapoints": [
[
"100",
1621348420
"%f",
%d
]
],
"target": "sumSeries(app.http.*.*.count)"
}
]`,
]`, value, timestamp),
200,
}, {
"graphite response data point JSON triggers unmarshaling error",
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("error unmarshaling value: []"),
`[
{
Expand All @@ -208,26 +202,26 @@ func TestQuery(t *testing.T) {
query,
targetQuery,
fromQuery,
&goodResult,
goodResult,
nil,
`[
fmt.Sprintf(`[
{
"datapoints": [
[
100,
"1621348420"
%f,
"%d"
]
],
"target": "sumSeries(app.http.*.*.count)"
}
]`,
]`, value, timestamp),
200,
}, {
"graphite response data point timestamp JSON triggers unmarshaling error",
query,
targetQuery,
fromQuery,
nil,
[]dataPoint{},
errors.New("error unmarshaling timestamp: 100"),
`[
{
Expand Down Expand Up @@ -268,6 +262,7 @@ func TestQuery(t *testing.T) {
} else {
assert.Nil(t, err)
}

assert.Equal(t, test.expectedResult, val)
})
}
Expand Down
31 changes: 26 additions & 5 deletions metricproviders/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,17 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
StartedAt: &startTime,
}

value, err := p.api.Query(metric.Provider.Graphite.Query)
result, err := p.api.Query(metric.Provider.Graphite.Query)
if err != nil {
return metricutil.MarkMeasurementError(newMeasurement, err)
}

if value == nil {
if len(result) == 0 {
return metricutil.MarkMeasurementError(newMeasurement, errors.New("no values found"))
}

newMeasurement.Value = fmt.Sprintf("%f", *value)

newStatus, err := evaluate.EvaluateResult(*value, metric, p.logCtx)
newValue, newStatus, err := p.processResponse(metric, result)
newMeasurement.Value = newValue
if err != nil {
return metricutil.MarkMeasurementError(newMeasurement, err)
}
Expand Down Expand Up @@ -96,6 +95,28 @@ func (p *Provider) GarbageCollect(run *v1alpha1.AnalysisRun, metric v1alpha1.Met
return nil
}

func (p *Provider) processResponse(metric v1alpha1.Metric, dataPoints []dataPoint) (string, v1alpha1.AnalysisPhase, error) {
results := make([]float64, 0, len(dataPoints))
valueStr := "["

for _, dp := range dataPoints {
if dp.Value != nil {
valueStr = valueStr + fmt.Sprintf("%f,", *dp.Value)
results = append(results, *dp.Value)
}
}

// remove the last comma on the '[dp.Value,dp.Value,' string
if len(valueStr) > 1 {
valueStr = valueStr[:len(valueStr)-1]
}

valueStr = valueStr + "]"
newStatus, err := evaluate.EvaluateResult(results, metric, p.logCtx)

return valueStr, newStatus, err
}

// NewGraphiteProvider returns a new Graphite provider
func NewGraphiteProvider(api API, logCtx log.Entry) *Provider {
return &Provider{
Expand Down
Loading

0 comments on commit 91e32aa

Please sign in to comment.