From b84dfc627232f9f59736c726ef2625c35c122393 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 30 Mar 2021 09:19:23 +0200 Subject: [PATCH] Fix issue with rolling out policies from the coordinator. (#180) (#183) * Fix issue with rolling out policies from the coordinator. * Simplify branch logic. (cherry picked from commit 548ef5e9a690c622ac690e586a6f865fed01f82f) Co-authored-by: Blake Rouse --- internal/pkg/coordinator/v0.go | 38 ++++++++++++++++++++-------------- internal/pkg/dl/policies.go | 2 +- internal/pkg/dsl/term.go | 6 ++++-- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/internal/pkg/coordinator/v0.go b/internal/pkg/coordinator/v0.go index 57ecabeea..5c8da5837 100644 --- a/internal/pkg/coordinator/v0.go +++ b/internal/pkg/coordinator/v0.go @@ -28,7 +28,7 @@ func NewCoordinatorZero(policy model.Policy) (Coordinator, error) { return &coordinatorZeroT{ log: log.With().Str("ctx", "coordinator v0").Str("policyId", policy.PolicyId).Logger(), policy: policy, - in: make(chan model.Policy, 1), + in: make(chan model.Policy), out: make(chan model.Policy), }, nil } @@ -40,26 +40,19 @@ func (c *coordinatorZeroT) Name() string { // Run runs the coordinator for the policy. func (c *coordinatorZeroT) Run(ctx context.Context) error { - c.in <- c.policy + err := c.updatePolicy(c.policy) + if err != nil { + c.log.Err(err).Msg("failed to handle policy") + } + for { select { case p := <-c.in: - newData, err := c.handlePolicy(p.Data) + err = c.updatePolicy(p) if err != nil { c.log.Err(err).Msg("failed to handle policy") continue } - if p.CoordinatorIdx == 0 { - p.CoordinatorIdx = 1 - p.Data = newData - c.policy = p - c.out <- p - } else if string(newData) != string(p.Data) { - p.CoordinatorIdx += 1 - p.Data = newData - c.policy = p - c.out <- p - } case <-ctx.Done(): return ctx.Err() } @@ -77,7 +70,22 @@ func (c *coordinatorZeroT) Output() <-chan model.Policy { return c.out } -// handlePolicy handles the new policy. +// updatePolicy performs the working of incrementing the coordinator idx. +func (c *coordinatorZeroT) updatePolicy(p model.Policy) error { + newData, err := c.handlePolicy(p.Data) + if err != nil { + return err + } + if p.CoordinatorIdx == 0 || string(newData) != string(p.Data) { + p.CoordinatorIdx += 1 + p.Data = newData + c.policy = p + c.out <- p + } + return nil +} + +// handlePolicy performs the actual work of coordination. // // Does nothing at the moment. func (c *coordinatorZeroT) handlePolicy(data json.RawMessage) (json.RawMessage, error) { diff --git a/internal/pkg/dl/policies.go b/internal/pkg/dl/policies.go index f5277e731..78817c87b 100644 --- a/internal/pkg/dl/policies.go +++ b/internal/pkg/dl/policies.go @@ -27,7 +27,7 @@ func prepareQueryLatestPolicies() []byte { root := dsl.NewRoot() root.Size(0) policyId := root.Aggs().Agg(FieldPolicyId) - policyId.Terms("field", FieldPolicyId, nil) + policyId.Terms("field", FieldPolicyId, nil).Size(10000) revisionIdx := policyId.Aggs().Agg(FieldRevisionIdx).TopHits() revisionIdx.Size(1) rSort := revisionIdx.Sort() diff --git a/internal/pkg/dsl/term.go b/internal/pkg/dsl/term.go index 8a13b80e5..7da88b08a 100644 --- a/internal/pkg/dsl/term.go +++ b/internal/pkg/dsl/term.go @@ -4,7 +4,7 @@ package dsl -func (n *Node) Term(field string, value interface{}, boost *float64) { +func (n *Node) Term(field string, value interface{}, boost *float64) *Node { childNode := n.appendOrSetChildNode(kKeywordTerm) leaf := value @@ -22,9 +22,10 @@ func (n *Node) Term(field string, value interface{}, boost *float64) { childNode.nodeMap = nodeMapT{field: &Node{ leaf: leaf, }} + return childNode } -func (n *Node) Terms(field string, value interface{}, boost *float64) { +func (n *Node) Terms(field string, value interface{}, boost *float64) *Node { childNode := n.appendOrSetChildNode(kKeywordTerms) childNode.nodeMap = nodeMapT{ @@ -34,4 +35,5 @@ func (n *Node) Terms(field string, value interface{}, boost *float64) { if boost != nil { childNode.nodeMap[kKeywordBoost] = &Node{leaf: *boost} } + return childNode }