diff --git a/go.mod b/go.mod index a02c7068..b847e858 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/juju/errors v0.0.0-20180806074554-22422dad46e1 // indirect github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect github.com/juju/testing v0.0.0-20190613124551-e81189438503 // indirect + github.com/oklog/ulid v1.3.1 // indirect github.com/onsi/gomega v1.14.0 github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.0.1 diff --git a/go.sum b/go.sum index 45af23ab..f98615f2 100644 --- a/go.sum +++ b/go.sum @@ -781,6 +781,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/pkg/id/id.go b/pkg/id/id.go new file mode 100644 index 00000000..40b99ec8 --- /dev/null +++ b/pkg/id/id.go @@ -0,0 +1,28 @@ +package id + +import ( + "fmt" + "math/rand" + "time" + + "github.com/oklog/ulid" +) + +// DefaultRand is a random source based on the unix time not. +var DefaultRand = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec + +// New will generate a new unique identifier using the default random source. +func New() (string, error) { + return NewWithRand(DefaultRand) +} + +// NewWithRand will generate a unique identifier with a specific random source. +func NewWithRand(rnd *rand.Rand) (string, error) { + entropy := ulid.Monotonic(rnd, 0) + newID, err := ulid.New(ulid.Now(), entropy) + if err != nil { + return "", fmt.Errorf("generating microvm id: %w", err) + } + + return newID.String(), nil +} diff --git a/pkg/planner/actuator.go b/pkg/planner/actuator.go new file mode 100644 index 00000000..af62fcac --- /dev/null +++ b/pkg/planner/actuator.go @@ -0,0 +1,108 @@ +package planner + +import ( + "context" + "fmt" + "time" + + "github.com/sirupsen/logrus" + + "github.com/weaveworks/reignite/pkg/id" + "github.com/weaveworks/reignite/pkg/log" +) + +// Actuator will execute the given plan. +type Actuator interface { + // Execute the plan. + Execute(ctx context.Context, p Plan) error +} + +// NewActuator creates a new actuator. +func NewActuator() Actuator { + return &actuatorImpl{} +} + +type actuatorImpl struct{} + +// Execute will execute the plan. +func (e *actuatorImpl) Execute(ctx context.Context, p Plan) error { + execID, err := id.New() + if err != nil { + return fmt.Errorf("getting plan execution id: %w", err) + } + logger := log.GetLogger(ctx).WithFields(logrus.Fields{ + "execution_id": execID, + "plan_name": p.Name(), + }) + + start := time.Now().UTC() + + logger.Infof("started executing plan") + + numStepsExecuted, err := e.executePlan(ctx, p, logger) + if err != nil { + logger.WithFields(logrus.Fields{ + "execution_time": time.Since(start), + "num_steps": numStepsExecuted, + }).Error("failed executing plan") + + return fmt.Errorf("executing plan steps: %w", err) + } + + logger.WithFields(logrus.Fields{ + "execution_time": time.Since(start), + "num_steps": numStepsExecuted, + }).Info("finished executing plan") + + return nil +} + +func (e *actuatorImpl) executePlan(ctx context.Context, p Plan, logger *logrus.Entry) (int, error) { + numStepsExecuted := 0 + for { + steps, err := p.Create(ctx) + if err != nil { + return numStepsExecuted, fmt.Errorf("creating plan for %s: %w", p.Name(), err) + } + if len(steps) == 0 { + logger.Debug("no more steps to execute") + + return numStepsExecuted, nil + } + executed, err := e.react(ctx, steps, logger) + numStepsExecuted += executed + if err != nil { + return numStepsExecuted, fmt.Errorf("executing steps: %w", err) + } + } +} + +func (e *actuatorImpl) react(ctx context.Context, steps []Procedure, logger *logrus.Entry) (int, error) { + var childSteps []Procedure + var err error + numStepsExecuted := 0 + + for _, step := range steps { + select { + case <-ctx.Done(): + logger.WithField("step_name", step.Name()).Info("step not executed due to context done") + + return numStepsExecuted, ctx.Err() //nolint:wrapcheck + default: + numStepsExecuted++ + childSteps, err = step.Do(ctx) + if err != nil { + return numStepsExecuted, fmt.Errorf("executing step %s: %w", step.Name(), err) + } + } + if len(childSteps) > 0 { + executed, err := e.react(ctx, childSteps, logger) + numStepsExecuted += executed + if err != nil { + return numStepsExecuted, err + } + } + } + + return numStepsExecuted, nil +} diff --git a/pkg/planner/actuator_test.go b/pkg/planner/actuator_test.go new file mode 100644 index 00000000..f04c031e --- /dev/null +++ b/pkg/planner/actuator_test.go @@ -0,0 +1,151 @@ +package planner_test + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/gomega" + + "github.com/weaveworks/reignite/pkg/planner" +) + +func TestActuator_SingleProc(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + + testProcs := []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{})} + testPlan := newTestPlan(testProcs) + + act := planner.NewActuator() + err := act.Execute(ctx, testPlan) + + Expect(err).NotTo(HaveOccurred()) + testProc, ok := testProcs[0].(*testProc) + Expect(ok).To(BeTrue()) + Expect(testProc.Executed).To(BeTrue()) +} + +func TestActuator_MultipleProcs(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + + testProcs := []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{}), newTestProc(10*time.Millisecond, []planner.Procedure{})} + testPlan := newTestPlan(testProcs) + + act := planner.NewActuator() + err := act.Execute(ctx, testPlan) + + Expect(err).NotTo(HaveOccurred()) + for _, proc := range testProcs { + testProc, ok := proc.(*testProc) + Expect(ok).To(BeTrue()) + Expect(testProc.Executed).To(BeTrue()) + } +} + +func TestActuator_ChildProcs(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + + testProcs := []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{newTestProc(10*time.Millisecond, []planner.Procedure{})})} + testPlan := newTestPlan(testProcs) + + act := planner.NewActuator() + err := act.Execute(ctx, testPlan) + + Expect(err).NotTo(HaveOccurred()) + + parentProc, ok := testProcs[0].(*testProc) + Expect(ok).To(BeTrue()) + Expect(parentProc.Executed).To(BeTrue()) + + childProc, ok := parentProc.ChildProcs[0].(*testProc) + Expect(ok).To(BeTrue()) + Expect(childProc.Executed).To(BeTrue()) +} + +func TestActuator_Timeout(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + + testProcs := []planner.Procedure{ + newTestProc(210*time.Millisecond, []planner.Procedure{}), + newTestProc(200*time.Millisecond, []planner.Procedure{}), + } + testPlan := newTestPlan(testProcs) + + act := planner.NewActuator() + err := act.Execute(ctx, testPlan) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(context.DeadlineExceeded)) + + proc1, ok := testProcs[0].(*testProc) + Expect(ok).To(BeTrue()) + Expect(proc1.Executed).To(BeTrue()) + + proc2, ok := testProcs[1].(*testProc) + Expect(ok).To(BeTrue()) + Expect(proc2.Executed).To(BeFalse()) +} + +func newTestPlan(procs []planner.Procedure) planner.Plan { + return &testPlan{ + testProcs: procs, + } +} + +type testPlan struct { + testProcs []planner.Procedure +} + +func (tp *testPlan) Name() string { + return "test_plan" +} + +func (tp *testPlan) Create(ctx context.Context) ([]planner.Procedure, error) { + toExec := []planner.Procedure{} + + for _, proc := range tp.testProcs { + testProc, _ := proc.(*testProc) + if !testProc.Executed { + toExec = append(toExec, proc) + } + } + return toExec, nil +} + +func (tp *testPlan) Result() interface{} { + return nil +} + +func newTestProc(delay time.Duration, childProcs []planner.Procedure) planner.Procedure { + return &testProc{ + DoDelay: delay, + ChildProcs: childProcs, + } +} + +type testProc struct { + DoDelay time.Duration + ChildProcs []planner.Procedure + Executed bool +} + +func (p *testProc) Name() string { + return "test_proc" +} + +func (p *testProc) Do(ctx context.Context) ([]planner.Procedure, error) { + p.Executed = true + time.Sleep(p.DoDelay) + + return p.ChildProcs, nil +} diff --git a/pkg/planner/planner.go b/pkg/planner/planner.go new file mode 100644 index 00000000..78c80e69 --- /dev/null +++ b/pkg/planner/planner.go @@ -0,0 +1,28 @@ +package planner + +import "context" + +// NOTE: this is based on this prior work https://gianarb.it/blog/reactive-plan-golang-example +// which has been adapted for use here. + +// Plan represents an interface for a plan of operations. +type Plan interface { + // Name is the name of the plan. + Name() string + + // Create will perform the plan and will return a list of operations/procedures + // that need to be run to accomplish the plan + Create(ctx context.Context) ([]Procedure, error) + + // Result is the result of the plan + Result() interface{} +} + +// Procedure represents a procedure/operation that will be carried out +// as part of executing a plan. +type Procedure interface { + // Name is the name of the procedure/operation. + Name() string + // Do will perform the operation/procedure. + Do(ctx context.Context) ([]Procedure, error) +}