From 8185b9287729b437b408461ecd6b4df0b8c22975 Mon Sep 17 00:00:00 2001 From: Izabella Raulin Date: Wed, 5 Jul 2017 10:33:13 +0200 Subject: [PATCH] Block unloading plugin which is used by running task --- control/available_plugin.go | 4 +- control/control.go | 34 +++++- control/control_test.go | 4 + control/fixtures/fixtures.go | 4 + control/metrics.go | 31 ++++++ control/plugin_manager.go | 16 ++- control/strategy/fixtures/fixtures.go | 5 +- control/subscription_group.go | 105 +++++++++++++++--- control/subscription_group_medium_test.go | 15 ++- core/plugin.go | 1 + mgmt/rest/client/client_func_test.go | 106 ++++++++++++------- mgmt/rest/v1/fixtures/mock_metric_manager.go | 12 ++- mgmt/rest/v2/mock/mock_metric_manager.go | 12 ++- 13 files changed, 272 insertions(+), 77 deletions(-) diff --git a/control/available_plugin.go b/control/available_plugin.go index ea838ebac..21878a36e 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -555,7 +555,7 @@ func (ap *availablePlugins) streamMetrics( } func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { - key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator) + key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", plugin.PublisherPluginType.String(), pluginName, pluginVersion) pool, serr := ap.getPool(key) if serr != nil { return []error{serr} @@ -588,7 +588,7 @@ func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName str func (ap *availablePlugins) processMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) ([]core.Metric, []error) { var errs []error - key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator) + key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", plugin.ProcessorPluginType.String(), pluginName, pluginVersion) pool, serr := ap.getPool(key) if serr != nil { errs = append(errs, serr) diff --git a/control/control.go b/control/control.go index 2e455ff7b..c2ab57848 100644 --- a/control/control.go +++ b/control/control.go @@ -146,6 +146,7 @@ type catalogsMetrics interface { Subscribe([]string, int) error Unsubscribe([]string, int) error GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) + GetPlugins(core.Namespace) ([]core.CatalogedPlugin, error) } type managesSigning interface { @@ -637,8 +638,34 @@ func (p *pluginControl) returnPluginDetails(rp *core.RequestedPlugin) (*pluginDe } func (p *pluginControl) Unload(pl core.Plugin) (core.CatalogedPlugin, serror.SnapError) { - up, err := p.pluginManager.UnloadPlugin(pl) + up, err := p.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pl.TypeName(), pl.Name(), pl.Version())) if err != nil { + se := serror.New(ErrPluginNotFound, map[string]interface{}{ + "plugin-name": pl.Name(), + "plugin-version": pl.Version(), + "plugin-type": pl.TypeName(), + }) + return nil, se + } + + if errs := p.subscriptionGroups.validatePluginUnloading(up); errs != nil { + impactOnTasks := []string{} + for _, err := range errs { + taskId := err.Fields()["task-id"].(string) + impactOnTasks = append(impactOnTasks, taskId) + } + se := serror.New(errorPluginCannotBeUnloaded(impactOnTasks), map[string]interface{}{ + "plugin-name": pl.Name(), + "plugin-version": pl.Version(), + "plugin-type": pl.TypeName(), + "impacted-tasks": impactOnTasks, + }) + return nil, se + } + + // unload the plugin means removing it from plugin catalog + // and, for collector plugins, removing its metrics from metric catalog + if _, err := p.pluginManager.UnloadPlugin(pl); err != nil { return nil, err } @@ -685,7 +712,6 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged } return serr } - up, err := p.pluginManager.UnloadPlugin(out) if err != nil { _, err2 := p.pluginManager.UnloadPlugin(lp) @@ -943,6 +969,10 @@ func (p *pluginControl) GetMetricVersions(ns core.Namespace) ([]core.CatalogedMe return rmts, nil } +func (p *pluginControl) GetPlugins(ns core.Namespace) ([]core.CatalogedPlugin, error) { + return p.metricCatalog.GetPlugins(ns) +} + func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool { _, err := p.metricCatalog.GetMetric(mns, ver) if err == nil { diff --git a/control/control_test.go b/control/control_test.go index da958eb6e..bed391b36 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -763,6 +763,10 @@ func (m *mc) GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) { return nil, nil } +func (m *mc) GetPlugins(core.Namespace) ([]core.CatalogedPlugin, error) { + return nil, nil +} + func (m *mc) GetVersions(core.Namespace) ([]*metricType, error) { return nil, nil } diff --git a/control/fixtures/fixtures.go b/control/fixtures/fixtures.go index a7b0f7102..5effc5273 100644 --- a/control/fixtures/fixtures.go +++ b/control/fixtures/fixtures.go @@ -22,6 +22,7 @@ package fixtures import ( "encoding/json" + "fmt" "time" "github.com/intelsdi-x/snap/core" @@ -142,6 +143,9 @@ func (m MockPlugin) Name() string { return m.name } func (m MockPlugin) TypeName() string { return m.pluginType.String() } func (m MockPlugin) Version() int { return m.ver } func (m MockPlugin) Config() *cdata.ConfigDataNode { return m.config } +func (m MockPlugin) Key() string { + return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", m.pluginType.String(), m.name, m.ver) +} type MockRequestedMetric struct { namespace core.Namespace diff --git a/control/metrics.go b/control/metrics.go index 82e87e7cf..398befe35 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -289,6 +289,10 @@ func (cp *catalogedPlugin) Version() int { return cp.version } +func (cp *catalogedPlugin) Key() string { + return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version()) +} + func (cp *catalogedPlugin) IsSigned() bool { return cp.signed } @@ -612,6 +616,33 @@ func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (core.CatalogedP return mt.Plugin, nil } +func (mc *metricCatalog) GetPlugins(mns core.Namespace) ([]core.CatalogedPlugin, error) { + plugins := []core.CatalogedPlugin{} + pluginsMap := map[string]core.CatalogedPlugin{} + + mts, err := mc.tree.GetVersions(mns.Strings()) + if err != nil { + log.WithFields(log.Fields{ + "_module": "control", + "_file": "metrics.go,", + "_block": "get-plugins", + "error": err, + }).Error("error getting plugin") + return nil, err + } + for _, mt := range mts { + // iterate over metrics and add the plugin which exposes the following metric to a map + // under plugin key to ensure that plugins do not repeat + key := mt.Plugin.Key() + pluginsMap[key] = mt.Plugin + } + for _, plg := range pluginsMap { + plugins = append(plugins, plg) + } + + return plugins, nil +} + func appendIfMissing(keys []string, ns string) []string { for _, key := range keys { if ns == key { diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 3ea3a3903..85e67b742 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -65,6 +65,8 @@ var ( ErrPluginNotFound = errors.New("plugin not found") // ErrPluginAlreadyLoaded - error message when a plugin is already loaded ErrPluginAlreadyLoaded = errors.New("plugin is already loaded") + // ErrPluginCannotBeUnloaded - error message when a plugin cannot be unloaded because is already in use by running task(s) + ErrPluginCannotBeUnloaded = errors.New("Plugin is used by running task. Stop the task to be able to unload the plugin") // ErrPluginNotInLoadedState - error message when a plugin must ne in a loaded state ErrPluginNotInLoadedState = errors.New("Plugin must be in a LoadedState") @@ -73,6 +75,15 @@ var ( defaultManagerOpts = []pluginManagerOpt{optDefaultManagerSecurity()} ) +func errorPluginCannotBeUnloaded(impactedTaskIDs []string) error { + var impactedTasks string + + for _, id := range impactedTaskIDs { + impactedTasks += fmt.Sprintf("\n%s", id) + } + return fmt.Errorf("%s:%s", ErrPluginCannotBeUnloaded, impactedTasks) +} + type pluginState string type loadedPlugins struct { @@ -687,6 +698,8 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap "plugin-version": plugin.Version(), "plugin-path": plugin.Details.Path, }).Debugf("Removing plugin") + + // remove plugin binary from tempDirPath (do not apply for remote plugin) if strings.Contains(plugin.Details.Path, p.tempDirPath) { if err := os.RemoveAll(filepath.Dir(plugin.Details.Path)); err != nil { pmLogger.WithFields(log.Fields{ @@ -713,9 +726,10 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap }).Debug("Nothing to delete as temp path is empty") } + // remove plugin key p.loadedPlugins.remove(plugin.Key()) - // Remove any metrics from the catalog if this was a collector + // remove any metrics from the catalog if this was a collector if plugin.TypeName() == core.CollectorPluginType.String() || plugin.TypeName() == core.StreamingCollectorPluginType.String() { p.metricCatalog.RmUnloadedPluginMetrics(plugin) } diff --git a/control/strategy/fixtures/fixtures.go b/control/strategy/fixtures/fixtures.go index 55ff80300..39b07c533 100644 --- a/control/strategy/fixtures/fixtures.go +++ b/control/strategy/fixtures/fixtures.go @@ -19,8 +19,7 @@ limitations under the License. package fixtures import ( - "strconv" - "strings" + "fmt" "time" "github.com/intelsdi-x/snap/control/plugin" @@ -140,7 +139,7 @@ func (m MockAvailablePlugin) LastHit() time.Time { } func (m MockAvailablePlugin) String() string { - return strings.Join([]string{m.pluginType.String(), m.pluginName, strconv.Itoa(m.Version())}, core.Separator) + return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", m.pluginType.String(), m.pluginName, m.Version()) } func (m MockAvailablePlugin) Kill(string) error { diff --git a/control/subscription_group.go b/control/subscription_group.go index a8ace4dca..f60ed041a 100644 --- a/control/subscription_group.go +++ b/control/subscription_group.go @@ -61,6 +61,7 @@ type ManagesSubscriptionGroups interface { plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError) validateMetric(metric core.Metric) (serrs []serror.SnapError) + validatePluginUnloading(*loadedPlugin) (errs []serror.SnapError) } type subscriptionGroup struct { @@ -253,6 +254,20 @@ func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric, return } +// validatePluginUnloading checks if process of unloading the plugin is safe for existing running tasks. +// If the plugin is used by running task and there is no replacements, return an error with appropriate message +// containing ids of tasks which use the plugin, what blocks unloading process until they are stopped +func (s *subscriptionGroups) validatePluginUnloading(pluginToUnload *loadedPlugin) (errs []serror.SnapError) { + s.Lock() + defer s.Unlock() + for id, group := range s.subscriptionMap { + if err := group.validatePluginUnloading(id, pluginToUnload); err != nil { + errs = append(errs, err) + } + } + return errs +} + func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin, mergedConfig *cdata.ConfigDataNode) []serror.SnapError { var serrs = []serror.SnapError{} controlLogger.WithFields(log.Fields{ @@ -345,6 +360,69 @@ func (s *subscriptionGroups) validateMetric( return serrs } +// pluginIsSubscribed returns true if a provided plugin has been found among subscribed plugins +// in the following subscription group +func (s *subscriptionGroup) pluginIsSubscribed(plugin *loadedPlugin) bool { + // range over subscribed plugins to find if the plugin is there + for _, sp := range s.plugins { + if sp.TypeName() == plugin.TypeName() && sp.Name() == plugin.Name() && sp.Version() == plugin.Version() { + return true + } + } + return false +} + +// validatePluginUnloading verifies if a given plugin might be unloaded without causing running task failures +func (s *subscriptionGroup) validatePluginUnloading(id string, plgToUnload *loadedPlugin) (serr serror.SnapError) { + impacted := false + if !s.pluginIsSubscribed(plgToUnload) { + // the plugin is not subscribed, so the task is not impacted by its unloading + return nil + } + controlLogger.WithFields(log.Fields{ + "_block": "subscriptionGroup.validatePluginUnloading", + "task-id": id, + "plugin-to-unload": plgToUnload.Key(), + }).Debug("validating impact of unloading the plugin") + + for _, requestedMetric := range s.requestedMetrics { + // get all plugins exposing the requested metric + plgs, _ := s.GetPlugins(requestedMetric.Namespace()) + // when requested version is fixed (greater than 0), take into account only plugins in the requested version + if requestedMetric.Version() > 0 { + // skip those which are not impacted by unloading (version different than plgToUnload.Version()) + if requestedMetric.Version() == plgToUnload.Version() { + plgsInVer := []core.CatalogedPlugin{} + + for _, plg := range plgs { + if plg.Version() == requestedMetric.Version() { + plgsInVer = append(plgsInVer, plg) + } + } + // set plugins only in the requested version + plgs = plgsInVer + } + } + if len(plgs) == 1 && plgs[0].Key() == plgToUnload.Key() { + // the requested metric is exposed only by the single plugin and there is no replacement + impacted = true + controlLogger.WithFields(log.Fields{ + "_block": "subscriptionGroup.validatePluginUnloading", + "task-id": id, + "plugin-to-unload": plgToUnload.Key(), + "requested-metric": fmt.Sprintf("%s:%d", requestedMetric.Namespace(), requestedMetric.Version()), + }).Errorf("unloading the plugin would cause missing in collection the requested metric") + } + } + if impacted { + serr = serror.New(ErrPluginCannotBeUnloaded, map[string]interface{}{ + "task-id": id, + "plugin-to-unload": plgToUnload.Key(), + }) + } + return serr +} + func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) { // gathers collectors based on requested metrics pluginToMetricMap, plugins, serrs := s.getMetricsAndCollectors(s.requestedMetrics, s.configTree) @@ -353,23 +431,22 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) { "metrics": fmt.Sprintf("%+v", s.requestedMetrics), }).Debug("gathered collectors") + // notice that requested plugins contains only processors and publishers for _, plugin := range s.requestedPlugins { - //add processors and publishers to collectors just gathered - if plugin.TypeName() != core.CollectorPluginType.String() { - plugins = append(plugins, plugin) - // add defaults to plugins (exposed in a plugins ConfigPolicy) - if lp, err := s.pluginManager.get( - fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", - plugin.TypeName(), - plugin.Name(), - plugin.Version())); err == nil && lp.ConfigPolicy != nil { - if policy := lp.ConfigPolicy.Get([]string{""}); policy != nil && len(policy.Defaults()) > 0 { - plugin.Config().ApplyDefaults(policy.Defaults()) - } + // add processors and publishers to collectors just gathered + plugins = append(plugins, plugin) + // add defaults to plugins (exposed in a plugins ConfigPolicy) + if lp, err := s.pluginManager.get( + fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", + plugin.TypeName(), + plugin.Name(), + plugin.Version())); err == nil && lp.ConfigPolicy != nil { + if policy := lp.ConfigPolicy.Get([]string{""}); policy != nil && len(policy.Defaults()) > 0 { + // set defaults to plugin config + plugin.Config().ApplyDefaults(policy.Defaults()) } } } - // calculates those plugins that need to be subscribed and unsubscribed to subs, unsubs := comparePlugins(plugins, s.plugins) controlLogger.WithFields(log.Fields{ @@ -387,7 +464,7 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) { } } - //updating view + // updating view // metrics are grouped by plugin s.metrics = pluginToMetricMap s.plugins = plugins diff --git a/control/subscription_group_medium_test.go b/control/subscription_group_medium_test.go index c49da8504..c19a98379 100644 --- a/control/subscription_group_medium_test.go +++ b/control/subscription_group_medium_test.go @@ -146,7 +146,7 @@ func TestSubscriptionGroups_Process_GlobalPluginConfig(t *testing.T) { sg := newSubscriptionGroups(c) So(sg, ShouldNotBeNil) - sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{subsPlugin}) + sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{}) <-lpe.sub So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] @@ -190,7 +190,7 @@ func TestSubscriptionGroups_ProcessStaticNegative(t *testing.T) { sg := newSubscriptionGroups(c) So(sg, ShouldNotBeNil) - sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{subsPlugin}) + sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{}) <-lpe.sub So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] @@ -260,7 +260,7 @@ func TestSubscriptionGroups_ProcessStaticPositive(t *testing.T) { sg := newSubscriptionGroups(c) So(sg, ShouldNotBeNil) - sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{mock1}) + sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{}) <-lpe.sub So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] @@ -336,7 +336,7 @@ func TestSubscriptionGroups_ProcessDynamicPositive(t *testing.T) { errs := sg.ValidateDeps([]core.RequestedMetric{requested}, []core.SubscribedPlugin{mock1}, ctree) So(errs, ShouldBeNil) Convey("Subscription group created for requested metric with wildcards", func() { - sg.Add("task-id", []core.RequestedMetric{requested}, ctree, []core.SubscribedPlugin{mock1}) + sg.Add("task-id", []core.RequestedMetric{requested}, ctree, []core.SubscribedPlugin{}) <-lpe.sub So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] @@ -411,7 +411,7 @@ func TestSubscriptionGroups_ProcessDynamicNegative(t *testing.T) { sg := newSubscriptionGroups(c) So(sg, ShouldNotBeNil) - sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{mock1}) + sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{}) <-lpe.sub So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] @@ -486,9 +486,8 @@ func TestSubscriptionGroups_ProcessSpecifiedDynamicPositive(t *testing.T) { serrs := sg.ValidateDeps([]core.RequestedMetric{requested}, []core.SubscribedPlugin{mock1}, cdata.NewTree()) So(serrs, ShouldBeNil) Convey("Subscription group created for requested metric with specified instance of dynamic element and with wildcards", func() { - sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{mock1}) + sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{}) <-lpe.sub - So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] So(ok, ShouldBeTrue) @@ -567,7 +566,7 @@ func TestSubscriptionGroups_ProcessSpecifiedDynamicNegative(t *testing.T) { sg := newSubscriptionGroups(c) So(sg, ShouldNotBeNil) - sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{mock1}) + sg.Add("task-id", []core.RequestedMetric{requested}, cdata.NewTree(), []core.SubscribedPlugin{}) <-lpe.sub So(len(sg.subscriptionMap), ShouldEqual, 1) group, ok := sg.subscriptionMap["task-id"] diff --git a/core/plugin.go b/core/plugin.go index 466edd055..1dd2b9645 100644 --- a/core/plugin.go +++ b/core/plugin.go @@ -119,6 +119,7 @@ type CatalogedPlugin interface { PluginPath() string LoadedTimestamp() *time.Time Policy() *cpolicy.ConfigPolicy + Key() string } // the collection of cataloged plugins used diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index 38a03a84a..075b0298e 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -605,45 +605,6 @@ func TestSnapClient(t *testing.T) { }) }) }) - Convey("UnloadPlugin", func() { - Convey("unload unknown plugin", func() { - p := c.UnloadPlugin("not a type", "foo", 3) - So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "plugin not found") - }) - Convey("unload already unloaded plugin", func() { - p := c.UnloadPlugin("collector", "mock", 2) - So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "plugin not found") - }) - Convey("unload one of multiple", func() { - p1 := c.GetPlugins(false) - So(p1.Err, ShouldBeNil) - So(len(p1.LoadedPlugins), ShouldEqual, 2) - - p3 := c.UnloadPlugin("publisher", "mock-file", 3) - So(p3.Err, ShouldBeNil) - So(p3.Name, ShouldEqual, "mock-file") - So(p3.Version, ShouldEqual, 3) - So(p3.Type, ShouldEqual, "publisher") - }) - Convey("unload when only one plugin loaded", func() { - p1 := c.GetPlugins(false) - So(p1.Err, ShouldBeNil) - So(len(p1.LoadedPlugins), ShouldEqual, 1) - So(p1.LoadedPlugins[0].Name, ShouldEqual, "mock") - - p2 := c.UnloadPlugin("collector", "mock", 1) - So(p2.Err, ShouldBeNil) - So(p2.Name, ShouldEqual, "mock") - So(p2.Version, ShouldEqual, 1) - So(p2.Type, ShouldEqual, "collector") - - p3 := c.GetPlugins(false) - So(p3.Err, ShouldBeNil) - So(len(p3.LoadedPlugins), ShouldEqual, 0) - }) - }) }) c, err := New("http://localhost:-1", "v1", true) @@ -679,6 +640,73 @@ func TestSnapClient(t *testing.T) { }) } +func TestClient_UnloadPlugin(t *testing.T) { + CompressUpload = false + + Convey("Client should exist", t, func() { + uri := startAPI() + c, cerr := New(uri, "v1", true) + wf := getWMFromSample("1.json") + sch := &Schedule{Type: "simple", Interval: "1s"} + So(cerr, ShouldBeNil) + if cerr == nil { + p1 = c.LoadPlugin(MOCK_PLUGIN_PATH1) + p2 = c.LoadPlugin(MOCK_PLUGIN_PATH2) + p3 = c.LoadPlugin(FILE_PLUGIN_PATH) + } + Convey("UnloadPlugin", func() { + Convey("unload unknown plugin", func() { + p := c.UnloadPlugin("not a type", "foo", 3) + So(p.Err, ShouldNotBeNil) + So(p.Err.Error(), ShouldEqual, "plugin not found") + }) + Convey("unload loaded plugin", func() { + p := c.UnloadPlugin("publisher", "mock-file", 3) + So(p.Err, ShouldBeNil) + So(p.Name, ShouldEqual, "mock-file") + So(p.Version, ShouldEqual, 3) + So(p.Type, ShouldEqual, "publisher") + + Convey("unload already unloaded plugin", func() { + p := c.UnloadPlugin("publisher", "mock-file", 3) + So(p.Err, ShouldNotBeNil) + So(p.Err.Error(), ShouldEqual, "plugin not found") + }) + }) + Convey("unload plugin used by task", func() { + tf := c.CreateTask(sch, wf, "baron", "", true, 0) + So(tf.Err, ShouldBeNil) + plgs := c.GetPlugins(false) + So(plgs.Err, ShouldBeNil) + So(len(plgs.LoadedPlugins), ShouldEqual, 3) + + Convey("unload one of multiple", func() { + p2 := c.UnloadPlugin("collector", "mock", 2) + So(p2.Err, ShouldBeNil) + So(p2.Name, ShouldEqual, "mock") + So(p2.Version, ShouldEqual, 2) + So(p2.Type, ShouldEqual, "collector") + + Convey("unload when only one left", func() { + p1 := c.UnloadPlugin("collector", "mock", 1) + So(p1.Err, ShouldNotBeNil) + So(p1.Err.Error(), ShouldStartWith, control.ErrPluginCannotBeUnloaded.Error()) + + Convey("unload after stopping the task", func() { + t := c.StopTask(tf.ID) + So(t.Err, ShouldBeNil) + p1 := c.UnloadPlugin("collector", "mock", 1) + So(p1.Name, ShouldEqual, "mock") + So(p1.Version, ShouldEqual, 1) + So(p1.Type, ShouldEqual, "collector") + }) + }) + }) + }) + }) + }) +} + type timeoutHandler struct{} //ServeHTTP implements http.Handler interface diff --git a/mgmt/rest/v1/fixtures/mock_metric_manager.go b/mgmt/rest/v1/fixtures/mock_metric_manager.go index a9d13511d..8d337411d 100644 --- a/mgmt/rest/v1/fixtures/mock_metric_manager.go +++ b/mgmt/rest/v1/fixtures/mock_metric_manager.go @@ -22,6 +22,7 @@ package fixtures import ( "errors" + "fmt" "time" "github.com/intelsdi-x/snap/control/plugin/cpolicy" @@ -50,10 +51,13 @@ type MockLoadedPlugin struct { MyVersion int } -func (m MockLoadedPlugin) Name() string { return m.MyName } -func (m MockLoadedPlugin) Port() string { return "" } -func (m MockLoadedPlugin) TypeName() string { return m.MyType } -func (m MockLoadedPlugin) Version() int { return m.MyVersion } +func (m MockLoadedPlugin) Name() string { return m.MyName } +func (m MockLoadedPlugin) Port() string { return "" } +func (m MockLoadedPlugin) TypeName() string { return m.MyType } +func (m MockLoadedPlugin) Version() int { return m.MyVersion } +func (m MockLoadedPlugin) Key() string { + return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", m.MyType, m.MyName, m.MyVersion) +} func (m MockLoadedPlugin) Plugin() string { return "" } func (m MockLoadedPlugin) IsSigned() bool { return false } func (m MockLoadedPlugin) Status() string { return "" } diff --git a/mgmt/rest/v2/mock/mock_metric_manager.go b/mgmt/rest/v2/mock/mock_metric_manager.go index 438315392..1ad85f0cf 100644 --- a/mgmt/rest/v2/mock/mock_metric_manager.go +++ b/mgmt/rest/v2/mock/mock_metric_manager.go @@ -22,6 +22,7 @@ package mock import ( "errors" + "fmt" "time" "github.com/intelsdi-x/snap/control/plugin/cpolicy" @@ -50,10 +51,13 @@ type MockLoadedPlugin struct { MyVersion int } -func (m MockLoadedPlugin) Name() string { return m.MyName } -func (m MockLoadedPlugin) Port() string { return "" } -func (m MockLoadedPlugin) TypeName() string { return m.MyType } -func (m MockLoadedPlugin) Version() int { return m.MyVersion } +func (m MockLoadedPlugin) Name() string { return m.MyName } +func (m MockLoadedPlugin) Port() string { return "" } +func (m MockLoadedPlugin) TypeName() string { return m.MyType } +func (m MockLoadedPlugin) Version() int { return m.MyVersion } +func (m MockLoadedPlugin) Key() string { + return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", m.MyType, m.MyName, m.MyVersion) +} func (m MockLoadedPlugin) Plugin() string { return "" } func (m MockLoadedPlugin) IsSigned() bool { return false } func (m MockLoadedPlugin) Status() string { return "" }