Skip to content

Commit

Permalink
feat(metrics-operator): update dql provider to include range (#1919)
Browse files Browse the repository at this point in the history
Signed-off-by: Rakshit Gondwal <[email protected]>
  • Loading branch information
rakshitgondwal authored Aug 24, 2023
1 parent a3f366d commit 39db23e
Show file tree
Hide file tree
Showing 5 changed files with 760 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

//go:generate moq -pkg fake --skip-ensure -out ./fake/dt_client_mock.go . DTAPIClient
type DTAPIClient interface {
Do(ctx context.Context, path, method string, payload []byte) ([]byte, error)
Do(ctx context.Context, path, method string, payload []byte) ([]byte, int, error)
}

type apiClient struct {
Expand Down Expand Up @@ -56,21 +56,21 @@ func NewAPIClient(config apiConfig, options ...APIClientOption) *apiClient {
}

// Do sends and API request to the Dynatrace API and returns its result as a string containing the raw response payload
func (client *apiClient) Do(ctx context.Context, path, method string, payload []byte) ([]byte, error) {
func (client *apiClient) Do(ctx context.Context, path, method string, payload []byte) ([]byte, int, error) {
if err := client.auth(ctx); err != nil {
return nil, err
return nil, http.StatusUnauthorized, err
}
api := fmt.Sprintf("%s%s", client.config.serverURL, path)
req, err := http.NewRequestWithContext(ctx, method, api, bytes.NewBuffer(payload))
if err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", client.config.oAuthCredentials.accessToken))

res, err := client.httpClient.Do(req)
if err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
}
defer func() {
err := res.Body.Close()
Expand All @@ -79,13 +79,14 @@ func (client *apiClient) Do(ctx context.Context, path, method string, payload []
}
}()
if isErrorStatus(res.StatusCode) {
return nil, ErrRequestFailed
return nil, res.StatusCode, ErrRequestFailed
}
b, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
}
return b, nil

return b, res.StatusCode, nil
}

func (client *apiClient) auth(ctx context.Context) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func TestAPIClient(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

require.Nil(t, err)
require.Equal(t, "success", string(resp))
require.Equal(t, 200, code)

require.Equal(t, "my-token", apiClient.config.oAuthCredentials.accessToken)
}
Expand Down Expand Up @@ -86,10 +87,11 @@ func TestAPIClientAuthError(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

require.ErrorIs(t, err, ErrRequestFailed)
require.Empty(t, resp)
require.Equal(t, http.StatusUnauthorized, code)
}

func TestAPIClientAuthNoToken(t *testing.T) {
Expand Down Expand Up @@ -123,10 +125,11 @@ func TestAPIClientAuthNoToken(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

require.ErrorIs(t, err, ErrAuthenticationFailed)
require.Empty(t, resp)
require.Equal(t, http.StatusUnauthorized, code)
}

func TestAPIClientRequestError(t *testing.T) {
Expand Down Expand Up @@ -160,11 +163,12 @@ func TestAPIClientRequestError(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

// authentication should have worked
require.Equal(t, "my-token", apiClient.config.oAuthCredentials.accessToken)

require.ErrorIs(t, err, ErrRequestFailed)
require.Empty(t, resp)
require.Equal(t, http.StatusInternalServerError, code)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 117 additions & 29 deletions metrics-operator/controllers/common/providers/dynatrace/dynatrace_dql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const maxRetries = 5
const retryFetchInterval = 10 * time.Second

const dqlQuerySucceeded = "SUCCEEDED"
const defaultPath = "/platform/storage/query/v1/query:"

type keptnDynatraceDQLProvider struct {
log logr.Logger
Expand Down Expand Up @@ -56,6 +57,16 @@ type DQLMetric struct {
Max float64 `json:"max"`
}

type DQLRequest struct {
Query string `json:"query"`
DefaultTimeframeStart string `json:"defaultTimeframeStart"`
DefaultTimeframeEnd string `json:"defaultTimeframeEnd"`
Timezone string `json:"timezone"`
Locale string `json:"locale"`
FetchTimeoutSeconds int `json:"fetchTimeoutSeconds"`
RequestTimeoutMilliseconds int `json:"requestTimeoutMilliseconds"`
}

type KeptnDynatraceDQLProviderOption func(provider *keptnDynatraceDQLProvider)

func WithDTAPIClient(dtApiClient dtclient.DTAPIClient) KeptnDynatraceDQLProviderOption {
Expand Down Expand Up @@ -85,39 +96,89 @@ func NewKeptnDynatraceDQLProvider(k8sClient client.Client, opts ...KeptnDynatrac
return provider
}

// EvaluateQuery fetches the SLI values from dynatrace provider
func (d *keptnDynatraceDQLProvider) EvaluateQuery(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) (string, []byte, error) {
if err := d.ensureDTClientIsSetUp(ctx, provider); err != nil {
return "", nil, err
}
// submit DQL
dqlHandler, err := d.postDQL(ctx, metric.Spec.Query)
if err != nil {
d.log.Error(err, "Error while posting the DQL query", "query", metric.Spec.Query)
return "", nil, err
}
// attend result
results, err := d.getDQL(ctx, *dqlHandler)
results, err := d.getResults(ctx, metric, provider)
if err != nil {
d.log.Error(err, "Error while waiting for DQL query", "query", dqlHandler)
return "", nil, err
}

// parse result
if len(results.Records) > 1 {
d.log.Info("More than a single result, the first one will be used")
}
if len(results.Records) == 0 {
return "", nil, ErrInvalidResult
}

r := fmt.Sprintf("%f", results.Records[0].Value.Avg)
b, err := json.Marshal(results)
if err != nil {
d.log.Error(err, "Error marshaling DQL results")
}

return r, b, nil
}

func (d *keptnDynatraceDQLProvider) EvaluateQueryForStep(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) ([]string, []byte, error) {
results, err := d.getResults(ctx, metric, provider)
if err != nil {
return nil, nil, err
}

r := d.getResultSlice(results)
b, err := json.Marshal(results)
if err != nil {
d.log.Error(err, "Error marshaling DQL results")
}

return r, b, nil
}

func (d *keptnDynatraceDQLProvider) getResults(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) (*DQLResult, error) {
if err := d.ensureDTClientIsSetUp(ctx, provider); err != nil {
return nil, err
}

b, status, err := d.postDQL(ctx, metric)
if err != nil {
d.log.Error(err, "Error while posting the DQL query", "query", metric.Spec.Query)
return nil, err
}

results, err := d.parseDQLResults(b, status)
if err != nil {
return nil, err
}
return results, nil
}

func (d *keptnDynatraceDQLProvider) parseDQLResults(b []byte, status int) (*DQLResult, error) {
results := &DQLResult{}
if status == 200 {
r := &DynatraceDQLResult{}
err := json.Unmarshal(b, &r)
if err != nil {
return nil, fmt.Errorf("could not unmarshal response %s: %w", string(b), err)
}
if r.State == dqlQuerySucceeded {
results = &r.Result
}
} else {
dqlHandler := &DynatraceDQLHandler{}
err := json.Unmarshal(b, &dqlHandler)
if err != nil {
return nil, fmt.Errorf("could not unmarshal response %s: %w", string(b), err)
}
results, err = d.getDQL(context.Background(), *dqlHandler)
if err != nil {
d.log.Error(err, "Error while waiting for DQL query", "query", dqlHandler)
return nil, err
}
}

if len(results.Records) == 0 {
return nil, ErrInvalidResult
}

return results, nil
}

func (d *keptnDynatraceDQLProvider) ensureDTClientIsSetUp(ctx context.Context, provider metricsapi.KeptnMetricsProvider) error {
// try to initialize the DT API Client if it has not been set in the options
if d.dtClient == nil {
Expand All @@ -137,27 +198,42 @@ func (d *keptnDynatraceDQLProvider) ensureDTClientIsSetUp(ctx context.Context, p
return nil
}

func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, query string) (*DynatraceDQLHandler, error) {
func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, metric metricsapi.KeptnMetric) ([]byte, int, error) {
d.log.V(10).Info("posting DQL")
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

values := url.Values{}
values.Add("query", query)
path := defaultPath + "execute"

payload := DQLRequest{
Query: metric.Spec.Query,
DefaultTimeframeStart: "",
DefaultTimeframeEnd: "",
Timezone: "UTC",
Locale: "en_US",
FetchTimeoutSeconds: 60,
RequestTimeoutMilliseconds: 1000,
}

path := fmt.Sprintf("/platform/storage/query/v0.7/query:execute?%s", values.Encode())
if metric.Spec.Range != nil {
intervalDuration, err := time.ParseDuration(metric.Spec.Range.Interval)
if err != nil {
return nil, 0, err
}
payload.DefaultTimeframeStart = time.Now().Add(-intervalDuration).Format(time.RFC3339)
payload.DefaultTimeframeEnd = time.Now().Format(time.RFC3339)
}

b, err := d.dtClient.Do(ctx, path, http.MethodPost, []byte(`{}`))
payloadBytes, err := json.Marshal(payload)
if err != nil {
return nil, err
return nil, 0, err
}

dqlHandler := &DynatraceDQLHandler{}
err = json.Unmarshal(b, &dqlHandler)
b, status, err := d.dtClient.Do(ctx, path, http.MethodPost, payloadBytes)
if err != nil {
return nil, fmt.Errorf("could not unmarshal response %s: %w", string(b), err)
return nil, 0, err
}
return dqlHandler, nil
return b, status, nil
}

func (d *keptnDynatraceDQLProvider) getDQL(ctx context.Context, handler DynatraceDQLHandler) (*DQLResult, error) {
Expand Down Expand Up @@ -185,9 +261,9 @@ func (d *keptnDynatraceDQLProvider) retrieveDQLResults(ctx context.Context, hand
values := url.Values{}
values.Add("request-token", handler.RequestToken)

path := fmt.Sprintf("/platform/storage/query/v0.7/query:poll?%s", values.Encode())
path := defaultPath + fmt.Sprintf("poll?%s", values.Encode())

b, err := d.dtClient.Do(ctx, path, http.MethodGet, nil)
b, _, err := d.dtClient.Do(ctx, path, http.MethodGet, nil)
if err != nil {
return nil, err
}
Expand All @@ -206,3 +282,15 @@ func (d *keptnDynatraceDQLProvider) retrieveDQLResults(ctx context.Context, hand
}
return result, nil
}

func (d *keptnDynatraceDQLProvider) getResultSlice(result *DQLResult) []string {
if len(result.Records) == 0 {
return nil
}
// Initialize resultSlice with the correct length
resultSlice := make([]string, len(result.Records))
for index, r := range result.Records {
resultSlice[index] = fmt.Sprintf("%f", r.Value.Max)
}
return resultSlice
}
Loading

0 comments on commit 39db23e

Please sign in to comment.