Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow switching between Datadog v1 and v2. Fixes #2549 #2592

Merged
merged 18 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/analysis/datadog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ spec:
failureLimit: 3
provider:
datadog:
apiVersion: v2
interval: 5m
query: |
sum:requests.error.count{service:{{args.service-name}}} /
sum:requests.request.count{service:{{args.service-name}}}
```

The field `apiVersion` refers to the API version of Datadog (v1 or v2). Default value is `v1` if this is omitted.

!!! note
Datadog is moving away from the legacy v1 API. Rate limits imposed by Datadog are therefore stricter when using v1. It is recommended to switch to v2 soon. If you switch to v2, you will not need to change any other field aside from `apiVersion`.

Datadog api and app tokens can be configured in a kubernetes secret in argo-rollouts namespace.

```yaml
Expand All @@ -39,3 +45,5 @@ data:
api-key: <datadog-api-key>
app-key: <datadog-app-key>
```

`apiVersion` here is different from the `apiVersion` from the Datadog configuration above.
9 changes: 9 additions & 0 deletions docs/features/kustomize/rollout_cr_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@
},
"datadog": {
"properties": {
"apiVersion": {
"type": "string"
},
"interval": {
"type": "string"
},
Expand Down Expand Up @@ -4529,6 +4532,9 @@
},
"datadog": {
"properties": {
"apiVersion": {
"type": "string"
},
"interval": {
"type": "string"
},
Expand Down Expand Up @@ -8815,6 +8821,9 @@
},
"datadog": {
"properties": {
"apiVersion": {
"type": "string"
},
"interval": {
"type": "string"
},
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/analysis-run-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/analysis-template-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/cluster-analysis-template-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
6 changes: 6 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down Expand Up @@ -3068,6 +3070,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down Expand Up @@ -5843,6 +5847,8 @@ spec:
type: object
datadog:
properties:
apiVersion:
type: string
interval:
type: string
query:
Expand Down
155 changes: 143 additions & 12 deletions metricproviders/datadog/datadog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -9,6 +10,7 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand All @@ -33,6 +35,7 @@ const (
DatadogApiKey = "api-key"
DatadogAppKey = "app-key"
DatadogAddress = "address"
DefaultApiVersion = "v1"
)

// Provider contains all the required components to run a Datadog query
Expand All @@ -42,12 +45,33 @@ type Provider struct {
config datadogConfig
}

type datadogResponse struct {
type datadogQueryAttributes struct {
From int64 `json:"from"`
To int64 `json:"to"`
Queries []map[string]string `json:"queries"`
}

type datadogQuery struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attributes datadogQueryAttributes `json:"attributes"`
QueryType string `json:"type"`
}

type datadogResponseV1 struct {
Series []struct {
Pointlist [][]float64 `json:"pointlist"`
}
}

type datadogResponseV2 struct {
Data struct {
Attributes struct {
Values [][]float64
Times []int64
}
Errors string
}
}

type datadogConfig struct {
Address string `yaml:"address,omitempty"`
ApiKey string `yaml:"api-key,omitempty"`
Expand All @@ -72,16 +96,37 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
StartedAt: &startTime,
}

endpoint := "https://api.datadoghq.com/api/v1/query"
endpoint := "https://api.datadoghq.com"
if p.config.Address != "" {
endpoint = p.config.Address + "/api/v1/query"
endpoint = p.config.Address
}

// Check if the URL is valid first before adding the endpoint
url, err := url.Parse(endpoint)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}

apiVersion := DefaultApiVersion
if metric.Provider.Datadog.ApiVersion != "" {
apiVersion = metric.Provider.Datadog.ApiVersion
}

if apiVersion == "v1" {
p.logCtx.Warn("Datadog will soon deprecate their API v1. Please consider switching to v2 soon.")
}

route := "/api/v1/query"
if apiVersion == "v2" {
route = "/api/v2/query/timeseries"
}

// Add endpoint after getting the API version
url, err = url.Parse(endpoint + route)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}

now := unixNow()
var interval int64 = 300
if metric.Provider.Datadog.Interval != "" {
Expand All @@ -93,13 +138,11 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
interval = int64(expDuration.Seconds())
}

q := url.Query()
q.Set("query", metric.Provider.Datadog.Query)
q.Set("from", strconv.FormatInt(now-interval, 10))
q.Set("to", strconv.FormatInt(now, 10))
url.RawQuery = q.Encode()
request, err := p.createRequest(metric.Provider.Datadog.Query, apiVersion, now, interval, url)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}

request := &http.Request{Method: "GET"}
request.URL = url
request.Header = make(http.Header)
request.Header.Set("Content-Type", "application/json")
Expand All @@ -116,7 +159,7 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
return metricutil.MarkMeasurementError(measurement, err)
}

value, status, err := p.parseResponse(metric, response)
value, status, err := p.parseResponse(metric, response, apiVersion)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}
Expand All @@ -129,7 +172,45 @@ func (p *Provider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alph
return measurement
}

func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response) (string, v1alpha1.AnalysisPhase, error) {
func (p *Provider) createRequest(query string, apiVersion string, now int64, interval int64, url *url.URL) (*http.Request, error) {
if apiVersion == "v1" {
q := url.Query()
q.Set("query", query)
q.Set("from", strconv.FormatInt(now-interval, 10))
q.Set("to", strconv.FormatInt(now, 10))
url.RawQuery = q.Encode()

return &http.Request{Method: "GET"}, nil
} else if apiVersion == "v2" {
queryBody, _ := json.Marshal(datadogQuery{
zachaller marked this conversation as resolved.
Show resolved Hide resolved
QueryType: "timeseries_request",
Attributes: datadogQueryAttributes{
From: now - interval,
To: now,
Queries: []map[string]string{{
"data_source": "metrics",
"query": query,
}},
},
})
request := &http.Request{Method: "POST"}
request.Body = io.NopCloser(bytes.NewReader(queryBody))
return request, nil
}

return nil, fmt.Errorf("Invalid API version: %s", apiVersion)
}

func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response, apiVersion string) (string, v1alpha1.AnalysisPhase, error) {
if apiVersion == "v1" {
return p.parseResponseV1(metric, response)
} else if apiVersion == "v2" {
return p.parseResponseV2(metric, response)
}
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Invalid API version: %s", apiVersion)
}

func (p *Provider) parseResponseV1(metric v1alpha1.Metric, response *http.Response) (string, v1alpha1.AnalysisPhase, error) {

bodyBytes, err := io.ReadAll(response.Body)

Expand All @@ -143,7 +224,7 @@ func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("received non 2xx response code: %v %s", response.StatusCode, string(bodyBytes))
}

var res datadogResponse
var res datadogResponseV1
err = json.Unmarshal(bodyBytes, &res)
if err != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Could not parse JSON body: %v", err)
Expand Down Expand Up @@ -173,6 +254,56 @@ func (p *Provider) parseResponse(metric v1alpha1.Metric, response *http.Response
return strconv.FormatFloat(value, 'f', -1, 64), status, err
}

func (p *Provider) parseResponseV2(metric v1alpha1.Metric, response *http.Response) (string, v1alpha1.AnalysisPhase, error) {

bodyBytes, err := io.ReadAll(response.Body)

if err != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Received no bytes in response: %v", err)
}

if response.StatusCode == http.StatusForbidden || response.StatusCode == http.StatusUnauthorized {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("received authentication error response code: %v %s", response.StatusCode, string(bodyBytes))
zachaller marked this conversation as resolved.
Show resolved Hide resolved
} else if response.StatusCode != http.StatusOK {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("received non 2xx response code: %v %s", response.StatusCode, string(bodyBytes))
}

var res datadogResponseV2
err = json.Unmarshal(bodyBytes, &res)
if err != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Could not parse JSON body: %v", err)
}

// Handle an error returned by Datadog
if res.Data.Errors != "" {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("There were errors in your query: %v", res.Data.Errors)
}

// Handle an empty query result
if reflect.ValueOf(res.Data.Attributes).IsZero() || len(res.Data.Attributes.Values) == 0 || len(res.Data.Attributes.Times) == 0 {
var nilFloat64 *float64
status, err := evaluate.EvaluateResult(nilFloat64, metric, p.logCtx)
attributesBytes, jsonErr := json.Marshal(res.Data.Attributes)
if jsonErr != nil {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Failed to marshall JSON empty series: %v", jsonErr)
}

return string(attributesBytes), status, err
}

// Handle a populated query result
attributes := res.Data.Attributes
datapoint := attributes.Values[0]
timepoint := attributes.Times[len(attributes.Times)-1]
if timepoint == 0 {
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Datapoint does not have a corresponding time value")
}

value := datapoint[len(datapoint)-1]
status, err := evaluate.EvaluateResult(value, metric, p.logCtx)
return strconv.FormatFloat(value, 'f', -1, 64), status, err
}

// Resume should not be used the Datadog provider since all the work should occur in the Run method
func (p *Provider) Resume(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
p.logCtx.Warn("Datadog provider should not execute the Resume method")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestRunSuite(t *testing.T) {
},
expectedIntervalSeconds: 300,
expectedPhase: v1alpha1.AnalysisPhaseError,
expectedErrorMessage: "Could not parse JSON body: json: cannot unmarshal string into Go struct field datadogResponse.Series of type []struct { Pointlist [][]float64 \"json:\\\"pointlist\\\"\" }",
expectedErrorMessage: "Could not parse JSON body: json: cannot unmarshal string into Go struct field datadogResponseV1.Series of type []struct { Pointlist [][]float64 \"json:\\\"pointlist\\\"\" }",
useEnvVarForKeys: false,
},

Expand All @@ -222,7 +222,7 @@ func TestRunSuite(t *testing.T) {
serverURL: "://wrong.schema",
metric: v1alpha1.Metric{},
expectedPhase: v1alpha1.AnalysisPhaseError,
expectedErrorMessage: "parse \"://wrong.schema/api/v1/query\": missing protocol scheme",
expectedErrorMessage: "parse \"://wrong.schema\": missing protocol scheme",
useEnvVarForKeys: false,
},
}
Expand All @@ -235,6 +235,9 @@ func TestRunSuite(t *testing.T) {
if serverURL == "" {
// Server setup with response
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if test.metric.Provider.Datadog.ApiVersion == "" && DefaultApiVersion != "v1" {
t.Errorf("\nApiVersion was left blank in the tests, but the default API version is not v1 anymore.")
}

//Check query variables
actualQuery := req.URL.Query().Get("query")
Expand Down
Loading