Skip to content

Commit

Permalink
CloudWatch Logs: Queries in an expression should run synchronously (#…
Browse files Browse the repository at this point in the history
…64443)
  • Loading branch information
iwysiu authored Mar 13, 2023
1 parent fc8a753 commit 74436d3
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 9 deletions.
3 changes: 3 additions & 0 deletions pkg/expr/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
logger = log.New("expr")
)

const FromExpressionHeaderName = "FromExpression"

type QueryError struct {
RefID string
Err error
Expand Down Expand Up @@ -227,6 +229,7 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
},
Headers: dn.request.Headers,
}
req.Headers[FromExpressionHeaderName] = "true"

responseType := "unknown"
defer func() {
Expand Down
12 changes: 7 additions & 5 deletions pkg/tsdb/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
Expand Down Expand Up @@ -155,19 +156,20 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
frontend, but because alerts and expressions are executed on the backend the logic needs to be reimplemented here.
*/
q := req.Queries[0]
var model DataQueryJson
err := json.Unmarshal(q.JSON, &model)
if err != nil {
return nil, err
}
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode

if isLogAlertQuery {
return e.executeLogAlertQuery(ctx, req)
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
_, fromExpression := req.Headers[expr.FromExpressionHeaderName]
isSyncLogQuery := (fromAlert || fromExpression) && model.QueryMode == logsQueryMode
if isSyncLogQuery {
return executeSyncLogQuery(ctx, e, req)
}

var result *backend.QueryDataResponse
Expand Down
67 changes: 66 additions & 1 deletion pkg/tsdb/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/services/featuremgmt"
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
Expand Down Expand Up @@ -192,7 +193,7 @@ func Test_CheckHealth(t *testing.T) {
}, resp)
})
}
func Test_executeLogAlertQuery(t *testing.T) {
func Test_executeSyncLogQuery(t *testing.T) {
origNewCWClient := NewCWClient
t.Cleanup(func() {
NewCWClient = origNewCWClient
Expand Down Expand Up @@ -254,6 +255,70 @@ func Test_executeLogAlertQuery(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []string{"instance manager's region"}, sess.calledRegions)
})

t.Run("with header", func(t *testing.T) {
testcases := []struct {
name string
headers map[string]string
called bool
}{
{
"alert header",
map[string]string{ngalertmodels.FromAlertHeaderName: "some value"},
true,
},
{
"expression header",
map[string]string{expr.FromExpressionHeaderName: "some value"},
true,
},
{
"no header",
map[string]string{},
false,
},
}
origExecuteSyncLogQuery := executeSyncLogQuery
var syncCalled bool
executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
syncCalled = true
return nil, nil
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
syncCalled = false
cli = fakeCWLogsClient{queryResults: cloudwatchlogs.GetQueryResultsOutput{Status: aws.String("Complete")}}
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return DataSource{Settings: models.CloudWatchSettings{AWSDatasourceSettings: awsds.AWSDatasourceSettings{Region: "instance manager's region"}}}, nil
})
sess := fakeSessionCache{}

executor := newExecutor(im, newTestConfig(), &sess, featuremgmt.WithFeatures())
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
Headers: tc.headers,
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
Queries: []backend.DataQuery{
{
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"queryMode": "Logs",
"type": "logAction",
"subtype": "StartQuery",
"region": "default",
"queryString": "fields @message"
}`),
},
},
})

assert.NoError(t, err)
assert.Equal(t, tc.called, syncCalled)
})
}

executeSyncLogQuery = origExecuteSyncLogQuery
})
}

func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
alertPollPeriod = time.Second
)

func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()

for _, q := range req.Queries {
Expand All @@ -45,7 +45,7 @@ func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *back
return nil, err
}

getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, logsQuery)
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *back
return resp, nil
}

func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange)
if err != nil {
Expand Down

0 comments on commit 74436d3

Please sign in to comment.