Skip to content

Commit

Permalink
Merge pull request #40 from weaveworks/planning
Browse files Browse the repository at this point in the history
feat: added reactive planning package
  • Loading branch information
richardcase authored Aug 9, 2021
2 parents 7f5f095 + dbed674 commit 638965e
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/juju/errors v0.0.0-20180806074554-22422dad46e1 // indirect
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect
github.com/juju/testing v0.0.0-20190613124551-e81189438503 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/onsi/gomega v1.14.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.0.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
28 changes: 28 additions & 0 deletions pkg/id/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package id

import (
"fmt"
"math/rand"
"time"

"github.com/oklog/ulid"
)

// DefaultRand is a random source based on the unix time not.
var DefaultRand = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

// New will generate a new unique identifier using the default random source.
func New() (string, error) {
return NewWithRand(DefaultRand)
}

// NewWithRand will generate a unique identifier with a specific random source.
func NewWithRand(rnd *rand.Rand) (string, error) {
entropy := ulid.Monotonic(rnd, 0)
newID, err := ulid.New(ulid.Now(), entropy)
if err != nil {
return "", fmt.Errorf("generating microvm id: %w", err)
}

return newID.String(), nil
}
108 changes: 108 additions & 0 deletions pkg/planner/actuator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package planner

import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"

"github.com/weaveworks/reignite/pkg/id"
"github.com/weaveworks/reignite/pkg/log"
)

// Actuator will execute the given plan.
type Actuator interface {
// Execute the plan.
Execute(ctx context.Context, p Plan) error
}

// NewActuator creates a new actuator.
func NewActuator() Actuator {
return &actuatorImpl{}
}

type actuatorImpl struct{}

// Execute will execute the plan.
func (e *actuatorImpl) Execute(ctx context.Context, p Plan) error {
execID, err := id.New()
if err != nil {
return fmt.Errorf("getting plan execution id: %w", err)
}
logger := log.GetLogger(ctx).WithFields(logrus.Fields{
"execution_id": execID,
"plan_name": p.Name(),
})

start := time.Now().UTC()

logger.Infof("started executing plan")

numStepsExecuted, err := e.executePlan(ctx, p, logger)
if err != nil {
logger.WithFields(logrus.Fields{
"execution_time": time.Since(start),
"num_steps": numStepsExecuted,
}).Error("failed executing plan")

return fmt.Errorf("executing plan steps: %w", err)
}

logger.WithFields(logrus.Fields{
"execution_time": time.Since(start),
"num_steps": numStepsExecuted,
}).Info("finished executing plan")

return nil
}

func (e *actuatorImpl) executePlan(ctx context.Context, p Plan, logger *logrus.Entry) (int, error) {
numStepsExecuted := 0
for {
steps, err := p.Create(ctx)
if err != nil {
return numStepsExecuted, fmt.Errorf("creating plan for %s: %w", p.Name(), err)
}
if len(steps) == 0 {
logger.Debug("no more steps to execute")

return numStepsExecuted, nil
}
executed, err := e.react(ctx, steps, logger)
numStepsExecuted += executed
if err != nil {
return numStepsExecuted, fmt.Errorf("executing steps: %w", err)
}
}
}

func (e *actuatorImpl) react(ctx context.Context, steps []Procedure, logger *logrus.Entry) (int, error) {
var childSteps []Procedure
var err error
numStepsExecuted := 0

for _, step := range steps {
select {
case <-ctx.Done():
logger.WithField("step_name", step.Name()).Info("step not executed due to context done")

return numStepsExecuted, ctx.Err() //nolint:wrapcheck
default:
numStepsExecuted++
childSteps, err = step.Do(ctx)
if err != nil {
return numStepsExecuted, fmt.Errorf("executing step %s: %w", step.Name(), err)
}
}
if len(childSteps) > 0 {
executed, err := e.react(ctx, childSteps, logger)
numStepsExecuted += executed
if err != nil {
return numStepsExecuted, err
}
}
}

return numStepsExecuted, nil
}
151 changes: 151 additions & 0 deletions pkg/planner/actuator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package planner_test

import (
"context"
"testing"
"time"

. "github.com/onsi/gomega"

"github.com/weaveworks/reignite/pkg/planner"
)

