Skip to content

Commit

Permalink
🧹 Make resolved policy execution use reporting queries
Browse files Browse the repository at this point in the history
The execution was previously implicitly tying together reporting queries
and reporting jobs. This now uses the ReportingQueries section of the
CollectorJob to do this.

Also, part of this change, I've introduced an additional reporting job
types. One represents the executed query(reporting job by code id). The
other represents a thing that is both a check and a query. This was
helpful in fixing an issue where we don't properly forward a score and
intead rescore it. This would turn skipped scores into unscored.

This last point is one of those non-backwards compatible changes. Once
the new resolved policy is used everywhere, this bug will be seen in old
clients. I don't see a way to fix it in a backwards compatible way, and
its a small enough issue that I think its fine.
  • Loading branch information
jaym committed Nov 27, 2024
1 parent 42a2719 commit 7ccf232
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 90 deletions.
60 changes: 33 additions & 27 deletions policy/cnspec_policy.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions policy/cnspec_policy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -556,16 +556,14 @@ message ReportingJob {

enum Type {
UNSPECIFIED = 0;
CHECK = 1;
DATA_QUERY = 2;
CONTROL = 3;
POLICY = 4;
FRAMEWORK = 5;
RISK_FACTOR = 6;

// DO NOT USE CHECK OR DATA_QUERY, THEY ARE DEPRECATED
// Here's the reason why:
// A query can be either or both. We cannot pick one in all cases
CHECK = 1;
DATA_QUERY = 2;
CHECK_AND_DATA_QUERY = 7;
EXECUTION_QUERY = 8;
}

string checksum = 1;
Expand Down
16 changes: 13 additions & 3 deletions policy/executor/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExecuteFilterQueries(runtime llx.Runtime, queries []*explorer.Mquery, timeo
log.Debug().Err(err).Str("mql", m.Mql).Msg("skipping filter query, not supported")
continue
}
builder.AddQuery(codeBundle, nil, nil)
builder.AddQuery(codeBundle, nil, nil, nil)

builder.CollectScore(codeBundle.CodeV2.Id)
queryMap[codeBundle.CodeV2.Id] = m
Expand Down Expand Up @@ -117,7 +117,7 @@ func ExecuteFilterQueries(runtime llx.Runtime, queries []*explorer.Mquery, timeo
func ExecuteQuery(runtime llx.Runtime, codeBundle *llx.CodeBundle, props map[string]*llx.Primitive, features cnquery.Features) (*policy.Score, map[string]*llx.RawResult, error) {
builder := internal.NewBuilder()

builder.AddQuery(codeBundle, nil, props)
builder.AddQuery(codeBundle, nil, props, nil)
for _, checksum := range internal.CodepointChecksums(codeBundle) {
builder.CollectDatapoint(checksum)
}
Expand Down Expand Up @@ -160,8 +160,18 @@ func ExecuteQuery(runtime llx.Runtime, codeBundle *llx.CodeBundle, props map[str
func builderFromResolvedPolicy(resolvedPolicy *policy.ResolvedPolicy) *internal.GraphBuilder {
b := internal.NewBuilder()

rqs := resolvedPolicy.CollectorJob.ReportingQueries
if rqs == nil {
rqs = map[string]*policy.StringArray{}
}
for _, eq := range resolvedPolicy.ExecutionJob.Queries {
b.AddQuery(eq.Code, eq.Properties, nil)
var notifies []string
if sa := rqs[eq.Code.GetCodeV2().GetId()]; sa != nil {
if len(sa.Items) > 0 {
notifies = sa.Items
}
}
b.AddQuery(eq.Code, eq.Properties, nil, notifies)
}

for _, rj := range resolvedPolicy.CollectorJob.ReportingJobs {
Expand Down
67 changes: 39 additions & 28 deletions policy/executor/internal/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type query struct {
codeBundle *llx.CodeBundle
requiredProps map[string]string
resolvedProperties map[string]*llx.Primitive
notifies []string
}

type GraphBuilder struct {
Expand Down Expand Up @@ -75,11 +76,12 @@ func NewBuilder() *GraphBuilder {
}

// AddQuery adds the provided code to be executed to the graph
func (b *GraphBuilder) AddQuery(c *llx.CodeBundle, propertyChecksums map[string]string, resolvedProperties map[string]*llx.Primitive) {
func (b *GraphBuilder) AddQuery(c *llx.CodeBundle, propertyChecksums map[string]string, resolvedProperties map[string]*llx.Primitive, notifies []string) {
b.queries = append(b.queries, query{
codeBundle: c,
requiredProps: propertyChecksums,
resolvedProperties: resolvedProperties,
notifies: notifies,
})
}

Expand Down Expand Up @@ -192,14 +194,22 @@ func (b *GraphBuilder) Build(runtime llx.Runtime, assetMrn string) (*GraphExecut
}
}

reportingQueryForReportingJob := map[string]string{}
reportingQueryNodeByCodeId := map[string]string{}
for queryID, q := range queries {
canRun := checkVersion(q.codeBundle, mondooVersion)
if canRun {
ge.addExecutionQueryNode(queryID, q, q.resolvedProperties, b.datapointType)
} else {
unrunnableQueries = append(unrunnableQueries, q)
}
ge.addReportingQueryNode(queryID, q)
n := ge.addReportingQueryNode(queryID, q)
reportingQueryNodeByCodeId[q.codeBundle.GetCodeV2().GetId()] = n.id
if len(q.notifies) > 0 {
for _, notify := range q.notifies {
reportingQueryForReportingJob[notify] = n.id
}
}
}

scoresToCollect := make([]string, len(b.collectScoreQrIDs))
Expand All @@ -208,12 +218,19 @@ func (b *GraphBuilder) Build(runtime llx.Runtime, assetMrn string) (*GraphExecut
copy(datapointsToCollect, b.collectDatapointChecksums)

for _, rj := range b.reportingJobs {
_, isQuery := queries[rj.QrId]
scoresToCollect = append(scoresToCollect, rj.Uuid)
for datapointChecksum := range rj.Datapoints {
datapointsToCollect = append(datapointsToCollect, datapointChecksum)
}
ge.addReportingJobNode(assetMrn, rj.Uuid, rj, isQuery)
rq := reportingQueryForReportingJob[rj.Uuid]
if rq == "" {
// If a reporting query didn't explicitly notify this reporting job, but
// we have a reporting job for it, we will add it.
// For example, when we have risk factors, this will happen
rq = reportingQueryNodeByCodeId[rj.QrId]
}

ge.addReportingJobNode(assetMrn, rj.Uuid, rj, rq)
}

for _, queryID := range scoresToCollect {
Expand Down Expand Up @@ -295,11 +312,6 @@ func (ge *GraphExecutor) createFinisherNode(r progress.Progress) {
}

func (ge *GraphExecutor) addExecutionQueryNode(queryID string, q query, resolvedProperties map[string]*llx.Primitive, datapointTypeMap map[string]string) {
n, ok := ge.nodes[NodeID(queryID)]
if ok {
return
}

codeBundle := q.codeBundle

nodeData := &ExecutionQueryNodeData{
Expand All @@ -310,7 +322,7 @@ func (ge *GraphExecutor) addExecutionQueryNode(queryID string, q query, resolved
runQueue: ge.executionManager.runQueue,
}

n = &Node{
n := &Node{
id: NodeID(string(ExecutionQueryNodeType) + "/" + queryID),
nodeType: ExecutionQueryNodeType,
data: nodeData,
Expand Down Expand Up @@ -357,10 +369,10 @@ func (ge *GraphExecutor) addExecutionQueryNode(queryID string, q query, resolved
ge.nodes[n.id] = n
}

func (ge *GraphExecutor) addReportingQueryNode(queryID string, q query) {
func (ge *GraphExecutor) addReportingQueryNode(queryID string, q query) *Node {
n, ok := ge.nodes[NodeID(queryID)]
if ok {
return
return n
}

nodeData := &ReportingQueryNodeData{
Expand All @@ -387,14 +399,20 @@ func (ge *GraphExecutor) addReportingQueryNode(queryID string, q query) {
}

ge.nodes[n.id] = n

return n
}

func (ge *GraphExecutor) addReportingJobNode(assetMrn string, reportingJobID string, rj *policy.ReportingJob, isQuery bool) {
n, ok := ge.nodes[NodeID(reportingJobID)]
func (ge *GraphExecutor) addReportingJobNode(assetMrn string, reportingJobID string, rj *policy.ReportingJob, reportingQueryForReportingJob string) {
_, ok := ge.nodes[NodeID(reportingJobID)]
if ok {
return
}

forwardScore := rj.Type == policy.ReportingJob_CHECK ||
rj.Type == policy.ReportingJob_DATA_QUERY ||
rj.Type == policy.ReportingJob_CHECK_AND_DATA_QUERY ||
rj.Type == policy.ReportingJob_EXECUTION_QUERY
queryID := rj.QrId
// TODO: This needs to be handled by the server so as not to
// break existing clients. The function that was doing the
Expand All @@ -406,13 +424,13 @@ func (ge *GraphExecutor) addReportingJobNode(assetMrn string, reportingJobID str

nodeData := &ReportingJobNodeData{
queryID: queryID,
isQuery: isQuery,
forwardScore: forwardScore,
rjType: rj.Type,
childScores: map[string]*reportingJobResult{},
datapoints: map[string]*reportingJobDatapoint{},
featureFlagFailErrors: ge.featureFlagFailErrors,
}
n = &Node{
n := &Node{
id: NodeID(reportingJobID),
nodeType: ReportingJobNodeType,
data: nodeData,
Expand All @@ -427,16 +445,9 @@ func (ge *GraphExecutor) addReportingJobNode(assetMrn string, reportingJobID str
ge.addEdge(n.id, NodeID(e))
}

if isQuery {
// The specs of the reporting job doesn't contain the query
// Not all rj.QrIds are represented in the graph, only those
// that correspond to actual queries. For example, a QrId that
// is a policy is not represented as a node directly in the graph.
// So, this is special handling to make sure the reporting job
// knows that a reporting query is going to send it information
// and that it needs to use that information to calculate its score
nodeData.childScores[rj.QrId] = &reportingJobResult{}
ge.addEdge(NodeID(rj.QrId), n.id)
if reportingQueryForReportingJob != "" {
nodeData.childScores[reportingQueryForReportingJob] = &reportingJobResult{}
ge.addEdge(NodeID(reportingQueryForReportingJob), n.id)
}

for childReportingJobID, ss := range rj.ChildJobs {
Expand All @@ -451,7 +462,7 @@ func (ge *GraphExecutor) addReportingJobNode(assetMrn string, reportingJobID str
}

func (ge *GraphExecutor) addDatapointNode(datapointChecksum string, expectedType *string, res *llx.RawResult) {
n, ok := ge.nodes[NodeID(datapointChecksum)]
_, ok := ge.nodes[NodeID(datapointChecksum)]
if ok {
return
}
Expand All @@ -461,7 +472,7 @@ func (ge *GraphExecutor) addDatapointNode(datapointChecksum string, expectedType
isReported: res != nil,
res: res,
}
n = &Node{
n := &Node{
id: NodeID(datapointChecksum),
nodeType: DatapointNodeType,
data: nodeData,
Expand Down
12 changes: 6 additions & 6 deletions policy/executor/internal/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestBuilder(t *testing.T) {
}},
Checksums: map[uint64]string{1: "checksum1", 2: "pqep"},
},
}, nil, nil)
}, nil, nil, nil)

b.AddQuery(
&llx.CodeBundle{
Expand All @@ -98,7 +98,7 @@ func TestBuilder(t *testing.T) {
}},
Checksums: map[uint64]string{1: "checksum2"},
},
}, map[string]string{"prop": "checksum1"}, nil)
}, map[string]string{"prop": "checksum1"}, nil, []string{"query1rj"})

b.AddQuery(
&llx.CodeBundle{
Expand All @@ -112,7 +112,7 @@ func TestBuilder(t *testing.T) {
},
}, nil, map[string]*llx.Primitive{
"resolvedprop": llx.StringPrimitive("hello"),
})
}, []string{"query2rj"})
b.AddDatapointType("checksum3", string(types.Bool))

b.AddQuery(
Expand All @@ -124,7 +124,7 @@ func TestBuilder(t *testing.T) {
}},
Checksums: map[uint64]string{1: "checksum5"},
},
}, nil, nil)
}, nil, nil, nil)
b.CollectDatapoint("checksum5")

b.AddQuery(
Expand All @@ -136,7 +136,7 @@ func TestBuilder(t *testing.T) {
}},
Checksums: map[uint64]string{1: "checksum6"},
},
}, nil, nil)
}, nil, nil, nil)

b.AddQuery(
&llx.CodeBundle{
Expand All @@ -148,7 +148,7 @@ func TestBuilder(t *testing.T) {
Checksums: map[uint64]string{1: "checksum5", 2: "checksum7"},
},
MinMondooVersion: "9999.9999.9999",
}, nil, nil)
}, nil, nil, nil)

b.AddReportingJob(&policy.ReportingJob{
QrId: "query1",
Expand Down
Loading

0 comments on commit 7ccf232

Please sign in to comment.