From 7b23de91a05fdab052412e7ecc4f76661669be45 Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Tue, 1 Nov 2016 15:38:46 -0700 Subject: [PATCH] Fixes #1324: Properly closes connections in tribe Properly closes some http.Respons's that were not previously closed. --- mgmt/rest/client/client.go | 4 ++ mgmt/tribe/worker/worker.go | 94 +++++++++++++++++++++---------------- 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/mgmt/rest/client/client.go b/mgmt/rest/client/client.go index ac9f055c6..3b2cf093e 100644 --- a/mgmt/rest/client/client.go +++ b/mgmt/rest/client/client.go @@ -172,6 +172,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody } return nil, fmt.Errorf("URL target is not available. %v", err) } + defer rsp.Body.Close() case "PUT": var b *bytes.Reader if len(body) == 0 { @@ -193,6 +194,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody } return nil, fmt.Errorf("URL target is not available. %v", err) } + defer rsp.Body.Close() case "DELETE": var b *bytes.Reader if len(body) == 0 { @@ -214,6 +216,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody } return nil, fmt.Errorf("URL target is not available. %v", err) } + defer rsp.Body.Close() case "POST": var b *bytes.Reader if len(body) == 0 { @@ -234,6 +237,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody } return nil, fmt.Errorf("URL target is not available. %v", err) } + defer rsp.Body.Close() } return httpRespToAPIResp(rsp) diff --git a/mgmt/tribe/worker/worker.go b/mgmt/tribe/worker/worker.go index ddb606528..203c44b73 100644 --- a/mgmt/tribe/worker/worker.go +++ b/mgmt/tribe/worker/worker.go @@ -324,55 +324,67 @@ func (w worker) loadPlugin(plugin core.Plugin) error { }).Info("unable to create client") continue } - resp, err := c.TribeRequest() + f, err := w.downloadPlugin(c, plugin) + // If we can't download from this member, try the next if err != nil { - logger.WithFields(log.Fields{ - "err": err, - "url": url, - }).Info("plugin not found") + logger.Error(err) continue } - if resp.StatusCode == 200 { - if resp.Header.Get("Content-Type") != "application/x-gzip" { - logger.WithField("content-type", resp.Header.Get("Content-Type")).Error("Expected application/x-gzip") - } - dir, err := ioutil.TempDir("", "") - if err != nil { - logger.Error(err) - return err - } - f, err := os.Create(path.Join(dir, fmt.Sprintf("%s-%s-%d", plugin.TypeName(), plugin.Name(), plugin.Version()))) - if err != nil { - logger.Error(err) - f.Close() - return err - } - io.Copy(f, resp.Body) - f.Close() - err = os.Chmod(f.Name(), 0700) - if err != nil { - logger.Error(err) - return err - } - rp, err := core.NewRequestedPlugin(f.Name()) - if err != nil { - logger.Error(err) - return err - } - _, err = w.pluginManager.Load(rp) - if err != nil { - logger.Error(err) - return err - } - if w.isPluginLoaded(plugin.Name(), plugin.TypeName(), plugin.Version()) { - return nil - } - return errors.New("failed to load plugin") + rp, err := core.NewRequestedPlugin(f.Name()) + if err != nil { + logger.Error(err) + return err + } + _, err = w.pluginManager.Load(rp) + if err != nil { + logger.Error(err) + return err + } + if w.isPluginLoaded(plugin.Name(), plugin.TypeName(), plugin.Version()) { + return nil } + return errors.New("failed to load plugin") } return errors.New("failed to find a member with the plugin") } +func (w worker) downloadPlugin(c *client.Client, plugin core.Plugin) (*os.File, error) { + logger := w.logger.WithFields(log.Fields{ + "plugin-name": plugin.Name(), + "plugin-version": plugin.Version(), + "plugin-type": plugin.TypeName(), + "url": c.URL, + "_block": "download-plugin", + }) + resp, err := c.TribeRequest() + if err != nil { + logger.WithFields(log.Fields{ + "err": err, + }).Info("plugin not found") + return nil, fmt.Errorf("Plugin not found at %s: %s", c.URL, err.Error()) + } + defer resp.Body.Close() + if resp.StatusCode == 200 { + if resp.Header.Get("Content-Type") != "application/x-gzip" { + logger.WithField("content-type", resp.Header.Get("Content-Type")).Error("Expected application/x-gzip") + } + dir, err := ioutil.TempDir("", "") + if err != nil { + logger.Error(err) + return nil, err + } + fpath := path.Join(dir, fmt.Sprintf("%s-%s-%d", plugin.TypeName(), plugin.Name(), plugin.Version())) + f, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0700) + if err != nil { + logger.Error(err) + return nil, err + } + io.Copy(f, resp.Body) + f.Close() + return f, nil + } + return nil, fmt.Errorf("Status code not 200 was %v: %s", resp.StatusCode, c.URL) +} func (w worker) createTask(taskID string, startOnCreate bool) { logger := w.logger.WithFields(log.Fields{ "task-id": taskID,