func TestActuator_SingleProc(t *testing.T) {
RegisterTestingT(t)

ctx := context.Background()

testProcs := []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{})}
testPlan := newTestPlan(testProcs)

act := planner.NewActuator()
err := act.Execute(ctx, testPlan)

Expect(err).NotTo(HaveOccurred())
testProc, ok := testProcs[0].(*testProc)
Expect(ok).To(BeTrue())
Expect(testProc.Executed).To(BeTrue())
}

func TestActuator_MultipleProcs(t *testing.T) {
RegisterTestingT(t)

ctx := context.Background()

testProcs := []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{}), newTestProc(10*time.Millisecond, []planner.Procedure{})}
testPlan := newTestPlan(testProcs)

act := planner.NewActuator()
err := act.Execute(ctx, testPlan)

Expect(err).NotTo(HaveOccurred())
for _, proc := range testProcs {
testProc, ok := proc.(*testProc)
Expect(ok).To(BeTrue())
Expect(testProc.Executed).To(BeTrue())
}
}

func TestActuator_ChildProcs(t *testing.T) {
RegisterTestingT(t)

ctx := context.Background()

testProcs := []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{})})}
testPlan := newTestPlan(testProcs)

act := planner.NewActuator()
err := act.Execute(ctx, testPlan)

Expect(err).NotTo(HaveOccurred())

parentProc, ok := testProcs[0].(*testProc)
Expect(ok).To(BeTrue())
Expect(parentProc.Executed).To(BeTrue())

childProc, ok := parentProc.ChildProcs[0].(*testProc)
Expect(ok).To(BeTrue())
Expect(childProc.Executed).To(BeTrue())
}

func TestActuator_Timeout(t *testing.T) {
RegisterTestingT(t)

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel()

testProcs := []planner.Procedure{
newTestProc(210*time.Millisecond, []planner.Procedure{}),
newTestProc(200*time.Millisecond, []planner.Procedure{}),
}
testPlan := newTestPlan(testProcs)

act := planner.NewActuator()
err := act.Execute(ctx, testPlan)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(context.DeadlineExceeded))

proc1, ok := testProcs[0].(*testProc)
Expect(ok).To(BeTrue())
Expect(proc1.Executed).To(BeTrue())

proc2, ok := testProcs[1].(*testProc)
Expect(ok).To(BeTrue())
Expect(proc2.Executed).To(BeFalse())
}

func newTestPlan(procs []planner.Procedure) planner.Plan {
return &testPlan{
testProcs: procs,
}
}

type testPlan struct {
testProcs []planner.Procedure
}

func (tp *testPlan) Name() string {
return "test_plan"
}

func (tp *testPlan) Create(ctx context.Context) ([]planner.Procedure, error) {
toExec := []planner.Procedure{}

for _, proc := range tp.testProcs {
testProc, _ := proc.(*testProc)
if !testProc.Executed {
toExec = append(toExec, proc)
}
}
return toExec, nil
}

func (tp *testPlan) Result() interface{} {
return nil
}

func newTestProc(delay time.Duration, childProcs []planner.Procedure) planner.Procedure {
return &testProc{
DoDelay: delay,
ChildProcs: childProcs,
}
}

type testProc struct {
DoDelay time.Duration
ChildProcs []planner.Procedure
Executed bool
}

func (p *testProc) Name() string {
return "test_proc"
}

func (p *testProc) Do(ctx context.Context) ([]planner.Procedure, error) {
p.Executed = true
time.Sleep(p.DoDelay)

return p.ChildProcs, nil
}
28 changes: 28 additions & 0 deletions pkg/planner/planner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package planner

import "context"

// NOTE: this is based on this prior work https://gianarb.it/blog/reactive-plan-golang-example
// which has been adapted for use here.

// Plan represents an interface for a plan of operations.
type Plan interface {
// Name is the name of the plan.
Name() string

// Create will perform the plan and will return a list of operations/procedures
// that need to be run to accomplish the plan
Create(ctx context.Context) ([]Procedure, error)

// Result is the result of the plan
Result() interface{}
}

// Procedure represents a procedure/operation that will be carried out
// as part of executing a plan.
type Procedure interface {
// Name is the name of the procedure/operation.
Name() string
// Do will perform the operation/procedure.
Do(ctx context.Context) ([]Procedure, error)
}

0 comments on commit 638965e

Please sign in to comment.