Skip to content

Commit

Permalink
Merge pull request #369 from asteris-llc/feature/metadata-envelope
Browse files Browse the repository at this point in the history
create a metadata envelope for nodes
  • Loading branch information
BrianHicks authored Oct 14, 2016
2 parents b131456 + 71362de commit a94b8d8
Show file tree
Hide file tree
Showing 45 changed files with 783 additions and 407 deletions.
11 changes: 7 additions & 4 deletions apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/asteris-llc/converge/executor"
"github.com/asteris-llc/converge/graph"
"github.com/asteris-llc/converge/graph/node"
"github.com/asteris-llc/converge/plan"
"github.com/asteris-llc/converge/render"
"github.com/pkg/errors"
Expand Down Expand Up @@ -65,10 +66,12 @@ func execPipeline(ctx context.Context, in *graph.Graph, pipelineF MkPipelineF, r
var hasErrors error

out, err := in.Transform(ctx,
notify.Transform(func(id string, out *graph.Graph) error {
notify.Transform(func(meta *node.Node, out *graph.Graph) error {
renderingPlant.Graph = out
pipeline := pipelineF(out, id)
val, pipelineError := pipeline.Exec(out.Get(id))
pipeline := pipelineF(out, meta.ID)

val, pipelineError := pipeline.Exec(meta.Value())

if pipelineError != nil {
hasErrors = ErrTreeContainsErrors
return pipelineError
Expand All @@ -82,7 +85,7 @@ func execPipeline(ctx context.Context, in *graph.Graph, pipelineF MkPipelineF, r
hasErrors = ErrTreeContainsErrors
}

out.Add(id, asResult)
out.Add(meta.WithValue(asResult))
return nil
}),
)
Expand Down
32 changes: 18 additions & 14 deletions apply/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/asteris-llc/converge/apply"
"github.com/asteris-llc/converge/graph"
"github.com/asteris-llc/converge/graph/node"
"github.com/asteris-llc/converge/helpers/faketask"
"github.com/asteris-llc/converge/helpers/logging"
"github.com/asteris-llc/converge/plan"
Expand All @@ -33,7 +34,7 @@ func TestApplyNoOp(t *testing.T) {

g := graph.New()
task := faketask.Swapper()
g.Add("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: task})
g.Add(node.New("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: task}))

require.NoError(t, g.Validate())

Expand All @@ -51,7 +52,7 @@ func TestApplyNoRun(t *testing.T) {

g := graph.New()
task := faketask.NoOp()
g.Add("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWontChange}, Task: task})
g.Add(node.New("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWontChange}, Task: task}))

require.NoError(t, g.Validate())

Expand All @@ -67,8 +68,8 @@ func TestApplyErrorsBelow(t *testing.T) {
defer logging.HideLogs(t)()

g := graph.New()
g.Add("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.NoOp()})
g.Add("root/err", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.Error()})
g.Add(node.New("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.NoOp()}))
g.Add(node.New("root/err", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.Error()}))

g.ConnectParent("root", "root/err")

Expand All @@ -80,11 +81,15 @@ func TestApplyErrorsBelow(t *testing.T) {
out, err := apply.Apply(context.Background(), g)
assert.Equal(t, apply.ErrTreeContainsErrors, err)

errNode, ok := out.Get("root/err").(*apply.Result)
errMeta, ok := out.Get("root/err")
require.True(t, ok, `"root/err" was not present in the graph`)
errNode, ok := errMeta.Value().(*apply.Result)
require.True(t, ok)
assert.EqualError(t, errNode.Error(), "error")

rootNode, ok := out.Get("root").(*apply.Result)
rootMeta, ok := out.Get("root")
require.True(t, ok, `"root" was not present in the graph`)
rootNode, ok := rootMeta.Value().(*apply.Result)
require.True(t, ok)
assert.EqualError(t, rootNode.Error(), `error in dependency "root/err"`)
}
Expand All @@ -93,7 +98,7 @@ func TestApplyStillChange(t *testing.T) {
defer logging.HideLogs(t)()

g := graph.New()
g.Add("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.WillChange()})
g.Add(node.New("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.WillChange()}))

require.NoError(t, g.Validate())

Expand All @@ -110,7 +115,7 @@ func TestApplyNilError(t *testing.T) {
defer logging.HideLogs(t)()

g := graph.New()
g.Add("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.NilAndError()})
g.Add(node.New("root", &plan.Result{Status: &resource.Status{Level: resource.StatusWillChange}, Task: faketask.NilAndError()}))

require.NoError(t, g.Validate())

Expand All @@ -121,12 +126,11 @@ func TestApplyNilError(t *testing.T) {
}

func getResult(t *testing.T, src *graph.Graph, key string) *apply.Result {
val := src.Get(key)
result, ok := val.(*apply.Result)
if !ok {
t.Logf("needed a %T for %q, got a %T\n", result, key, val)
t.FailNow()
}
meta, ok := src.Get(key)
require.True(t, ok, "%q was not present in the graph", key)

result, ok := meta.Value().(*apply.Result)
require.True(t, ok, "needed a %T for %q, got a %T", result, key, meta.Value())

return result
}
7 changes: 6 additions & 1 deletion apply/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ func (g *pipelineGen) DependencyCheck(taskI interface{}) (interface{}, error) {
return nil, errors.New("input node is not a task wrapper")
}
for _, depID := range graph.Targets(g.Graph.DownEdges(g.ID)) {
elem := g.Graph.Get(depID)
meta, ok := g.Graph.Get(depID)
if !ok {
return nil, nil
}

elem := meta.Value()
dep, ok := elem.(executor.Status)
if !ok {
return nil, fmt.Errorf("apply.DependencyCheck: expected %s to have type executor.Status but got type %T", depID, elem)
Expand Down
5 changes: 3 additions & 2 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/asteris-llc/converge/graph"
"github.com/asteris-llc/converge/graph/node"
"github.com/asteris-llc/converge/helpers/logging"
"github.com/asteris-llc/converge/rpc"
"github.com/asteris-llc/converge/rpc/pb"
Expand Down Expand Up @@ -115,7 +116,7 @@ real happens.`,
slog := flog.WithFields(log.Fields{
"stage": resp.Stage,
"run": resp.Run,
"id": resp.Id,
"id": resp.Meta.Id,
})
if resp.Run == pb.StatusResponse_STARTED {
slog.Info("got status")
Expand All @@ -126,7 +127,7 @@ real happens.`,
if resp.Stage == pb.StatusResponse_APPLY && resp.Run == pb.StatusResponse_FINISHED {
details := resp.GetDetails()
if details != nil {
g.Add(resp.Id, details.ToPrintable())
g.Add(node.New(resp.Id, details.ToPrintable()))
}
}
},
Expand Down
5 changes: 3 additions & 2 deletions cmd/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/asteris-llc/converge/graph"
"github.com/asteris-llc/converge/graph/node"
"github.com/asteris-llc/converge/helpers/logging"
"github.com/asteris-llc/converge/rpc"
"github.com/asteris-llc/converge/rpc/pb"
Expand Down Expand Up @@ -116,7 +117,7 @@ not display healthy checks.`,
slog := flog.WithFields(log.Fields{
"stage": resp.Stage,
"run": resp.Run,
"id": resp.Id,
"id": resp.Meta.Id,
})
if resp.Run == pb.StatusResponse_STARTED {
slog.Info("got status")
Expand All @@ -127,7 +128,7 @@ not display healthy checks.`,
if resp.Run == pb.StatusResponse_FINISHED {
details := resp.GetDetails()
if details != nil {
g.Add(resp.Id, details.ToPrintable())
g.Add(node.New(resp.Id, details.ToPrintable()))
}
}
},
Expand Down
5 changes: 3 additions & 2 deletions cmd/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/asteris-llc/converge/graph"
"github.com/asteris-llc/converge/graph/node"
"github.com/asteris-llc/converge/helpers/logging"
"github.com/asteris-llc/converge/rpc"
"github.com/asteris-llc/converge/rpc/pb"
Expand Down Expand Up @@ -115,7 +116,7 @@ can be done separately to see what needs to be changed before execution.`,
slog := flog.WithFields(log.Fields{
"stage": resp.Stage,
"run": resp.Run,
"id": resp.Id,
"id": resp.Meta.Id,
})
if resp.Run == pb.StatusResponse_STARTED {
slog.Info("got status")
Expand All @@ -126,7 +127,7 @@ can be done separately to see what needs to be changed before execution.`,
if resp.Run == pb.StatusResponse_FINISHED {
details := resp.GetDetails()
if details != nil {
g.Add(resp.Id, details.ToPrintable())
g.Add(node.New(resp.Id, details.ToPrintable()))
}
}
},
Expand Down
48 changes: 26 additions & 22 deletions graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"sync"

"github.com/asteris-llc/converge/graph/node"
"github.com/asteris-llc/converge/helpers/logging"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform/dag"
Expand All @@ -28,10 +29,10 @@ import (
)

// WalkFunc is taken by the walking functions
type WalkFunc func(string, interface{}) error
type WalkFunc func(*node.Node) error

// TransformFunc is taken by the transformation functions
type TransformFunc func(string, *Graph) error
type TransformFunc func(*node.Node, *Graph) error

type walkerFunc func(context.Context, *Graph, WalkFunc) error

Expand Down Expand Up @@ -60,12 +61,12 @@ func New() *Graph {
}

// Add a new value by ID
func (g *Graph) Add(id string, value interface{}) {
func (g *Graph) Add(node *node.Node) {
g.innerLock.Lock()
defer g.innerLock.Unlock()

g.inner.Add(id)
g.values.Set(id, value)
g.inner.Add(node.ID)
g.values.Set(node.ID, node)
}

// Remove an existing value by ID
Expand All @@ -77,14 +78,20 @@ func (g *Graph) Remove(id string) {
g.values.Remove(id)
}

// Get a value by ID
func (g *Graph) Get(id string) interface{} {
val, _ := g.values.Get(id)
return val
// Get returns the value of the element and a bool indicating if it was
// found. If it was not found the value of the returned element is nil, but a
// valid node will be constructed.
func (g *Graph) Get(id string) (*node.Node, bool) {
raw, ok := g.values.Get(id)
if !ok {
return nil, ok
}

return raw.(*node.Node), true
}

// GetParent returns the direct parent vertex of the current node.
func (g *Graph) GetParent(id string) interface{} {
func (g *Graph) GetParent(id string) (*node.Node, bool) {
var parentID string
for _, edge := range g.UpEdges(id) {
switch edge.(type) {
Expand Down Expand Up @@ -346,7 +353,8 @@ func dependencyWalk(rctx context.Context, g *Graph, cb WalkFunc) error {
}

logger.WithField("id", id).Debug("executing")
if err := cb(id, g.Get(id)); err != nil {
val, _ := g.Get(id)
if err := cb(val); err != nil {
setErr(id, err)
}
}
Expand Down Expand Up @@ -420,7 +428,8 @@ func rootFirstWalk(ctx context.Context, g *Graph, cb WalkFunc) error {

logger.WithField("id", id).Debug("walking")

if err := cb(id, g.Get(id)); err != nil {
raw, _ := g.Get(id) // we want to call with every value, including nil
if err := cb(raw); err != nil {
return err
}

Expand Down Expand Up @@ -449,7 +458,8 @@ func (g *Graph) Copy() *Graph {
out := New()

for _, v := range g.Vertices() {
out.Add(v, g.Get(v))
val, _ := g.Get(v) // we don't care if it's nil here, we're doing a direct copy
out.Add(val)
}

for _, e := range g.inner.Edges() {
Expand Down Expand Up @@ -503,15 +513,9 @@ func (g *Graph) Vertices() []string {
return vertices
}

// MaybeGet returns the value of the element and a bool indicating if it was
// found, if it was not found the value of the returned element is nil.
func (g *Graph) MaybeGet(id string) (interface{}, bool) {
return g.values.Get(id)
}

// Contains returns true if the id exists in the map
func (g *Graph) Contains(id string) bool {
_, found := g.MaybeGet(id)
_, found := g.Get(id)
return found
}

Expand Down Expand Up @@ -550,8 +554,8 @@ func (g *Graph) String() string {
func transform(ctx context.Context, source *Graph, walker walkerFunc, cb TransformFunc) (*Graph, error) {
dest := source.Copy()

err := walker(ctx, dest, func(id string, _ interface{}) error {
return cb(id, dest)
err := walker(ctx, dest, func(meta *node.Node) error {
return cb(meta, dest)
})
if err != nil {
return dest, err
Expand Down
Loading

0 comments on commit a94b8d8

Please sign in to comment.