Skip to content

Commit

Permalink
Fix issue with rolling out policies from the coordinator. (#180) (#183)
Browse files Browse the repository at this point in the history
* Fix issue with rolling out policies from the coordinator.

* Simplify branch logic.

(cherry picked from commit 548ef5e)

Co-authored-by: Blake Rouse <[email protected]>
  • Loading branch information
mergify[bot] and blakerouse authored Mar 30, 2021
1 parent 0b802d6 commit b84dfc6
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
38 changes: 23 additions & 15 deletions internal/pkg/coordinator/v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewCoordinatorZero(policy model.Policy) (Coordinator, error) {
return &coordinatorZeroT{
log: log.With().Str("ctx", "coordinator v0").Str("policyId", policy.PolicyId).Logger(),
policy: policy,
in: make(chan model.Policy, 1),
in: make(chan model.Policy),
out: make(chan model.Policy),
}, nil
}
Expand All @@ -40,26 +40,19 @@ func (c *coordinatorZeroT) Name() string {

// Run runs the coordinator for the policy.
func (c *coordinatorZeroT) Run(ctx context.Context) error {
c.in <- c.policy
err := c.updatePolicy(c.policy)
if err != nil {
c.log.Err(err).Msg("failed to handle policy")
}

for {
select {
case p := <-c.in:
newData, err := c.handlePolicy(p.Data)
err = c.updatePolicy(p)
if err != nil {
c.log.Err(err).Msg("failed to handle policy")
continue
}
if p.CoordinatorIdx == 0 {
p.CoordinatorIdx = 1
p.Data = newData
c.policy = p
c.out <- p
} else if string(newData) != string(p.Data) {
p.CoordinatorIdx += 1
p.Data = newData
c.policy = p
c.out <- p
}
case <-ctx.Done():
return ctx.Err()
}
Expand All @@ -77,7 +70,22 @@ func (c *coordinatorZeroT) Output() <-chan model.Policy {
return c.out
}

// handlePolicy handles the new policy.
// updatePolicy performs the working of incrementing the coordinator idx.
func (c *coordinatorZeroT) updatePolicy(p model.Policy) error {
newData, err := c.handlePolicy(p.Data)
if err != nil {
return err
}
if p.CoordinatorIdx == 0 || string(newData) != string(p.Data) {
p.CoordinatorIdx += 1
p.Data = newData
c.policy = p
c.out <- p
}
return nil
}

// handlePolicy performs the actual work of coordination.
//
// Does nothing at the moment.
func (c *coordinatorZeroT) handlePolicy(data json.RawMessage) (json.RawMessage, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/dl/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func prepareQueryLatestPolicies() []byte {
root := dsl.NewRoot()
root.Size(0)
policyId := root.Aggs().Agg(FieldPolicyId)
policyId.Terms("field", FieldPolicyId, nil)
policyId.Terms("field", FieldPolicyId, nil).Size(10000)
revisionIdx := policyId.Aggs().Agg(FieldRevisionIdx).TopHits()
revisionIdx.Size(1)
rSort := revisionIdx.Sort()
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/dsl/term.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package dsl

func (n *Node) Term(field string, value interface{}, boost *float64) {
func (n *Node) Term(field string, value interface{}, boost *float64) *Node {
childNode := n.appendOrSetChildNode(kKeywordTerm)

leaf := value
Expand All @@ -22,9 +22,10 @@ func (n *Node) Term(field string, value interface{}, boost *float64) {
childNode.nodeMap = nodeMapT{field: &Node{
leaf: leaf,
}}
return childNode
}

func (n *Node) Terms(field string, value interface{}, boost *float64) {
func (n *Node) Terms(field string, value interface{}, boost *float64) *Node {
childNode := n.appendOrSetChildNode(kKeywordTerms)

childNode.nodeMap = nodeMapT{
Expand All @@ -34,4 +35,5 @@ func (n *Node) Terms(field string, value interface{}, boost *float64) {
if boost != nil {
childNode.nodeMap[kKeywordBoost] = &Node{leaf: *boost}
}
return childNode
}

0 comments on commit b84dfc6

Please sign in to comment.