From f7c12b889e02d1793bd0fb6e651b2d73de86f5ba Mon Sep 17 00:00:00 2001 From: Doug MacEachern Date: Sat, 7 Jul 2018 11:16:00 -0700 Subject: [PATCH] vcsim: add support for PropertyCollector incremental updates Prior to this change, clients could only call WaitForUpdatesEx once, which would return the current state. The next invocation with Version given would fail as unsupported. Clients can now use WaitForUpdatesEx to wait for multiple updates. The basic idea is that methods should now update object fields using the Registry.UpdateObject method instead of setting fields directly. UpdateObject uses the same PropertyChange structure that is sent as part of the WaitForUpdatesEx response. UpdateObject will also apply the PropertyChange(s) to the local object, removing the need to set fields directly. - govc commands now use Client.Flag.WithCancel to catch SIGINT, allowing proper cleanup of collectors - Add vcsim ListView support to ViewManager Fixes #922 --- govc/events/command.go | 31 ++-- govc/flags/client.go | 28 ++++ govc/object/collect.go | 60 ++++--- property/collector.go | 6 + property/wait.go | 30 +++- scripts/debug-xmlformat.sh | 6 +- simulator/event_manager.go | 7 +- simulator/property_collector.go | 230 ++++++++++++++++++++++----- simulator/property_collector_test.go | 130 ++++++++++++--- simulator/property_filter.go | 64 +++++++- simulator/race_test.go | 20 ++- simulator/registry.go | 69 ++++++-- simulator/task.go | 2 +- simulator/task_manager.go | 19 ++- simulator/view_manager.go | 83 +++++++++- simulator/virtual_machine.go | 41 +++-- simulator/virtual_machine_test.go | 5 +- vim25/mo/retrieve.go | 16 ++ vim25/mo/type_info.go | 19 ++- 19 files changed, 715 insertions(+), 151 deletions(-) diff --git a/govc/events/command.go b/govc/events/command.go index b6f82828e..32ef4a039 100644 --- a/govc/events/command.go +++ b/govc/events/command.go @@ -164,24 +164,21 @@ func (cmd *events) Run(ctx context.Context, f *flag.FlagSet) error { return err } - if len(objs) > 0 { - // need an event manager - m := event.NewManager(c) - - // get the event stream - err = m.Events(ctx, objs, cmd.Max, cmd.Tail, cmd.Force, func(obj types.ManagedObjectReference, ee []types.BaseEvent) error { - var o *types.ManagedObjectReference - if len(objs) > 1 { - o = &obj - } + if len(objs) == 0 { + return nil + } - return cmd.printEvents(ctx, o, ee, m) - }, cmd.Kind...) + m := event.NewManager(c) - if err != nil { - return err - } - } + return cmd.WithCancel(ctx, func(wctx context.Context) error { + return m.Events(wctx, objs, cmd.Max, cmd.Tail, cmd.Force, + func(obj types.ManagedObjectReference, ee []types.BaseEvent) error { + var o *types.ManagedObjectReference + if len(objs) > 1 { + o = &obj + } - return nil + return cmd.printEvents(ctx, o, ee, m) + }, cmd.Kind...) + }) } diff --git a/govc/flags/client.go b/govc/flags/client.go index 267bbe4c0..3b48b9a38 100644 --- a/govc/flags/client.go +++ b/govc/flags/client.go @@ -28,8 +28,10 @@ import ( "net/http" "net/url" "os" + "os/signal" "path/filepath" "strings" + "syscall" "time" "github.com/vmware/govmomi/session" @@ -625,3 +627,29 @@ func (flag *ClientFlag) Environ(extra bool) []string { return env } + +// WithCancel calls the given function, returning when complete or canceled via SIGINT. +func (flag *ClientFlag) WithCancel(ctx context.Context, f func(context.Context) error) error { + wctx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan bool) + var werr error + + go func() { + defer close(done) + werr = f(wctx) + }() + + sig := make(chan os.Signal) + signal.Notify(sig, syscall.SIGINT) + + select { + case <-sig: + cancel() + <-done // Wait for f() to complete + case <-done: + } + + return werr +} diff --git a/govc/object/collect.go b/govc/object/collect.go index 668476439..64e9d41b9 100644 --- a/govc/object/collect.go +++ b/govc/object/collect.go @@ -26,6 +26,7 @@ import ( "reflect" "strings" "text/tabwriter" + "time" "github.com/vmware/govmomi/govc/cli" "github.com/vmware/govmomi/govc/flags" @@ -47,6 +48,7 @@ type collect struct { dump bool n int kind kinds + wait time.Duration filter property.Filter obj string @@ -65,6 +67,7 @@ func (cmd *collect) Register(ctx context.Context, f *flag.FlagSet) { f.StringVar(&cmd.raw, "R", "", "Raw XML encoded CreateFilter request") f.IntVar(&cmd.n, "n", 0, "Wait for N property updates") f.Var(&cmd.kind, "type", "Resource type. If specified, MOID is used for a container view root") + f.DurationVar(&cmd.wait, "wait", 0, "Max wait time for updates") } func (cmd *collect) Usage() string { @@ -388,41 +391,48 @@ func (cmd *collect) Run(ctx context.Context, f *flag.FlagSet) error { entered := false hasFilter := len(cmd.filter) != 0 - return property.WaitForUpdates(ctx, p, filter, func(updates []types.ObjectUpdate) bool { - matches := 0 + if cmd.wait != 0 { + filter.Options = &types.WaitOptions{ + MaxWaitSeconds: types.NewInt32(int32(cmd.wait.Seconds())), + } + } - for _, update := range updates { - if entered && update.Kind == types.ObjectUpdateKindEnter { - // on the first update we only get kind "enter" - // if a new object is added, the next update with have both "enter" and "modify". - continue - } + return cmd.WithCancel(ctx, func(wctx context.Context) error { + return property.WaitForUpdates(wctx, p, filter, func(updates []types.ObjectUpdate) bool { + matches := 0 + for _, update := range updates { + if entered && update.Kind == types.ObjectUpdateKindEnter { + // on the first update we only get kind "enter" + // if a new object is added, the next update with have both "enter" and "modify". + continue + } - c := &change{cmd, update} + c := &change{cmd, update} - if hasFilter { - if cmd.match(update) { - matches++ - } else { - continue + if hasFilter { + if cmd.match(update) { + matches++ + } else { + continue + } } + + _ = cmd.WriteResult(c) } - _ = cmd.WriteResult(c) - } + entered = true - entered = true + if hasFilter { + if matches > 0 { + return true + } - if hasFilter { - if matches > 0 { - return true + return false } - return false - } - - cmd.n-- + cmd.n-- - return cmd.n == -1 + return cmd.n == -1 && cmd.wait == 0 + }) }) } diff --git a/property/collector.go b/property/collector.go index ccf712cf9..80e5dfbe3 100644 --- a/property/collector.go +++ b/property/collector.go @@ -111,6 +111,12 @@ func (p *Collector) WaitForUpdates(ctx context.Context, v string) (*types.Update return res.Returnval, nil } +func (p *Collector) CancelWaitForUpdates(ctx context.Context) error { + req := &types.CancelWaitForUpdates{This: p.Reference()} + _, err := methods.CancelWaitForUpdates(ctx, p.roundTripper, req) + return err +} + func (p *Collector) RetrieveProperties(ctx context.Context, req types.RetrieveProperties) (*types.RetrievePropertiesResponse, error) { req.This = p.Reference() return methods.RetrieveProperties(ctx, p.roundTripper, &req) diff --git a/property/wait.go b/property/wait.go index fe847926c..f730525ca 100644 --- a/property/wait.go +++ b/property/wait.go @@ -19,12 +19,14 @@ package property import ( "context" + "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/types" ) // WaitFilter provides helpers to construct a types.CreateFilter for use with property.Wait type WaitFilter struct { types.CreateFilter + Options *types.WaitOptions } // Add a new ObjectSpec and PropertySpec to the WaitFilter @@ -75,6 +77,7 @@ func Wait(ctx context.Context, c *Collector, obj types.ManagedObjectReference, p // creates a new property collector and calls CreateFilter. A new property // collector is required because filters can only be added, not removed. // +// If the Context is canceled, a call to CancelWaitForUpdates() is made and its error value is returned. // The newly created collector is destroyed before this function returns (both // in case of success or error). // @@ -85,7 +88,7 @@ func WaitForUpdates(ctx context.Context, c *Collector, filter *WaitFilter, f fun } // Attempt to destroy the collector using the background context, as the - // specified context may have timed out or have been cancelled. + // specified context may have timed out or have been canceled. defer p.Destroy(context.Background()) err = p.CreateFilter(ctx, filter.CreateFilter) @@ -93,20 +96,33 @@ func WaitForUpdates(ctx context.Context, c *Collector, filter *WaitFilter, f fun return err } - for version := ""; ; { - res, err := p.WaitForUpdates(ctx, version) + req := types.WaitForUpdatesEx{ + This: p.Reference(), + Options: filter.Options, + } + + for { + res, err := methods.WaitForUpdatesEx(ctx, p.roundTripper, &req) if err != nil { + if ctx.Err() == context.Canceled { + werr := p.CancelWaitForUpdates(context.Background()) + return werr + } return err } - // Retry if the result came back empty - if res == nil { + set := res.Returnval + if set == nil { + if req.Options != nil && req.Options.MaxWaitSeconds != nil { + return nil // WaitOptions.MaxWaitSeconds exceeded + } + // Retry if the result came back empty continue } - version = res.Version + req.Version = set.Version - for _, fs := range res.FilterSet { + for _, fs := range set.FilterSet { if f(fs.ObjectSet) { return nil } diff --git a/scripts/debug-xmlformat.sh b/scripts/debug-xmlformat.sh index a33a28a51..75f72484a 100755 --- a/scripts/debug-xmlformat.sh +++ b/scripts/debug-xmlformat.sh @@ -13,6 +13,8 @@ for file in *.req.xml; do header Request "$file" "${base}.req.headers" xmlformat < "$file" file="${base}.res.xml" - header Response "$file" "${base}.res.headers" - xmlformat < "$file" + if [ -e "$file" ] ; then + header Response "$file" "${base}.res.headers" + xmlformat < "$file" + fi done diff --git a/simulator/event_manager.go b/simulator/event_manager.go index d68f76310..ff08d90fe 100644 --- a/simulator/event_manager.go +++ b/simulator/event_manager.go @@ -157,14 +157,15 @@ func (m *EventManager) PostEvent(ctx *Context, req *types.PostEvent) soap.HasFau event.CreatedTime = time.Now() event.UserName = ctx.Session.UserName - m.page = m.page.Next() + m.page = m.page.Prev() m.page.Value = req.EventToPost m.formatMessage(req.EventToPost) for _, c := range m.collectors { if c.eventMatches(req.EventToPost) { - c.page = c.page.Next() - c.page.Value = event + c.page = c.page.Prev() + c.page.Value = req.EventToPost + Map.Update(c, []types.PropertyChange{{Name: "latestPage", Val: c.GetLatestPage()}}) } } diff --git a/simulator/property_collector.go b/simulator/property_collector.go index 41d8cef63..3f1576857 100644 --- a/simulator/property_collector.go +++ b/simulator/property_collector.go @@ -17,11 +17,14 @@ limitations under the License. package simulator import ( + "context" "errors" "log" "path" "reflect" "strings" + "sync" + "time" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/vim25/methods" @@ -32,10 +35,17 @@ import ( type PropertyCollector struct { mo.PropertyCollector + + nopLocker + updates map[types.ManagedObjectReference][]types.ObjectUpdate + mu sync.Mutex + cancel context.CancelFunc } func NewPropertyCollector(ref types.ManagedObjectReference) object.Reference { - s := &PropertyCollector{} + s := &PropertyCollector{ + updates: make(map[types.ManagedObjectReference][]types.ObjectUpdate), + } s.Self = ref return s } @@ -72,6 +82,10 @@ func getObject(ctx *Context, ref types.ManagedObjectReference) (reflect.Value, b obj = o.Get() } + return getManagedObject(obj), true +} + +func getManagedObject(obj mo.Reference) reflect.Value { rval := reflect.ValueOf(obj).Elem() rtype := rval.Type() @@ -82,26 +96,21 @@ func getObject(ctx *Context, ref types.ManagedObjectReference) (reflect.Value, b // for the case where the type has a field of the same name, for example: // mo.ResourcePool.ResourcePool for { - if path.Base(rtype.PkgPath()) != "mo" { - if rtype.Kind() != reflect.Struct || rtype.NumField() == 0 { - log.Printf("%#v does not have an embedded mo type", ref) - return reflect.Value{}, false - } - rval = rval.Field(0) - rtype = rval.Type() - } else { + if path.Base(rtype.PkgPath()) == "mo" { break } + if rtype.Kind() != reflect.Struct || rtype.NumField() == 0 { + log.Panicf("%#v does not have an embedded mo type", obj.Reference()) + } + rval = rval.Field(0) + rtype = rval.Type() } - return rval, true + return rval } -func fieldValueInterface(f reflect.StructField, rval reflect.Value) interface{} { - if rval.Kind() == reflect.Ptr { - rval = rval.Elem() - } - +// wrapValue converts slice types to the appropriate ArrayOf type used in property collector responses. +func wrapValue(rval reflect.Value, rtype reflect.Type) interface{} { pval := rval.Interface() if rval.Kind() == reflect.Slice { @@ -128,7 +137,7 @@ func fieldValueInterface(f reflect.StructField, rval reflect.Value) interface{} Long: v, } default: - kind := f.Type.Elem().Name() + kind := rtype.Elem().Name() // Remove govmomi interface prefix name if strings.HasPrefix(kind, "Base") { kind = kind[4:] @@ -143,6 +152,14 @@ func fieldValueInterface(f reflect.StructField, rval reflect.Value) interface{} return pval } +func fieldValueInterface(f reflect.StructField, rval reflect.Value) interface{} { + if rval.Kind() == reflect.Ptr { + rval = rval.Elem() + } + + return wrapValue(rval, f.Type) +} + func fieldValue(rval reflect.Value, p string) (interface{}, error) { var value interface{} fields := strings.Split(p, ".") @@ -429,7 +446,10 @@ func (pc *PropertyCollector) collect(ctx *Context, r *types.RetrievePropertiesEx func (pc *PropertyCollector) CreateFilter(ctx *Context, c *types.CreateFilter) soap.HasFault { body := &methods.CreateFilterBody{} - filter := &PropertyFilter{pc: pc} + filter := &PropertyFilter{ + pc: pc, + refs: make(map[types.ManagedObjectReference]struct{}), + } filter.PartialUpdates = c.PartialUpdates filter.Spec = c.Spec @@ -445,7 +465,9 @@ func (pc *PropertyCollector) CreateFilter(ctx *Context, c *types.CreateFilter) s func (pc *PropertyCollector) CreatePropertyCollector(ctx *Context, c *types.CreatePropertyCollector) soap.HasFault { body := &methods.CreatePropertyCollectorBody{} - cpc := &PropertyCollector{} + cpc := &PropertyCollector{ + updates: make(map[types.ManagedObjectReference][]types.ObjectUpdate), + } body.Res = &types.CreatePropertyCollectorResponse{ Returnval: ctx.Session.Put(cpc).Reference(), @@ -457,9 +479,13 @@ func (pc *PropertyCollector) CreatePropertyCollector(ctx *Context, c *types.Crea func (pc *PropertyCollector) DestroyPropertyCollector(ctx *Context, c *types.DestroyPropertyCollector) soap.HasFault { body := &methods.DestroyPropertyCollectorBody{} + if pc.cancel != nil { + pc.cancel() + } + for _, ref := range pc.Filter { filter := ctx.Session.Get(ref).(*PropertyFilter) - filter.DestroyPropertyFilter(&types.DestroyPropertyFilter{This: ref}) + filter.DestroyPropertyFilter(ctx, &types.DestroyPropertyFilter{This: ref}) } ctx.Session.Remove(c.This) @@ -519,24 +545,46 @@ func (pc *PropertyCollector) RetrieveProperties(ctx *Context, r *types.RetrieveP } func (pc *PropertyCollector) CancelWaitForUpdates(r *types.CancelWaitForUpdates) soap.HasFault { + pc.mu.Lock() + if pc.cancel != nil { + pc.cancel() + } + pc.mu.Unlock() + return &methods.CancelWaitForUpdatesBody{Res: new(types.CancelWaitForUpdatesResponse)} } -func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpdatesEx) soap.HasFault { - body := &methods.WaitForUpdatesExBody{} +func (pc *PropertyCollector) update(u types.ObjectUpdate) { + pc.mu.Lock() + pc.updates[u.Obj] = append(pc.updates[u.Obj], u) + pc.mu.Unlock() +} - // At the moment we need to support Task completion. Handlers can simply set the Task - // state before returning and the non-incremental update is enough for the client. - // We can wait for incremental updates to simulate timeouts, etc. - if r.Version != "" { - body.Fault_ = Fault("incremental updates not supported yet", &types.NotSupported{}) - return body - } +func (pc *PropertyCollector) PutObject(o mo.Reference) { + pc.update(types.ObjectUpdate{ + Obj: o.Reference(), + Kind: types.ObjectUpdateKindEnter, + ChangeSet: nil, + }) +} - update := &types.UpdateSet{ - Version: "-", - } +func (pc *PropertyCollector) UpdateObject(o mo.Reference, changes []types.PropertyChange) { + pc.update(types.ObjectUpdate{ + Obj: o.Reference(), + Kind: types.ObjectUpdateKindModify, + ChangeSet: changes, + }) +} + +func (pc *PropertyCollector) RemoveObject(ref types.ManagedObjectReference) { + pc.update(types.ObjectUpdate{ + Obj: ref, + Kind: types.ObjectUpdateKindLeave, + ChangeSet: nil, + }) +} +func (pc *PropertyCollector) apply(ctx *Context, update *types.UpdateSet) types.BaseMethodFault { for _, ref := range pc.Filter { filter := ctx.Session.Get(ref).(*PropertyFilter) @@ -545,8 +593,7 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda res, fault := pc.collect(ctx, r) if fault != nil { - body.Fault_ = Fault("", fault) - return body + return fault } fu := types.PropertyFilterUpdate{ @@ -554,6 +601,10 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda } for _, o := range res.Objects { + if _, ok := filter.refs[o.Obj]; ok { + continue + } + filter.refs[o.Obj] = struct{}{} ou := types.ObjectUpdate{ Obj: o.Obj, Kind: types.ObjectUpdateKindEnter, @@ -570,14 +621,121 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda fu.ObjectSet = append(fu.ObjectSet, ou) } - update.FilterSet = append(update.FilterSet, fu) + if len(fu.ObjectSet) != 0 { + update.FilterSet = append(update.FilterSet, fu) + } + } + return nil +} + +func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpdatesEx) soap.HasFault { + wait := context.Background() + pc.mu.Lock() + wait, pc.cancel = context.WithCancel(wait) + if r.Options != nil { + if max := r.Options.MaxWaitSeconds; max != nil { + wait, pc.cancel = context.WithTimeout(ctx, time.Second*time.Duration(*max)) + } + } + pc.mu.Unlock() + + body := &methods.WaitForUpdatesExBody{} + + set := &types.UpdateSet{ + Version: r.Version, } body.Res = &types.WaitForUpdatesExResponse{ - Returnval: update, + Returnval: set, } - return body + apply := func() bool { + if fault := pc.apply(ctx, set); fault != nil { + body.Fault_ = Fault("", fault) + body.Res = nil + return false + } + return true + } + + if r.Version == "" { + apply() + set.Version = "-" + Map.AddHandler(pc) + return body + } + + ticker := time.NewTicker(100 * time.Millisecond) // allow for updates to accumulate + defer ticker.Stop() + for { + select { + case <-wait.Done(): + body.Res.Returnval = nil + switch wait.Err() { + case context.Canceled: + log.Printf("%s: WaitForUpdates canceled", pc.Self) + body.Fault_ = Fault("", new(types.RequestCanceled)) // CancelWaitForUpdates was called + body.Res = nil + case context.DeadlineExceeded: + log.Printf("%s: WaitForUpdates MaxWaitSeconds exceeded", pc.Self) + } + + return body + case <-ticker.C: + pc.mu.Lock() + updates := pc.updates + if len(updates) != 0 { + pc.updates = make(map[types.ManagedObjectReference][]types.ObjectUpdate) + } + pc.mu.Unlock() + if len(updates) == 0 { + continue + } + + log.Printf("%s: %d updated objects, applying to %d filters", pc.Self, len(updates), len(pc.Filter)) + + for _, f := range pc.Filter { + filter := ctx.Session.Get(f).(*PropertyFilter) + fu := types.PropertyFilterUpdate{ + Filter: f, + } + + for ref, changes := range updates { + log.Printf("%s has %d changes", ref, len(changes)) + for _, change := range changes { + switch change.Kind { + case types.ObjectUpdateKindEnter: + if !apply() { + return body + } + case types.ObjectUpdateKindModify: + if !apply() { + return body + } + if _, ok := filter.refs[ref]; ok { + change = filter.apply(ctx, change) + if len(change.ChangeSet) != 0 { + fu.ObjectSet = append(fu.ObjectSet, change) + } + } + case types.ObjectUpdateKindLeave: + if _, ok := filter.refs[ref]; !ok { + continue + } + delete(filter.refs, ref) + } + } + } + + if len(fu.ObjectSet) != 0 { + set.FilterSet = append(set.FilterSet, fu) + } + } + if len(set.FilterSet) != 0 { + return body + } + } + } } // WaitForUpdates is deprecated, but pyvmomi is still using it at the moment. diff --git a/simulator/property_collector_test.go b/simulator/property_collector_test.go index e7e5283a9..2af8b274d 100644 --- a/simulator/property_collector_test.go +++ b/simulator/property_collector_test.go @@ -19,6 +19,7 @@ package simulator import ( "context" "reflect" + "sync" "testing" "github.com/vmware/govmomi" @@ -250,6 +251,7 @@ func TestWaitForUpdates(t *testing.T) { t.Fatal(err) } + updates := make(chan bool) cb := func(once bool) func([]types.PropertyChange) bool { return func(pc []types.PropertyChange) bool { if len(pc) != 1 { @@ -267,6 +269,9 @@ func TestWaitForUpdates(t *testing.T) { t.Fail() } + if once == false { + updates <- true + } return once } } @@ -279,11 +284,16 @@ func TestWaitForUpdates(t *testing.T) { t.Error(err) } - // incremental updates not yet suppported - err = property.Wait(ctx, pc, folder.Reference(), props, cb(false)) - if err == nil { - t.Error("expected error") - } + wctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = property.Wait(wctx, pc, folder.Reference(), props, cb(false)) + }() + <-updates + cancel() + wg.Wait() // test object not found Map.Remove(folder.Reference()) @@ -305,12 +315,104 @@ func TestWaitForUpdates(t *testing.T) { t.Fatal(err) } - _, err = methods.CancelWaitForUpdates(ctx, c.Client, &types.CancelWaitForUpdates{This: p.Reference()}) + err = p.CancelWaitForUpdates(ctx) if err != nil { t.Fatal(err) } } +func TestIncrementalWaitForUpdates(t *testing.T) { + ctx := context.Background() + + m := VPX() + + defer m.Remove() + + err := m.Create() + if err != nil { + t.Fatal(err) + } + + s := m.Service.NewServer() + defer s.Close() + + c, err := govmomi.NewClient(ctx, s.URL, true) + if err != nil { + t.Fatal(err) + } + + pc := property.DefaultCollector(c.Client) + obj := Map.Any("VirtualMachine").(*VirtualMachine) + ref := obj.Reference() + vm := object.NewVirtualMachine(c.Client, ref) + + tests := []struct { + name string + props []string + }{ + {"1 field", []string{"runtime.powerState"}}, + {"2 fields", []string{"summary.runtime.powerState", "summary.runtime.bootTime"}}, + {"3 fields", []string{"runtime.powerState", "summary.runtime.powerState", "summary.runtime.bootTime"}}, + {"parent field", []string{"runtime"}}, + {"nested parent field", []string{"summary.runtime"}}, + {"all", nil}, + } + + // toggle power state to generate updates + state := map[types.VirtualMachinePowerState]func(context.Context) (*object.Task, error){ + types.VirtualMachinePowerStatePoweredOff: vm.PowerOn, + types.VirtualMachinePowerStatePoweredOn: vm.PowerOff, + } + + for i, test := range tests { + var props []string + matches := false + wait := make(chan bool) + filter := new(property.WaitFilter).Add(ref, ref.Type, test.props) + + go func() { + perr := property.WaitForUpdates(ctx, pc, filter, func(updates []types.ObjectUpdate) bool { + for _, update := range updates { + if update.Kind == types.ObjectUpdateKindEnter { + wait <- true + continue + } + for _, change := range update.ChangeSet { + props = append(props, change.Name) + } + } + + if test.props == nil { + // special case to test All flag + matches = isTrue(filter.Spec.PropSet[0].All) && len(props) > 1 + + return matches + } + + if len(props) > len(test.props) { + return true + } + + matches = reflect.DeepEqual(props, test.props) + return matches + }) + + if perr != nil { + t.Fatal(perr) + } + wait <- true + }() + + <-wait // wait for enter + _, _ = state[obj.Runtime.PowerState](ctx) + <-wait // wait for modify + + if !matches { + t.Errorf("%d: updates=%s, expected=%s", i, props, test.props) + } + } +} + func TestPropertyCollectorWithUnsetValues(t *testing.T) { ctx := context.Background() @@ -464,22 +566,6 @@ func TestExtractEmbeddedField(t *testing.T) { if obj.Type() != reflect.ValueOf(new(mo.ResourcePool)).Elem().Type() { t.Errorf("unexpected type=%s", obj.Type().Name()) } - - // satisfies the mo.Reference interface, but does not embed a type from the "mo" package - type NoMo struct { - types.ManagedObjectReference - - Self types.ManagedObjectReference - } - - n := new(NoMo) - n.ManagedObjectReference = types.ManagedObjectReference{Type: "NoMo", Value: "no-mo"} - Map.Put(n) - - _, ok = getObject(internalContext, n.Reference()) - if ok { - t.Error("expected not ok") - } } func TestPropertyCollectorFold(t *testing.T) { diff --git a/simulator/property_filter.go b/simulator/property_filter.go index 99a74fb1c..930649806 100644 --- a/simulator/property_filter.go +++ b/simulator/property_filter.go @@ -17,6 +17,9 @@ limitations under the License. package simulator import ( + "reflect" + "strings" + "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/soap" @@ -26,17 +29,72 @@ import ( type PropertyFilter struct { mo.PropertyFilter - pc *PropertyCollector + pc *PropertyCollector + refs map[types.ManagedObjectReference]struct{} } -func (f *PropertyFilter) DestroyPropertyFilter(c *types.DestroyPropertyFilter) soap.HasFault { +func (f *PropertyFilter) DestroyPropertyFilter(ctx *Context, c *types.DestroyPropertyFilter) soap.HasFault { body := &methods.DestroyPropertyFilterBody{} RemoveReference(&f.pc.Filter, c.This) - Map.Remove(c.This) + ctx.Session.Remove(c.This) body.Res = &types.DestroyPropertyFilterResponse{} return body } + +// matches returns true if the change matches one of the filter Spec.PropSet +func (f *PropertyFilter) matches(obj mo.Reference, change *types.PropertyChange) bool { + ref := obj.Reference() + + for _, p := range f.Spec.PropSet { + if p.Type != ref.Type { + return false + } + + if isTrue(p.All) { + return true + } + + for _, name := range p.PathSet { + if name == change.Name { + return true + } + + // strings.HasPrefix("runtime.powerState", "runtime") == parent field matches + if strings.HasPrefix(change.Name, name) { + change.Name = name + change.Val, _ = fieldValue(reflect.ValueOf(obj), name) + + return true + } + } + } + + return false +} + +// apply the PropertyFilter.Spec to the given ObjectUpdate +func (f *PropertyFilter) apply(ctx *Context, change types.ObjectUpdate) types.ObjectUpdate { + obj := ctx.Map.Get(change.Obj) + parents := make(map[string]bool) + set := change.ChangeSet + change.ChangeSet = nil + + for i, p := range set { + if f.matches(obj, &p) { + if p.Name != set[i].Name { + // update matches a parent field from the spec. + if parents[p.Name] { + continue // only return 1 instance of the parent + } + parents[p.Name] = true + } + change.ChangeSet = append(change.ChangeSet, p) + } + } + + return change +} diff --git a/simulator/race_test.go b/simulator/race_test.go index bf7ae327e..90de4b966 100644 --- a/simulator/race_test.go +++ b/simulator/race_test.go @@ -98,7 +98,7 @@ func TestRace(t *testing.T) { t.Error(terr) } go func() { - for _ = range ticker.C { + for range ticker.C { var content []types.ObjectContent rerr := pc.RetrieveOne(ctx, info.Result.(types.ManagedObjectReference), nil, &content) if rerr != nil { @@ -116,11 +116,29 @@ func TestRace(t *testing.T) { for i := range vms { vm := vms[i] + wctx, cancel := context.WithCancel(ctx) wg.Add(1) + updates := make(chan bool) go func() { defer wg.Done() + werr := property.Wait(wctx, pc, vm.Reference(), []string{"runtime.powerState"}, func(_ []types.PropertyChange) bool { + updates <- true + return false + }) + if werr != nil { + if werr != context.Canceled { + t.Error(werr) + } + } + }() + wg.Add(1) + go func() { + defer wg.Done() + // wait for routine above to get the first update before changing power state, otherwise cancel() may happen too soon + <-updates task, _ := vm.PowerOff(ctx) _ = task.Wait(ctx) + cancel() }() } }() diff --git a/simulator/registry.go b/simulator/registry.go index 904b549cf..fc6351d11 100644 --- a/simulator/registry.go +++ b/simulator/registry.go @@ -46,10 +46,11 @@ var refValueMap = map[string]string{ // Map is the default Registry instance. var Map = NewRegistry() -// RegisterObject interface supports callbacks when objects are added and removed from the Registry +// RegisterObject interface supports callbacks when objects are added, modified and removed from the Registry type RegisterObject interface { mo.Reference PutObject(mo.Reference) + UpdateObject(mo.Reference, []types.PropertyChange) RemoveObject(types.ManagedObjectReference) } @@ -126,7 +127,9 @@ func (r *Registry) setReference(item mo.Reference, ref types.ManagedObjectRefere // AddHandler adds a RegisterObject handler to the Registry. func (r *Registry) AddHandler(h RegisterObject) { + r.m.Lock() r.handlers[h.Reference()] = h + r.m.Unlock() } // NewEntity sets Entity().Self with a new, unique Value. @@ -173,10 +176,23 @@ func (r *Registry) Any(kind string) mo.Entity { return nil } +// handlersApply calls the given func for each r.handlers +func (r *Registry) handlersApply(f func(o RegisterObject)) { + r.m.Lock() + handlers := make([]RegisterObject, 0, len(r.handlers)) + for _, handler := range r.handlers { + handlers = append(handlers, handler) + } + r.m.Unlock() + + for i := range handlers { + f(handlers[i]) + } +} + // Put adds a new object to Registry, generating a ManagedObjectReference if not already set. func (r *Registry) Put(item mo.Reference) mo.Reference { r.m.Lock() - defer r.m.Unlock() ref := item.Reference() if ref.Type == "" || ref.Value == "" { @@ -192,25 +208,50 @@ func (r *Registry) Put(item mo.Reference) mo.Reference { r.objects[ref] = item - for _, h := range r.handlers { - h.PutObject(item) - } + r.m.Unlock() + + r.handlersApply(func(o RegisterObject) { + o.PutObject(item) + }) return item } // Remove removes an object from the Registry. func (r *Registry) Remove(item types.ManagedObjectReference) { - r.m.Lock() - defer r.m.Unlock() - - for _, h := range r.handlers { - h.RemoveObject(item) - } + r.handlersApply(func(o RegisterObject) { + o.RemoveObject(item) + }) + r.m.Lock() delete(r.objects, item) delete(r.handlers, item) delete(r.locks, item) + r.m.Unlock() +} + +// Update dispatches object property changes to RegisterObject handlers, +// such as any PropertyCollector instances with in-progress WaitForUpdates calls. +// The changes are also applied to the given object via mo.ApplyPropertyChange, +// so there is no need to set object fields directly. +func (r *Registry) Update(obj mo.Reference, changes []types.PropertyChange) { + for i := range changes { + if changes[i].Op == "" { + changes[i].Op = types.PropertyChangeOpAssign + } + if changes[i].Val != nil { + rval := reflect.ValueOf(changes[i].Val) + changes[i].Val = wrapValue(rval, rval.Type()) + } + } + + val := getManagedObject(obj).Addr().Interface().(mo.Reference) + + mo.ApplyPropertyChange(val, changes) + + r.handlersApply(func(o RegisterObject) { + o.UpdateObject(val, changes) + }) } // getEntityParent traverses up the inventory and returns the first object of type kind. @@ -444,3 +485,9 @@ func (r *Registry) WithLock(obj mo.Reference, f func()) { } f() } + +// nopLocker can be embedded to opt-out of auto-locking (see Registry.WithLock) +type nopLocker struct{} + +func (*nopLocker) Lock() {} +func (*nopLocker) Unlock() {} diff --git a/simulator/task.go b/simulator/task.go index d6dd5bc41..ce18497c9 100644 --- a/simulator/task.go +++ b/simulator/task.go @@ -63,7 +63,7 @@ func CreateTask(e mo.Reference, name string, run func(*Task) (types.AnyType, typ task.Info.DescriptionId = fmt.Sprintf("%s.%s", ref.Type, id) task.Info.Entity = &ref task.Info.EntityName = ref.Value - + task.Info.Reason = &types.TaskReasonUser{UserName: "vcsim"} // TODO: Context.Session.User task.Info.QueueTime = time.Now() task.Info.State = types.TaskInfoStateQueued diff --git a/simulator/task_manager.go b/simulator/task_manager.go index df2710825..c7e900ce6 100644 --- a/simulator/task_manager.go +++ b/simulator/task_manager.go @@ -17,6 +17,8 @@ limitations under the License. package simulator import ( + "sync" + "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/types" @@ -26,6 +28,7 @@ var recentTaskMax = 200 // the VC limit type TaskManager struct { mo.TaskManager + mu sync.Mutex } func NewTaskManager(ref types.ManagedObjectReference) object.Reference { @@ -41,12 +44,16 @@ func (m *TaskManager) PutObject(obj mo.Reference) { return } - m.RecentTask = append(m.RecentTask, ref) - - if len(m.RecentTask) > recentTaskMax { - m.RecentTask = m.RecentTask[1:] + m.mu.Lock() + defer m.mu.Unlock() + recent := append(m.RecentTask, ref) + if len(recent) > recentTaskMax { + recent = recent[1:] } -} -func (m *TaskManager) RemoveObject(_ types.ManagedObjectReference) { + Map.Update(m, []types.PropertyChange{{Name: "recentTask", Val: recent}}) } + +func (*TaskManager) RemoveObject(types.ManagedObjectReference) {} + +func (*TaskManager) UpdateObject(mo.Reference, []types.PropertyChange) {} diff --git a/simulator/view_manager.go b/simulator/view_manager.go index 78a5dadb1..133d9ff56 100644 --- a/simulator/view_manager.go +++ b/simulator/view_manager.go @@ -137,7 +137,8 @@ type ContainerView struct { types map[string]bool } -func (v *ContainerView) DestroyView(c *types.DestroyView) soap.HasFault { +func (v *ContainerView) DestroyView(ctx *Context, c *types.DestroyView) soap.HasFault { + ctx.Session.Remove(c.This) return destroyView(c.This) } @@ -192,3 +193,83 @@ func (v *ContainerView) add(root mo.Reference, seen map[types.ManagedObjectRefer } }) } + +func (m *ViewManager) CreateListView(ctx *Context, req *types.CreateListView) soap.HasFault { + body := new(methods.CreateListViewBody) + list := new(ListView) + + if err := list.add(req.Obj); err != nil { + body.Fault_ = Fault("", err) + return body + } + + ctx.Session.Put(list) + + body.Res = &types.CreateListViewResponse{ + Returnval: list.Self, + } + + return body +} + +type ListView struct { + mo.ListView +} + +func (v *ListView) update() { + Map.Update(v, []types.PropertyChange{{Name: "view", Val: v.View}}) +} + +func (v *ListView) add(refs []types.ManagedObjectReference) *types.ManagedObjectNotFound { + for _, ref := range refs { + obj := Map.Get(ref) + if obj == nil { + return &types.ManagedObjectNotFound{Obj: ref} + } + v.View = append(v.View, ref) + } + return nil +} + +func (v *ListView) DestroyView(ctx *Context, c *types.DestroyView) soap.HasFault { + ctx.Session.Remove(c.This) + return destroyView(c.This) +} + +func (v *ListView) ModifyListView(req *types.ModifyListView) soap.HasFault { + body := new(methods.ModifyListViewBody) + + for _, ref := range req.Remove { + RemoveReference(&v.View, ref) + } + + if err := v.add(req.Add); err != nil { + body.Fault_ = Fault("", err) + return body + } + + body.Res = new(types.ModifyListViewResponse) + + if len(req.Remove) != 0 || len(req.Add) != 0 { + v.update() + } + + return body +} + +func (v *ListView) ResetListView(req *types.ResetListView) soap.HasFault { + body := new(methods.ResetListViewBody) + + v.View = nil + + if err := v.add(req.Obj); err != nil { + body.Fault_ = Fault("", err) + return body + } + + body.Res = new(types.ResetListViewResponse) + + v.update() + + return body +} diff --git a/simulator/virtual_machine.go b/simulator/virtual_machine.go index ccd25dab5..ca0890839 100644 --- a/simulator/virtual_machine.go +++ b/simulator/virtual_machine.go @@ -709,15 +709,9 @@ func (c *powerVMTask) Run(task *Task) (types.AnyType, types.BaseMethodFault) { } } - c.VirtualMachine.Runtime.PowerState = c.state - c.VirtualMachine.Summary.Runtime.PowerState = c.state - - bt := &c.VirtualMachine.Summary.Runtime.BootTime + var boot types.AnyType if c.state == types.VirtualMachinePowerStatePoweredOn { - now := time.Now() - *bt = &now - } else { - *bt = nil + boot = time.Now() } event := c.event() @@ -731,6 +725,12 @@ func (c *powerVMTask) Run(task *Task) (types.AnyType, types.BaseMethodFault) { c.ctx.postEvent(&types.VmPoweredOffEvent{VmEvent: event}) } + Map.Update(c.VirtualMachine, []types.PropertyChange{ + {Name: "runtime.powerState", Val: c.state}, + {Name: "summary.runtime.powerState", Val: c.state}, + {Name: "summary.runtime.bootTime", Val: boot}, + }) + return nil, nil } @@ -935,29 +935,33 @@ func (vm *VirtualMachine) CloneVMTask(ctx *Context, req *types.CloneVM_Task) soa func (vm *VirtualMachine) RelocateVMTask(req *types.RelocateVM_Task) soap.HasFault { task := CreateTask(vm, "relocateVm", func(t *Task) (types.AnyType, types.BaseMethodFault) { + var changes []types.PropertyChange + if ref := req.Spec.Datastore; ref != nil { ds := Map.Get(*ref).(*Datastore) Map.RemoveReference(ds, &ds.Vm, *ref) - vm.Datastore = []types.ManagedObjectReference{*ref} - // TODO: migrate vm.Config.Files (and vm.Summary.Config.VmPathName) + + changes = append(changes, types.PropertyChange{Name: "datastore", Val: []types.ManagedObjectReference{*ref}}) } if ref := req.Spec.Pool; ref != nil { pool := Map.Get(*ref).(*ResourcePool) Map.RemoveReference(pool, &pool.Vm, *ref) - vm.ResourcePool = ref + changes = append(changes, types.PropertyChange{Name: "resourcePool", Val: *ref}) } if ref := req.Spec.Host; ref != nil { host := Map.Get(*ref).(*HostSystem) Map.RemoveReference(host, &host.Vm, *ref) - vm.Runtime.Host = ref + changes = append(changes, types.PropertyChange{Name: "runtime.host", Val: *ref}) } + Map.Update(vm, changes) + return nil, nil }) @@ -1060,7 +1064,7 @@ func (vm *VirtualMachine) RemoveAllSnapshotsTask(req *types.RemoveAllSnapshots_T } } -func (vm *VirtualMachine) ShutdownGuest(c *types.ShutdownGuest) soap.HasFault { +func (vm *VirtualMachine) ShutdownGuest(ctx *Context, c *types.ShutdownGuest) soap.HasFault { r := &methods.ShutdownGuestBody{} // should be poweron if vm.Runtime.PowerState == types.VirtualMachinePowerStatePoweredOff { @@ -1075,6 +1079,17 @@ func (vm *VirtualMachine) ShutdownGuest(c *types.ShutdownGuest) soap.HasFault { vm.Runtime.PowerState = types.VirtualMachinePowerStatePoweredOff vm.Summary.Runtime.PowerState = types.VirtualMachinePowerStatePoweredOff + event := vm.event() + ctx.postEvent( + &types.VmGuestShutdownEvent{VmEvent: event}, + &types.VmPoweredOffEvent{VmEvent: event}, + ) + + Map.Update(vm, []types.PropertyChange{ + {Name: "runtime.powerState", Val: types.VirtualMachinePowerStatePoweredOff}, + {Name: "summary.runtime.powerState", Val: types.VirtualMachinePowerStatePoweredOff}, + }) + r.Res = new(types.ShutdownGuestResponse) return r diff --git a/simulator/virtual_machine_test.go b/simulator/virtual_machine_test.go index 718d95cc6..d6d379c36 100644 --- a/simulator/virtual_machine_test.go +++ b/simulator/virtual_machine_test.go @@ -229,8 +229,11 @@ func TestCreateVm(t *testing.T) { } } - return false + return true }) + if err != nil { + t.Error(err) + } } } diff --git a/vim25/mo/retrieve.go b/vim25/mo/retrieve.go index e7ffc32ce..8addb1344 100644 --- a/vim25/mo/retrieve.go +++ b/vim25/mo/retrieve.go @@ -65,6 +65,22 @@ func ObjectContentToType(o types.ObjectContent) (interface{}, error) { return v.Elem().Interface(), nil } +// ApplyPropertyChange converts the response of a call to WaitForUpdates +// and applies it to the given managed object. +func ApplyPropertyChange(obj Reference, changes []types.PropertyChange) { + t := typeInfoForType(obj.Reference().Type) + v := reflect.ValueOf(obj) + + for _, p := range changes { + rv, ok := t.props[p.Name] + if !ok { + continue + } + + assignValue(v, rv, anyTypeToValue(p.Val)) + } +} + // LoadRetrievePropertiesResponse converts the response of a call to // RetrieveProperties to one or more managed objects. func LoadRetrievePropertiesResponse(res *types.RetrievePropertiesResponse, dst interface{}) error { diff --git a/vim25/mo/type_info.go b/vim25/mo/type_info.go index 0c9e5b034..2e9335037 100644 --- a/vim25/mo/type_info.go +++ b/vim25/mo/type_info.go @@ -155,6 +155,8 @@ func (t *typeInfo) build(typ reflect.Type, fn string, fi []int) { } } +var nilValue reflect.Value + // assignValue assignes a value 'pv' to the struct pointed to by 'val', given a // slice of field indices. It recurses into the struct until it finds the field // specified by the indices. It creates new values for pointer types where @@ -172,6 +174,11 @@ func assignValue(val reflect.Value, fi []int, pv reflect.Value) { rv := val.Field(fi[0]) fi = fi[1:] if len(fi) == 0 { + if pv == nilValue { + pv = reflect.Zero(rv.Type()) + rv.Set(pv) + return + } rt := rv.Type() pt := pv.Type() @@ -200,7 +207,7 @@ func assignValue(val reflect.Value, fi []int, pv reflect.Value) { } else if rt.ConvertibleTo(pt) { rv.Set(pv.Convert(rt)) } else { - panic(fmt.Sprintf("cannot assign %s (%s) to %s (%s)", rt.Name(), rt.Kind(), pt.Name(), pt.Kind())) + panic(fmt.Sprintf("cannot assign %q (%s) to %q (%s)", rt.Name(), rt.Kind(), pt.Name(), pt.Kind())) } return @@ -212,11 +219,19 @@ func assignValue(val reflect.Value, fi []int, pv reflect.Value) { var arrayOfRegexp = regexp.MustCompile("ArrayOf(.*)$") func anyTypeToValue(t interface{}) reflect.Value { + if t == nil { + return nilValue + } rt := reflect.TypeOf(t) rv := reflect.ValueOf(t) // Dereference if ArrayOfXYZ type - m := arrayOfRegexp.FindStringSubmatch(rt.Name()) + at := rt + if at.Kind() == reflect.Ptr { + at = at.Elem() + rv = rv.Elem() + } + m := arrayOfRegexp.FindStringSubmatch(at.Name()) if len(m) > 0 { // ArrayOfXYZ type has single field named XYZ rv = rv.FieldByName(m[1])