Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics-operator): update dql provider to include range #1919

Merged
merged 14 commits into from
Aug 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

//go:generate moq -pkg fake --skip-ensure -out ./fake/dt_client_mock.go . DTAPIClient
type DTAPIClient interface {
Do(ctx context.Context, path, method string, payload []byte) ([]byte, error)
Do(ctx context.Context, path, method string, payload []byte) ([]byte, int, error)
}

type apiClient struct {
Expand Down Expand Up @@ -56,21 +56,21 @@ func NewAPIClient(config apiConfig, options ...APIClientOption) *apiClient {
}

// Do sends and API request to the Dynatrace API and returns its result as a string containing the raw response payload
func (client *apiClient) Do(ctx context.Context, path, method string, payload []byte) ([]byte, error) {
func (client *apiClient) Do(ctx context.Context, path, method string, payload []byte) ([]byte, int, error) {
if err := client.auth(ctx); err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
rakshitgondwal marked this conversation as resolved.
Show resolved Hide resolved
}
api := fmt.Sprintf("%s%s", client.config.serverURL, path)
req, err := http.NewRequestWithContext(ctx, method, api, bytes.NewBuffer(payload))
if err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", client.config.oAuthCredentials.accessToken))

res, err := client.httpClient.Do(req)
if err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
}
defer func() {
err := res.Body.Close()
Expand All @@ -79,13 +79,14 @@ func (client *apiClient) Do(ctx context.Context, path, method string, payload []
}
}()
if isErrorStatus(res.StatusCode) {
return nil, ErrRequestFailed
return nil, res.StatusCode, ErrRequestFailed
}
b, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
return nil, http.StatusInternalServerError, err
}
return b, nil

return b, res.StatusCode, nil
}

func (client *apiClient) auth(ctx context.Context) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func TestAPIClient(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

require.Nil(t, err)
require.Equal(t, "success", string(resp))
require.Equal(t, 200, code)

require.Equal(t, "my-token", apiClient.config.oAuthCredentials.accessToken)
}
Expand Down Expand Up @@ -86,10 +87,11 @@ func TestAPIClientAuthError(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

require.ErrorIs(t, err, ErrRequestFailed)
require.Empty(t, resp)
require.Equal(t, http.StatusInternalServerError, code)
}

func TestAPIClientAuthNoToken(t *testing.T) {
Expand Down Expand Up @@ -123,10 +125,11 @@ func TestAPIClientAuthNoToken(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

require.ErrorIs(t, err, ErrAuthenticationFailed)
require.Empty(t, resp)
require.Equal(t, http.StatusInternalServerError, code)
}

func TestAPIClientRequestError(t *testing.T) {
Expand Down Expand Up @@ -160,11 +163,12 @@ func TestAPIClientRequestError(t *testing.T) {

require.NotNil(t, apiClient)

resp, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)
resp, code, err := apiClient.Do(context.TODO(), "/query", http.MethodPost, nil)

// authentication should have worked
require.Equal(t, "my-token", apiClient.config.oAuthCredentials.accessToken)

require.ErrorIs(t, err, ErrRequestFailed)
require.Empty(t, resp)
require.Equal(t, http.StatusInternalServerError, code)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const maxRetries = 5
const retryFetchInterval = 10 * time.Second

const dqlQuerySucceeded = "SUCCEEDED"
const defaultPath = "/platform/storage/query/v1/query:"

type keptnDynatraceDQLProvider struct {
log logr.Logger
Expand Down Expand Up @@ -56,6 +57,16 @@ type DQLMetric struct {
Max float64 `json:"max"`
}

type DQLRequest struct {
Query string `json:"query"`
DefaultTimeframeStart string `json:"defaultTimeframeStart"`
DefaultTimeframeEnd string `json:"defaultTimeframeEnd"`
Timezone string `json:"timezone"`
Locale string `json:"locale"`
FetchTimeoutSeconds int `json:"fetchTimeoutSeconds"`
RequestTimeoutMilliseconds int `json:"requestTimeoutMilliseconds"`
}

type KeptnDynatraceDQLProviderOption func(provider *keptnDynatraceDQLProvider)

func WithDTAPIClient(dtApiClient dtclient.DTAPIClient) KeptnDynatraceDQLProviderOption {
Expand Down Expand Up @@ -85,39 +96,91 @@ func NewKeptnDynatraceDQLProvider(k8sClient client.Client, opts ...KeptnDynatrac
return provider
}

// EvaluateQuery fetches the SLI values from dynatrace provider
func (d *keptnDynatraceDQLProvider) EvaluateQuery(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) (string, []byte, error) {
if err := d.ensureDTClientIsSetUp(ctx, provider); err != nil {
return "", nil, err
}
// submit DQL
dqlHandler, err := d.postDQL(ctx, metric.Spec.Query)

b, status, err := d.postDQL(ctx, metric)
if err != nil {
d.log.Error(err, "Error while posting the DQL query", "query", metric.Spec.Query)
return "", nil, err
}
// attend result
results, err := d.getDQL(ctx, *dqlHandler)

results, err := d.parseDQLResults(b, status)
if err != nil {
d.log.Error(err, "Error while waiting for DQL query", "query", dqlHandler)
return "", nil, err
}

// parse result
if len(results.Records) > 1 {
d.log.Info("More than a single result, the first one will be used")
}
if len(results.Records) == 0 {
return "", nil, ErrInvalidResult
}

r := fmt.Sprintf("%f", results.Records[0].Value.Avg)
b, err := json.Marshal(results)
b, err = json.Marshal(results)
if err != nil {
d.log.Error(err, "Error marshaling DQL results")
}

return r, b, nil
}

func (d *keptnDynatraceDQLProvider) EvaluateQueryForStep(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) ([]string, []byte, error) {
if err := d.ensureDTClientIsSetUp(ctx, provider); err != nil {
return nil, nil, err
}

b, status, err := d.postDQL(ctx, metric)
if err != nil {
d.log.Error(err, "Error while posting the DQL query", "query", metric.Spec.Query)
return nil, nil, err
}

results, err := d.parseDQLResults(b, status)
if err != nil {
return nil, nil, err
}
rakshitgondwal marked this conversation as resolved.
Show resolved Hide resolved

r := d.getResultSlice(results)
b, err = json.Marshal(results)
if err != nil {
d.log.Error(err, "Error marshaling DQL results")
}

return r, b, nil
}

func (d *keptnDynatraceDQLProvider) parseDQLResults(b []byte, status int) (*DQLResult, error) {
results := &DQLResult{}
if status == 200 {
r := &DynatraceDQLResult{}
err := json.Unmarshal(b, &r)
if err != nil {
return nil, fmt.Errorf("could not unmarshal response %s: %w", string(b), err)
}
if r.State == dqlQuerySucceeded {
results = &r.Result
}
} else {
dqlHandler := &DynatraceDQLHandler{}
err := json.Unmarshal(b, &dqlHandler)
if err != nil {
return nil, fmt.Errorf("could not unmarshal response %s: %w", string(b), err)
}
results, err = d.getDQL(context.Background(), *dqlHandler)
if err != nil {
d.log.Error(err, "Error while waiting for DQL query", "query", dqlHandler)
return nil, err
}
}

if len(results.Records) == 0 {
return nil, ErrInvalidResult
}

return results, nil
}

func (d *keptnDynatraceDQLProvider) ensureDTClientIsSetUp(ctx context.Context, provider metricsapi.KeptnMetricsProvider) error {
// try to initialize the DT API Client if it has not been set in the options
if d.dtClient == nil {
Expand All @@ -137,27 +200,42 @@ func (d *keptnDynatraceDQLProvider) ensureDTClientIsSetUp(ctx context.Context, p
return nil
}

func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, query string) (*DynatraceDQLHandler, error) {
func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, metric metricsapi.KeptnMetric) ([]byte, int, error) {
d.log.V(10).Info("posting DQL")
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

values := url.Values{}
values.Add("query", query)
path := defaultPath + "execute"

payload := DQLRequest{
Query: metric.Spec.Query,
DefaultTimeframeStart: "",
DefaultTimeframeEnd: "",
Timezone: "UTC",
Locale: "en_US",
FetchTimeoutSeconds: 60,
RequestTimeoutMilliseconds: 1000,
}

path := fmt.Sprintf("/platform/storage/query/v0.7/query:execute?%s", values.Encode())
if metric.Spec.Range != nil {
intervalDuration, err := time.ParseDuration(metric.Spec.Range.Interval)
if err != nil {
return nil, 0, err
}
payload.DefaultTimeframeStart = time.Now().Add(-intervalDuration).Format(time.RFC3339)
payload.DefaultTimeframeEnd = time.Now().Format(time.RFC3339)
}

b, err := d.dtClient.Do(ctx, path, http.MethodPost, []byte(`{}`))
payloadBytes, err := json.Marshal(payload)
if err != nil {
return nil, err
return nil, 0, err
}

dqlHandler := &DynatraceDQLHandler{}
err = json.Unmarshal(b, &dqlHandler)
b, status, err := d.dtClient.Do(ctx, path, http.MethodPost, payloadBytes)
if err != nil {
return nil, fmt.Errorf("could not unmarshal response %s: %w", string(b), err)
return nil, 0, err
}
return dqlHandler, nil
return b, status, nil
}

func (d *keptnDynatraceDQLProvider) getDQL(ctx context.Context, handler DynatraceDQLHandler) (*DQLResult, error) {
Expand Down Expand Up @@ -185,9 +263,9 @@ func (d *keptnDynatraceDQLProvider) retrieveDQLResults(ctx context.Context, hand
values := url.Values{}
values.Add("request-token", handler.RequestToken)

path := fmt.Sprintf("/platform/storage/query/v0.7/query:poll?%s", values.Encode())
path := defaultPath + fmt.Sprintf("poll?%s", values.Encode())

b, err := d.dtClient.Do(ctx, path, http.MethodGet, nil)
b, _, err := d.dtClient.Do(ctx, path, http.MethodGet, nil)
if err != nil {
return nil, err
}
Expand All @@ -206,3 +284,15 @@ func (d *keptnDynatraceDQLProvider) retrieveDQLResults(ctx context.Context, hand
}
return result, nil
}

func (d *keptnDynatraceDQLProvider) getResultSlice(result *DQLResult) []string {
if len(result.Records) == 0 {
return nil
}
// Initialize resultSlice with the correct length
resultSlice := make([]string, len(result.Records))
for index, r := range result.Records {
resultSlice[index] = fmt.Sprintf("%f", r.Value.Max)
}
return resultSlice
}
Loading