diff --git a/cmd/snaptel/metric.go b/cmd/snaptel/metric.go index bd0abf19a..d021fe102 100644 --- a/cmd/snaptel/metric.go +++ b/cmd/snaptel/metric.go @@ -29,7 +29,7 @@ import ( "time" "github.com/intelsdi-x/snap/mgmt/rest/client" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" "github.com/urfave/cli" "github.com/intelsdi-x/snap/pkg/stringutils" diff --git a/mgmt/rest/client/client.go b/mgmt/rest/client/client.go index efc66aecf..b89cf4b85 100644 --- a/mgmt/rest/client/client.go +++ b/mgmt/rest/client/client.go @@ -38,7 +38,7 @@ import ( "github.com/asaskevich/govalidator" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" ) var ( diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index 35b48377b..2e54c38f8 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -35,6 +35,7 @@ import ( "github.com/intelsdi-x/snap/control" "github.com/intelsdi-x/snap/mgmt/rest" + "github.com/intelsdi-x/snap/mgmt/rest/v1" "github.com/intelsdi-x/snap/plugin/helper" "github.com/intelsdi-x/snap/scheduler" "github.com/intelsdi-x/snap/scheduler/wmap" @@ -74,7 +75,7 @@ func getWMFromSample(sample string) *wmap.WorkflowMap { // When we eventually have a REST API Stop command this can be killed. func startAPI() string { // Start a REST API to talk to - rest.StreamingBufferWindow = 0.01 + v1.StreamingBufferWindow = 0.01 log.SetLevel(LOG_LEVEL) r, _ := rest.New(rest.GetDefaultConfig()) c := control.New(control.GetDefaultConfig()) @@ -489,7 +490,7 @@ func TestSnapClient(t *testing.T) { }) Convey("WatchTasks", func() { Convey("invalid task ID", func() { - rest.StreamingBufferWindow = 0.01 + v1.StreamingBufferWindow = 0.01 type ea struct { events []string @@ -520,7 +521,7 @@ func TestSnapClient(t *testing.T) { So(r.Err.Error(), ShouldEqual, "Task not found: ID(1)") }) Convey("event stream", func() { - rest.StreamingBufferWindow = 0.01 + v1.StreamingBufferWindow = 0.01 sch := &Schedule{Type: "simple", Interval: "100ms"} tf := c.CreateTask(sch, wf, "baron", "", false, 0) diff --git a/mgmt/rest/client/client_tribe_func_test.go b/mgmt/rest/client/client_tribe_func_test.go index cad3580bb..c8fdcf4a3 100644 --- a/mgmt/rest/client/client_tribe_func_test.go +++ b/mgmt/rest/client/client_tribe_func_test.go @@ -37,7 +37,7 @@ import ( "github.com/intelsdi-x/snap/control" "github.com/intelsdi-x/snap/mgmt/rest" "github.com/intelsdi-x/snap/mgmt/rest/client" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" "github.com/intelsdi-x/snap/mgmt/tribe" "github.com/intelsdi-x/snap/scheduler" ) diff --git a/mgmt/rest/client/config.go b/mgmt/rest/client/config.go index 1eea17a2b..c3af9bc37 100644 --- a/mgmt/rest/client/config.go +++ b/mgmt/rest/client/config.go @@ -25,7 +25,7 @@ import ( "net/url" "github.com/intelsdi-x/snap/core/ctypes" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" ) // GetPluginConfig retrieves the merged plugin config given the type of plugin, diff --git a/mgmt/rest/client/metric.go b/mgmt/rest/client/metric.go index 1e9c1fa8a..35a09dbfc 100644 --- a/mgmt/rest/client/metric.go +++ b/mgmt/rest/client/metric.go @@ -23,7 +23,7 @@ import ( "errors" "fmt" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" ) var ( diff --git a/mgmt/rest/client/plugin.go b/mgmt/rest/client/plugin.go index 77ee4b492..04f67c3a8 100644 --- a/mgmt/rest/client/plugin.go +++ b/mgmt/rest/client/plugin.go @@ -27,7 +27,7 @@ import ( "time" "github.com/intelsdi-x/snap/core/serror" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" ) // LoadPlugin loads plugins for the given plugin names. diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index 1e3abea2f..9bcd529b9 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -29,7 +29,7 @@ import ( "time" "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" "github.com/intelsdi-x/snap/scheduler/wmap" ) diff --git a/mgmt/rest/client/tribe.go b/mgmt/rest/client/tribe.go index b02d2e496..aa8ed68ad 100644 --- a/mgmt/rest/client/tribe.go +++ b/mgmt/rest/client/tribe.go @@ -23,7 +23,7 @@ import ( "encoding/json" "fmt" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" ) // ListMembers retrieves a list of tribe members through an HTTP GET call. diff --git a/mgmt/rest/config.go b/mgmt/rest/config.go index 9085a5f17..32b5e06ed 100644 --- a/mgmt/rest/config.go +++ b/mgmt/rest/config.go @@ -1,158 +1,94 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Corporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package rest -import ( - "net/http" - "strconv" - - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/core/cdata" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" - "github.com/julienschmidt/httprouter" +// default configuration values +const ( + defaultEnable bool = true + defaultPort int = 8181 + defaultAddress string = "" + defaultHTTPS bool = false + defaultRestCertificate string = "" + defaultRestKey string = "" + defaultAuth bool = false + defaultAuthPassword string = "" + defaultPortSetByConfig bool = false + defaultPprof bool = false ) -func (s *Server) getPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - var err error - styp := p.ByName("type") - if styp == "" { - cdn := s.mc.GetPluginConfigDataNodeAll() - item := &rbody.PluginConfigItem{ConfigDataNode: cdn} - respond(200, item, w) - return - } - - typ, err := getPluginType(styp) - if err != nil { - respond(400, rbody.FromError(err), w) - return - } - - name := p.ByName("name") - sver := p.ByName("version") - var iver int - if sver != "" { - if iver, err = strconv.Atoi(sver); err != nil { - respond(400, rbody.FromError(err), w) - return - } - } else { - iver = -2 - } - - cdn := s.mc.GetPluginConfigDataNode(typ, name, iver) - item := &rbody.PluginConfigItem{ConfigDataNode: cdn} - respond(200, item, w) -} - -func (s *Server) deletePluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - var err error - var typ core.PluginType - styp := p.ByName("type") - if styp != "" { - typ, err = getPluginType(styp) - if err != nil { - respond(400, rbody.FromError(err), w) - return - } - } - - name := p.ByName("name") - sver := p.ByName("version") - var iver int - if sver != "" { - if iver, err = strconv.Atoi(sver); err != nil { - respond(400, rbody.FromError(err), w) - return - } - } else { - iver = -2 - } - - src := []string{} - errCode, err := core.UnmarshalBody(&src, r.Body) - if errCode != 0 && err != nil { - respond(400, rbody.FromError(err), w) - return - } - - var res cdata.ConfigDataNode - if styp == "" { - res = s.mc.DeletePluginConfigDataNodeFieldAll(src...) - } else { - res = s.mc.DeletePluginConfigDataNodeField(typ, name, iver, src...) - } - - item := &rbody.DeletePluginConfigItem{ConfigDataNode: res} - respond(200, item, w) +// holds the configuration passed in through the SNAP config file +// Note: if this struct is modified, then the switch statement in the +// UnmarshalJSON method in this same file needs to be modified to +// match the field mapping that is defined here +type Config struct { + Enable bool `json:"enable"yaml:"enable"` + Port int `json:"port"yaml:"port"` + Address string `json:"addr"yaml:"addr"` + HTTPS bool `json:"https"yaml:"https"` + RestCertificate string `json:"rest_certificate"yaml:"rest_certificate"` + RestKey string `json:"rest_key"yaml:"rest_key"` + RestAuth bool `json:"rest_auth"yaml:"rest_auth"` + RestAuthPassword string `json:"rest_auth_password"yaml:"rest_auth_password"` + portSetByConfig bool `` + Pprof bool `json:"pprof"yaml:"pprof"` } -func (s *Server) setPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - var err error - var typ core.PluginType - styp := p.ByName("type") - if styp != "" { - typ, err = getPluginType(styp) - if err != nil { - respond(400, rbody.FromError(err), w) - return - } - } - - name := p.ByName("name") - sver := p.ByName("version") - var iver int - if sver != "" { - if iver, err = strconv.Atoi(sver); err != nil { - respond(400, rbody.FromError(err), w) - return - } - } else { - iver = -2 - } - - src := cdata.NewNode() - errCode, err := core.UnmarshalBody(src, r.Body) - if errCode != 0 && err != nil { - respond(400, rbody.FromError(err), w) - return - } +const ( + CONFIG_CONSTRAINTS = ` + "restapi" : { + "type": ["object", "null"], + "properties" : { + "enable": { + "type": "boolean" + }, + "https" : { + "type": "boolean" + }, + "rest_auth": { + "type": "boolean" + }, + "rest_auth_password": { + "type": "string" + }, + "rest_certificate": { + "type": "string" + }, + "rest_key" : { + "type": "string" + }, + "port" : { + "type": "integer", + "minimum": 1, + "maximum": 65535 + }, + "addr" : { + "type": "string" + }, + "pprof": { + "type": "boolean" + } + }, + "additionalProperties": false + } + ` +) - var res cdata.ConfigDataNode - if styp == "" { - res = s.mc.MergePluginConfigDataNodeAll(src) - } else { - res = s.mc.MergePluginConfigDataNode(typ, name, iver, src) +// GetDefaultConfig gets the default snapteld configuration +func GetDefaultConfig() *Config { + return &Config{ + Enable: defaultEnable, + Port: defaultPort, + Address: defaultAddress, + HTTPS: defaultHTTPS, + RestCertificate: defaultRestCertificate, + RestKey: defaultRestKey, + RestAuth: defaultAuth, + RestAuthPassword: defaultAuthPassword, + portSetByConfig: defaultPortSetByConfig, + Pprof: defaultPprof, } - - item := &rbody.SetPluginConfigItem{ConfigDataNode: res} - respond(200, item, w) } -func getPluginType(t string) (core.PluginType, error) { - if ityp, err := strconv.Atoi(t); err == nil { - return core.PluginType(ityp), nil - } - ityp, err := core.ToPluginType(t) - if err != nil { - return core.PluginType(-1), err - } - return ityp, nil +// define a method that can be used to determine if the port the RESTful +// API is listening on was set in the configuration file +func (c *Config) PortSetByConfigFile() bool { + return c.portSetByConfig } diff --git a/mgmt/rest/config_test.go b/mgmt/rest/config_test.go deleted file mode 100644 index 8e40d6ab2..000000000 --- a/mgmt/rest/config_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// +build legacy small medium large - -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - -Copyright 2016 Intel Corporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package rest - -import ( - "github.com/intelsdi-x/snap/control" - "github.com/intelsdi-x/snap/scheduler" -) - -// Since we do not have a global snap package that could be imported -// we create a mock config struct to mock what is in snapteld.go - -type mockConfig struct { - LogLevel int `json:"-"yaml:"-"` - GoMaxProcs int `json:"-"yaml:"-"` - LogPath string `json:"-"yaml:"-"` - Control *control.Config - Scheduler *scheduler.Config `json:"-",yaml:"-"` - RestAPI *Config `json:"-",yaml:"-"` -} - -func getDefaultMockConfig() *mockConfig { - return &mockConfig{ - LogLevel: 3, - GoMaxProcs: 1, - LogPath: "", - Control: control.GetDefaultConfig(), - Scheduler: scheduler.GetDefaultConfig(), - RestAPI: GetDefaultConfig(), - } -} diff --git a/mgmt/rest/pprof.go b/mgmt/rest/pprof.go new file mode 100644 index 000000000..ff637d0bf --- /dev/null +++ b/mgmt/rest/pprof.go @@ -0,0 +1,44 @@ +package rest + +import ( + "net/http" + "net/http/pprof" + + "github.com/julienschmidt/httprouter" +) + +func (s *Server) addPprofRoutes() { + if s.pprof { + s.r.GET("/debug/pprof/", s.index) + s.r.GET("/debug/pprof/block", s.index) + s.r.GET("/debug/pprof/goroutine", s.index) + s.r.GET("/debug/pprof/heap", s.index) + s.r.GET("/debug/pprof/threadcreate", s.index) + s.r.GET("/debug/pprof/cmdline", s.cmdline) + s.r.GET("/debug/pprof/profile", s.profile) + s.r.GET("/debug/pprof/symbol", s.symbol) + s.r.GET("/debug/pprof/trace", s.trace) + } +} + +// profiling tools handlers + +func (s *Server) index(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + pprof.Index(w, r) +} + +func (s *Server) cmdline(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + pprof.Cmdline(w, r) +} + +func (s *Server) profile(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + pprof.Profile(w, r) +} + +func (s *Server) symbol(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + pprof.Symbol(w, r) +} + +func (s *Server) trace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + pprof.Trace(w, r) +} diff --git a/mgmt/rest/rbody/v2/metric.go b/mgmt/rest/rbody/v2/metric.go deleted file mode 100644 index 7bcf86bb7..000000000 --- a/mgmt/rest/rbody/v2/metric.go +++ /dev/null @@ -1,49 +0,0 @@ -package v2 - -import ( - "fmt" - - "github.com/intelsdi-x/snap/mgmt/rest/rbody" -) - -type MetricList struct { - Metrics []rbody.Metric `json:"metrics,omitempty"` -} - -type MetricReturned struct { - Metric *rbody.Metric `json:"metric,omitempty"` -} - -func (m *MetricReturned) ResponseBodyMessage() string { - return "Metric returned" -} - -func (m *MetricReturned) ResponseBodyType() string { - return "metric_returned" -} - -type MetricsReturned MetricList - -func (m MetricsReturned) Len() int { - return len(m.Metrics) -} - -func (m MetricsReturned) Less(i, j int) bool { - return (fmt.Sprintf("%s:%d", m.Metrics[i].Namespace, m.Metrics[i].Version)) < (fmt.Sprintf("%s:%d", m.Metrics[j].Namespace, m.Metrics[j].Version)) -} - -func (m MetricsReturned) Swap(i, j int) { - m.Metrics[i], m.Metrics[j] = m.Metrics[j], m.Metrics[i] -} - -func NewMetricsReturned() MetricsReturned { - return MetricsReturned{Metrics: []rbody.Metric{}} -} - -func (m MetricsReturned) ResponseBodyMessage() string { - return "Metrics returned" -} - -func (m MetricsReturned) ResponseBodyType() string { - return rbody.MetricsReturnedType -} diff --git a/mgmt/rest/rbody/v2/task.go b/mgmt/rest/rbody/v2/task.go deleted file mode 100644 index 759167045..000000000 --- a/mgmt/rest/rbody/v2/task.go +++ /dev/null @@ -1,27 +0,0 @@ -package v2 - -import "github.com/intelsdi-x/snap/mgmt/rest/rbody" - -type ScheduledTaskListReturned struct { - ScheduledTasks []rbody.ScheduledTask `json:"scheduled_task,omitempty"` -} - -func (s *ScheduledTaskListReturned) Len() int { - return len(s.ScheduledTasks) -} - -func (s *ScheduledTaskListReturned) Less(i, j int) bool { - return s.ScheduledTasks[j].CreationTime().After(s.ScheduledTasks[i].CreationTime()) -} - -func (s *ScheduledTaskListReturned) Swap(i, j int) { - s.ScheduledTasks[i], s.ScheduledTasks[j] = s.ScheduledTasks[j], s.ScheduledTasks[i] -} - -func (s *ScheduledTaskListReturned) ResponseBodyMessage() string { - return "Scheduled tasks retrieved" -} - -func (s *ScheduledTaskListReturned) ResponseBodyType() string { - return "scheduled_task_list_returned" -} diff --git a/mgmt/rest/rest_func_test.go b/mgmt/rest/rest_func_test.go deleted file mode 100644 index 06dc8a04f..000000000 --- a/mgmt/rest/rest_func_test.go +++ /dev/null @@ -1,637 +0,0 @@ -// +build legacy - -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Corporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package rest - -// This test runs through basic REST API calls and validates them. - -import ( - "bufio" - "bytes" - "compress/gzip" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "mime/multipart" - "net/http" - "os" - "path/filepath" - "testing" - "time" - - log "github.com/Sirupsen/logrus" - - "github.com/intelsdi-x/snap/control" - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/core/cdata" - "github.com/intelsdi-x/snap/core/ctypes" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" - "github.com/intelsdi-x/snap/pkg/cfgfile" - "github.com/intelsdi-x/snap/plugin/helper" - "github.com/intelsdi-x/snap/scheduler" - "github.com/intelsdi-x/snap/scheduler/wmap" - . "github.com/smartystreets/goconvey/convey" -) - -var ( - // Switching this turns on logging for all the REST API calls - LOG_LEVEL = log.WarnLevel - - SNAP_PATH = helper.BuildPath - SNAP_AUTODISCOVER_PATH = os.Getenv("SNAP_AUTODISCOVER_PATH") - MOCK_PLUGIN_PATH1 = helper.PluginFilePath("snap-plugin-collector-mock1") - MOCK_PLUGIN_PATH2 = helper.PluginFilePath("snap-plugin-collector-mock2") - FILE_PLUGIN_PATH = helper.PluginFilePath("snap-plugin-publisher-mock-file") - - CompressedUpload = true - TotalUploadSize = 0 - UploadCount = 0 -) - -const ( - MOCK_CONSTRAINTS = `{ - "$schema": "http://json-schema.org/draft-04/schema#", - "title": "snapteld global config schema", - "type": ["object", "null"], - "properties": { - "control": { "$ref": "#/definitions/control" }, - "scheduler": { "$ref": "#/definitions/scheduler"}, - "restapi" : { "$ref": "#/definitions/restapi"}, - "tribe": { "$ref": "#/definitions/tribe"} - }, - "additionalProperties": true, - "definitions": { ` + - `"control": {}, "scheduler": {}, ` + CONFIG_CONSTRAINTS + `, "tribe":{}` + - `}` + - `}` -) - -type restAPIInstance struct { - port int - server *Server -} - -func command() string { - return "curl" -} - -func readBody(r *http.Response) []byte { - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Fatal(err) - } - r.Body.Close() - return b -} - -func getAPIResponse(resp *http.Response) *rbody.APIResponse { - r := new(rbody.APIResponse) - rb := readBody(resp) - err := json.Unmarshal(rb, r) - if err != nil { - log.Fatal(err) - } - r.JSONResponse = string(rb) - return r -} - -func getStreamingAPIResponse(resp *http.Response) *rbody.APIResponse { - r := new(rbody.APIResponse) - rb := readBody(resp) - err := json.Unmarshal(rb, r) - if err != nil { - log.Fatal(err) - } - r.JSONResponse = string(rb) - return r -} - -type watchTaskResult struct { - eventChan chan string - doneChan chan struct{} - killChan chan struct{} -} - -func (w *watchTaskResult) close() { - close(w.doneChan) -} - -func watchTask(id string, port int) *watchTaskResult { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%s/watch", port, id)) - if err != nil { - log.Fatal(err) - } - - r := &watchTaskResult{ - eventChan: make(chan string), - doneChan: make(chan struct{}), - killChan: make(chan struct{}), - } - go func() { - reader := bufio.NewReader(resp.Body) - for { - select { - case <-r.doneChan: - resp.Body.Close() - return - default: - line, _ := reader.ReadBytes('\n') - ste := &rbody.StreamedTaskEvent{} - err := json.Unmarshal(line, ste) - if err != nil { - log.Fatal(err) - r.close() - return - } - switch ste.EventType { - case rbody.TaskWatchTaskDisabled: - r.eventChan <- ste.EventType - r.close() - return - case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: - log.Info(ste.EventType) - r.eventChan <- ste.EventType - } - } - } - }() - return r -} - -func getTasks(port int) *rbody.APIResponse { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks", port)) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func getTask(id string, port int) *rbody.APIResponse { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%s", port, id)) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func startTask(id string, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/start", port, id) - client := &http.Client{} - b := bytes.NewReader([]byte{}) - req, err := http.NewRequest("PUT", uri, b) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func stopTask(id string, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/stop", port, id) - client := &http.Client{} - b := bytes.NewReader([]byte{}) - req, err := http.NewRequest("PUT", uri, b) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func removeTask(id string, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s", port, id) - client := &http.Client{} - req, err := http.NewRequest("DELETE", uri, nil) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func createTask(sample, name, interval string, noStart bool, port int) *rbody.APIResponse { - jsonP, err := ioutil.ReadFile("./wmap_sample/" + sample) - if err != nil { - log.Fatal(err) - } - wf, err := wmap.FromJson(jsonP) - if err != nil { - log.Fatal(err) - } - - uri := fmt.Sprintf("http://localhost:%d/v1/tasks", port) - - t := core.TaskCreationRequest{ - Schedule: &core.Schedule{Type: "simple", Interval: interval}, - Workflow: wf, - Name: name, - Start: !noStart, - } - // Marshal to JSON for request body - j, err := json.Marshal(t) - if err != nil { - log.Fatal(err) - } - - client := &http.Client{} - b := bytes.NewReader(j) - req, err := http.NewRequest("POST", uri, b) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func enableTask(id string, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/enable", port, id) - client := &http.Client{} - b := bytes.NewReader([]byte{}) - req, err := http.NewRequest("PUT", uri, b) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func uploadPlugin(pluginPath string, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/plugins", port) - - client := &http.Client{} - file, err := os.Open(pluginPath) - if err != nil { - log.Fatal(err) - } - - body := &bytes.Buffer{} - writer := multipart.NewWriter(body) - var part io.Writer - part, err = writer.CreateFormFile("snap-plugins", filepath.Base(pluginPath)) - if err != nil { - log.Fatal(err) - } - if CompressedUpload { - cpart := gzip.NewWriter(part) - _, err = io.Copy(cpart, file) - if err != nil { - log.Fatal(err) - } - err = cpart.Close() - } else { - _, err = io.Copy(part, file) - } - if err != nil { - log.Fatal(err) - } - err = writer.Close() - if err != nil { - log.Fatal(err) - } - TotalUploadSize += body.Len() - UploadCount += 1 - req, err := http.NewRequest("POST", uri, body) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", writer.FormDataContentType()) - if CompressedUpload { - req.Header.Add("Plugin-Compression", "gzip") - } - file.Close() - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func unloadPlugin(port int, pluginType string, name string, version int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%d", port, pluginType, name, version) - client := &http.Client{} - req, err := http.NewRequest("DELETE", uri, nil) - if err != nil { - log.Fatal(err) - } - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - - return getAPIResponse(resp) -} - -func getPluginList(port int) *rbody.APIResponse { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/plugins", port)) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -func getMetricCatalog(port int) *rbody.APIResponse { - return fetchMetrics(port, "") -} - -func fetchMetrics(port int, ns string) *rbody.APIResponse { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/metrics%s", port, ns)) - if err != nil { - log.Fatal(err) - } - - return getAPIResponse(resp) -} - -func fetchMetricsWithVersion(port int, ns string, ver int) *rbody.APIResponse { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/metrics%s?ver=%d", port, ns, ver)) - if err != nil { - log.Fatal(err) - } - - return getAPIResponse(resp) -} - -func getPluginConfigItem(port int, typ *core.PluginType, name, ver string) *rbody.APIResponse { - var uri string - if typ != nil { - uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%d/%s/%s/config", port, *typ, name, ver) - } else { - uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, "", name, ver) - } - resp, err := http.Get(uri) - if err != nil { - log.Fatal(err) - } - - return getAPIResponse(resp) -} - -func setPluginConfigItem(port int, typ string, name, ver string, cdn *cdata.ConfigDataNode) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, typ, name, ver) - - client := &http.Client{} - b, err := json.Marshal(cdn) - if err != nil { - log.Fatal(err) - } - req, err := http.NewRequest("PUT", uri, bytes.NewReader(b)) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - - return getAPIResponse(resp) -} - -func deletePluginConfigItem(port int, typ string, name, ver string, fields []string) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, typ, name, ver) - - client := &http.Client{} - b, err := json.Marshal(fields) - if err != nil { - log.Fatal(err) - } - req, err := http.NewRequest("DELETE", uri, bytes.NewReader(b)) - if err != nil { - log.Fatal(err) - } - req.Header.Add("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Fatal(err) - } - return getAPIResponse(resp) -} - -// REST API instances that are started are killed when the tests end. -// When we eventually have a REST API Stop command this can be killed. -func startAPI(cfg *mockConfig) *restAPIInstance { - // Start a REST API to talk to - log.SetLevel(LOG_LEVEL) - r, _ := New(cfg.RestAPI) - c := control.New(cfg.Control) - c.Start() - s := scheduler.New(cfg.Scheduler) - s.SetMetricManager(c) - s.Start() - r.BindMetricManager(c) - r.BindTaskManager(s) - r.BindConfigManager(c.Config) - go func(ch <-chan error) { - // Block on the error channel. Will return exit status 1 for an error or just return if the channel closes. - err, ok := <-ch - if !ok { - return - } - log.Fatal(err) - }(r.Err()) - r.SetAddress("127.0.0.1:0") - r.Start() - time.Sleep(time.Millisecond * 100) - return &restAPIInstance{ - port: r.Port(), - server: r, - } -} - -func TestPluginRestCalls(t *testing.T) { - CompressedUpload = false - Convey("REST API functional V1", t, func() { - Convey("Load Plugin - POST - /v1/plugins", func() { - Convey("a single plugin loads", func() { - // This test alone tests gzip. Saves on test time. - CompressedUpload = true - r := startAPI(getDefaultMockConfig()) - port := r.port - col := core.CollectorPluginType - pub := core.PublisherPluginType - Convey("A global plugin config is added for all plugins", func() { - cdn := cdata.NewNode() - cdn.AddItem("password", ctypes.ConfigValueStr{Value: "p@ssw0rd"}) - r := setPluginConfigItem(port, "", "", "", cdn) - So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) - r1 := r.Body.(*rbody.SetPluginConfigItem) - So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) - - r2 := getPluginConfigItem(port, &col, "", "") - So(r2.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r3 := r2.Body.(*rbody.PluginConfigItem) - So(len(r3.Table()), ShouldEqual, 1) - So(r3.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) - - Convey("A plugin config is added for all publishers", func() { - cdn := cdata.NewNode() - cdn.AddItem("user", ctypes.ConfigValueStr{Value: "john"}) - r := setPluginConfigItem(port, core.PublisherPluginType.String(), "", "", cdn) - So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) - r1 := r.Body.(*rbody.SetPluginConfigItem) - So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) - So(len(r1.Table()), ShouldEqual, 2) - - Convey("A plugin config is added for all versions of a publisher", func() { - cdn := cdata.NewNode() - cdn.AddItem("path", ctypes.ConfigValueStr{Value: "/usr/local/influxdb/bin"}) - r := setPluginConfigItem(port, "2", "influxdb", "", cdn) - So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) - r1 := r.Body.(*rbody.SetPluginConfigItem) - So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/influxdb/bin"}) - So(len(r1.Table()), ShouldEqual, 3) - - Convey("A plugin config is added for a specific version of a publisher", func() { - cdn := cdata.NewNode() - cdn.AddItem("rate", ctypes.ConfigValueFloat{Value: .8}) - r := setPluginConfigItem(port, core.PublisherPluginType.String(), "influxdb", "1", cdn) - So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) - r1 := r.Body.(*rbody.SetPluginConfigItem) - So(r1.Table()["rate"], ShouldResemble, ctypes.ConfigValueFloat{Value: .8}) - So(len(r1.Table()), ShouldEqual, 4) - - r2 := getPluginConfigItem(port, &pub, "", "") - So(r2.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r3 := r2.Body.(*rbody.PluginConfigItem) - So(len(r3.Table()), ShouldEqual, 2) - - r4 := getPluginConfigItem(port, &pub, "influxdb", "1") - So(r4.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r5 := r4.Body.(*rbody.PluginConfigItem) - So(len(r5.Table()), ShouldEqual, 4) - - Convey("A global plugin config field is deleted", func() { - r := deletePluginConfigItem(port, "", "", "", []string{"password"}) - So(r.Body, ShouldHaveSameTypeAs, &rbody.DeletePluginConfigItem{}) - r1 := r.Body.(*rbody.DeletePluginConfigItem) - So(len(r1.Table()), ShouldEqual, 0) - - r2 := setPluginConfigItem(port, core.PublisherPluginType.String(), "influxdb", "", cdn) - So(r2.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) - r3 := r2.Body.(*rbody.SetPluginConfigItem) - So(len(r3.Table()), ShouldEqual, 3) - }) - }) - }) - }) - }) - - }) - Convey("Plugin config is set at startup", func() { - cfg := getDefaultMockConfig() - err := cfgfile.Read("../../examples/configs/snap-config-sample.json", &cfg, MOCK_CONSTRAINTS) - So(err, ShouldBeNil) - if len(SNAP_AUTODISCOVER_PATH) == 0 { - if len(SNAP_PATH) != 0 { - - SNAP_AUTODISCOVER_PATH = helper.PluginPath() - log.Warning(fmt.Sprintf("SNAP_AUTODISCOVER_PATH has been set to plugin build path (%s). This might cause test failures", SNAP_AUTODISCOVER_PATH)) - } - } else { - log.Warning(fmt.Sprintf("SNAP_AUTODISCOVER_PATH is set to %s. This might cause test failures", SNAP_AUTODISCOVER_PATH)) - } - cfg.Control.AutoDiscoverPath = SNAP_AUTODISCOVER_PATH - r := startAPI(cfg) - port := r.port - col := core.CollectorPluginType - - Convey("Gets the collector config by name and version", func() { - r := getPluginConfigItem(port, &col, "pcm", "1") - So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r1 := r.Body.(*rbody.PluginConfigItem) - So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) - So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) - So(len(r1.Table()), ShouldEqual, 6) - }) - Convey("Gets the config for a collector by name", func() { - r := getPluginConfigItem(port, &col, "pcm", "") - So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r1 := r.Body.(*rbody.PluginConfigItem) - So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) - So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) - So(len(r1.Table()), ShouldEqual, 3) - }) - Convey("Gets the config for all collectors", func() { - r := getPluginConfigItem(port, &col, "", "") - So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r1 := r.Body.(*rbody.PluginConfigItem) - So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) - So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) - So(len(r1.Table()), ShouldEqual, 2) - }) - Convey("Gets the config for all plugins", func() { - r := getPluginConfigItem(port, nil, "", "") - So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) - r1 := r.Body.(*rbody.PluginConfigItem) - So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) - So(len(r1.Table()), ShouldEqual, 1) - }) - }) - }) - - Convey("Enable task - put - /v1/tasks/:id/enable", func() { - Convey("Enable a running task", func(c C) { - r := startAPI(getDefaultMockConfig()) - port := r.port - - uploadPlugin(MOCK_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "yeti", "1s", true, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr1 := r1.Body.(*rbody.AddScheduledTask) - - id := plr1.ID - - r2 := startTask(id, port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskStarted)) - plr2 := r2.Body.(*rbody.ScheduledTaskStarted) - So(plr2.ID, ShouldEqual, id) - - r4 := enableTask(id, port) - So(r4.Body, ShouldHaveSameTypeAs, new(rbody.Error)) - plr4 := r4.Body.(*rbody.Error) - So(plr4.ErrorMessage, ShouldEqual, "Task must be disabled") - }) - }) - }) -} diff --git a/mgmt/rest/rest_v1_test.go b/mgmt/rest/rest_v1_test.go index a75097a98..6d01b55d2 100644 --- a/mgmt/rest/rest_v1_test.go +++ b/mgmt/rest/rest_v1_test.go @@ -21,9 +21,12 @@ limitations under the License. package rest +// This test runs through basic REST API calls and validates them. + import ( "bufio" "bytes" + "compress/gzip" "encoding/json" "fmt" "io" @@ -32,26 +35,557 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" "testing" - - . "github.com/smartystreets/goconvey/convey" + "time" log "github.com/Sirupsen/logrus" + + "github.com/intelsdi-x/snap/control" + "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" "github.com/intelsdi-x/snap/core/ctypes" - "github.com/intelsdi-x/snap/mgmt/rest/fixtures" + "github.com/intelsdi-x/snap/mgmt/rest/v1/fixtures" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" + "github.com/intelsdi-x/snap/pkg/cfgfile" "github.com/intelsdi-x/snap/plugin/helper" + "github.com/intelsdi-x/snap/scheduler" + "github.com/intelsdi-x/snap/scheduler/wmap" + . "github.com/smartystreets/goconvey/convey" ) -var ( - LOG_LEVEL = log.WarnLevel - MOCK_PLUGIN_PATH1 = helper.PluginFilePath("snap-plugin-collector-mock1") -) +func getAPIResponse(resp *http.Response) *rbody.APIResponse { + r := new(rbody.APIResponse) + rb := readBody(resp) + err := json.Unmarshal(rb, r) + if err != nil { + log.Fatal(err) + } + r.JSONResponse = string(rb) + return r +} + +func getStreamingAPIResponse(resp *http.Response) *rbody.APIResponse { + r := new(rbody.APIResponse) + rb := readBody(resp) + err := json.Unmarshal(rb, r) + if err != nil { + log.Fatal(err) + } + r.JSONResponse = string(rb) + return r +} + +type watchTaskResult struct { + eventChan chan string + doneChan chan struct{} + killChan chan struct{} +} + +func (w *watchTaskResult) close() { + close(w.doneChan) +} + +func watchTask(id string, port int) *watchTaskResult { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%s/watch", port, id)) + if err != nil { + log.Fatal(err) + } + + r := &watchTaskResult{ + eventChan: make(chan string), + doneChan: make(chan struct{}), + killChan: make(chan struct{}), + } + go func() { + reader := bufio.NewReader(resp.Body) + for { + select { + case <-r.doneChan: + resp.Body.Close() + return + default: + line, _ := reader.ReadBytes('\n') + ste := &rbody.StreamedTaskEvent{} + err := json.Unmarshal(line, ste) + if err != nil { + log.Fatal(err) + r.close() + return + } + switch ste.EventType { + case rbody.TaskWatchTaskDisabled: + r.eventChan <- ste.EventType + r.close() + return + case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent: + log.Info(ste.EventType) + r.eventChan <- ste.EventType + } + } + } + }() + return r +} + +func getTasks(port int) *rbody.APIResponse { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks", port)) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func getTask(id string, port int) *rbody.APIResponse { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%s", port, id)) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func startTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/start", port, id) + client := &http.Client{} + b := bytes.NewReader([]byte{}) + req, err := http.NewRequest("PUT", uri, b) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func stopTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/stop", port, id) + client := &http.Client{} + b := bytes.NewReader([]byte{}) + req, err := http.NewRequest("PUT", uri, b) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} -type restAPIInstance struct { - port int - server *Server +func removeTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s", port, id) + client := &http.Client{} + req, err := http.NewRequest("DELETE", uri, nil) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func createTask(sample, name, interval string, noStart bool, port int) *rbody.APIResponse { + jsonP, err := ioutil.ReadFile("./wmap_sample/" + sample) + if err != nil { + log.Fatal(err) + } + wf, err := wmap.FromJson(jsonP) + if err != nil { + log.Fatal(err) + } + + uri := fmt.Sprintf("http://localhost:%d/v1/tasks", port) + + t := core.TaskCreationRequest{ + Schedule: &core.Schedule{Type: "simple", Interval: interval}, + Workflow: wf, + Name: name, + Start: !noStart, + } + // Marshal to JSON for request body + j, err := json.Marshal(t) + if err != nil { + log.Fatal(err) + } + + client := &http.Client{} + b := bytes.NewReader(j) + req, err := http.NewRequest("POST", uri, b) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func enableTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/enable", port, id) + client := &http.Client{} + b := bytes.NewReader([]byte{}) + req, err := http.NewRequest("PUT", uri, b) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func uploadPlugin(pluginPath string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/plugins", port) + + client := &http.Client{} + file, err := os.Open(pluginPath) + if err != nil { + log.Fatal(err) + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + var part io.Writer + part, err = writer.CreateFormFile("snap-plugins", filepath.Base(pluginPath)) + if err != nil { + log.Fatal(err) + } + if CompressedUpload { + cpart := gzip.NewWriter(part) + _, err = io.Copy(cpart, file) + if err != nil { + log.Fatal(err) + } + err = cpart.Close() + } else { + _, err = io.Copy(part, file) + } + if err != nil { + log.Fatal(err) + } + err = writer.Close() + if err != nil { + log.Fatal(err) + } + TotalUploadSize += body.Len() + UploadCount += 1 + req, err := http.NewRequest("POST", uri, body) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", writer.FormDataContentType()) + if CompressedUpload { + req.Header.Add("Plugin-Compression", "gzip") + } + file.Close() + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func unloadPlugin(port int, pluginType string, name string, version int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%d", port, pluginType, name, version) + client := &http.Client{} + req, err := http.NewRequest("DELETE", uri, nil) + if err != nil { + log.Fatal(err) + } + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func getPluginList(port int) *rbody.APIResponse { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/plugins", port)) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +func getMetricCatalog(port int) *rbody.APIResponse { + return fetchMetrics(port, "") +} + +func fetchMetrics(port int, ns string) *rbody.APIResponse { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/metrics%s", port, ns)) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func fetchMetricsWithVersion(port int, ns string, ver int) *rbody.APIResponse { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/metrics%s?ver=%d", port, ns, ver)) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func getPluginConfigItem(port int, typ *core.PluginType, name, ver string) *rbody.APIResponse { + var uri string + if typ != nil { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%d/%s/%s/config", port, *typ, name, ver) + } else { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, "", name, ver) + } + resp, err := http.Get(uri) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func setPluginConfigItem(port int, typ string, name, ver string, cdn *cdata.ConfigDataNode) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, typ, name, ver) + + client := &http.Client{} + b, err := json.Marshal(cdn) + if err != nil { + log.Fatal(err) + } + req, err := http.NewRequest("PUT", uri, bytes.NewReader(b)) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func deletePluginConfigItem(port int, typ string, name, ver string, fields []string) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, typ, name, ver) + + client := &http.Client{} + b, err := json.Marshal(fields) + if err != nil { + log.Fatal(err) + } + req, err := http.NewRequest("DELETE", uri, bytes.NewReader(b)) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + +// REST API instances that are started are killed when the tests end. +// When we eventually have a REST API Stop command this can be killed. +func startAPI(cfg *mockConfig) *restAPIInstance { + // Start a REST API to talk to + log.SetLevel(LOG_LEVEL) + r, _ := New(cfg.RestAPI) + c := control.New(cfg.Control) + c.Start() + s := scheduler.New(cfg.Scheduler) + s.SetMetricManager(c) + s.Start() + r.BindMetricManager(c) + r.BindTaskManager(s) + r.BindConfigManager(c.Config) + go func(ch <-chan error) { + // Block on the error channel. Will return exit status 1 for an error or just return if the channel closes. + err, ok := <-ch + if !ok { + return + } + log.Fatal(err) + }(r.Err()) + r.SetAddress("127.0.0.1:0") + r.Start() + time.Sleep(time.Millisecond * 100) + return &restAPIInstance{ + port: r.Port(), + server: r, + } +} + +func TestPluginRestCalls(t *testing.T) { + CompressedUpload = false + Convey("REST API functional V1", t, func() { + Convey("Load Plugin - POST - /v1/plugins", func() { + Convey("a single plugin loads", func() { + // This test alone tests gzip. Saves on test time. + CompressedUpload = true + r := startAPI(getDefaultMockConfig()) + port := r.port + col := core.CollectorPluginType + pub := core.PublisherPluginType + Convey("A global plugin config is added for all plugins", func() { + cdn := cdata.NewNode() + cdn.AddItem("password", ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + r := setPluginConfigItem(port, "", "", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + + r2 := getPluginConfigItem(port, &col, "", "") + So(r2.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r3 := r2.Body.(*rbody.PluginConfigItem) + So(len(r3.Table()), ShouldEqual, 1) + So(r3.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + + Convey("A plugin config is added for all publishers", func() { + cdn := cdata.NewNode() + cdn.AddItem("user", ctypes.ConfigValueStr{Value: "john"}) + r := setPluginConfigItem(port, core.PublisherPluginType.String(), "", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(len(r1.Table()), ShouldEqual, 2) + + Convey("A plugin config is added for all versions of a publisher", func() { + cdn := cdata.NewNode() + cdn.AddItem("path", ctypes.ConfigValueStr{Value: "/usr/local/influxdb/bin"}) + r := setPluginConfigItem(port, "2", "influxdb", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/influxdb/bin"}) + So(len(r1.Table()), ShouldEqual, 3) + + Convey("A plugin config is added for a specific version of a publisher", func() { + cdn := cdata.NewNode() + cdn.AddItem("rate", ctypes.ConfigValueFloat{Value: .8}) + r := setPluginConfigItem(port, core.PublisherPluginType.String(), "influxdb", "1", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["rate"], ShouldResemble, ctypes.ConfigValueFloat{Value: .8}) + So(len(r1.Table()), ShouldEqual, 4) + + r2 := getPluginConfigItem(port, &pub, "", "") + So(r2.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r3 := r2.Body.(*rbody.PluginConfigItem) + So(len(r3.Table()), ShouldEqual, 2) + + r4 := getPluginConfigItem(port, &pub, "influxdb", "1") + So(r4.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r5 := r4.Body.(*rbody.PluginConfigItem) + So(len(r5.Table()), ShouldEqual, 4) + + Convey("A global plugin config field is deleted", func() { + r := deletePluginConfigItem(port, "", "", "", []string{"password"}) + So(r.Body, ShouldHaveSameTypeAs, &rbody.DeletePluginConfigItem{}) + r1 := r.Body.(*rbody.DeletePluginConfigItem) + So(len(r1.Table()), ShouldEqual, 0) + + r2 := setPluginConfigItem(port, core.PublisherPluginType.String(), "influxdb", "", cdn) + So(r2.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r3 := r2.Body.(*rbody.SetPluginConfigItem) + So(len(r3.Table()), ShouldEqual, 3) + }) + }) + }) + }) + }) + + }) + Convey("Plugin config is set at startup", func() { + cfg := getDefaultMockConfig() + err := cfgfile.Read("../../examples/configs/snap-config-sample.json", &cfg, MOCK_CONSTRAINTS) + So(err, ShouldBeNil) + if len(SNAP_AUTODISCOVER_PATH) == 0 { + if len(SNAP_PATH) != 0 { + + SNAP_AUTODISCOVER_PATH = helper.PluginPath() + log.Warning(fmt.Sprintf("SNAP_AUTODISCOVER_PATH has been set to plugin build path (%s). This might cause test failures", SNAP_AUTODISCOVER_PATH)) + } + } else { + log.Warning(fmt.Sprintf("SNAP_AUTODISCOVER_PATH is set to %s. This might cause test failures", SNAP_AUTODISCOVER_PATH)) + } + cfg.Control.AutoDiscoverPath = SNAP_AUTODISCOVER_PATH + r := startAPI(cfg) + port := r.port + col := core.CollectorPluginType + + Convey("Gets the collector config by name and version", func() { + r := getPluginConfigItem(port, &col, "pcm", "1") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(len(r1.Table()), ShouldEqual, 6) + }) + Convey("Gets the config for a collector by name", func() { + r := getPluginConfigItem(port, &col, "pcm", "") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + So(len(r1.Table()), ShouldEqual, 3) + }) + Convey("Gets the config for all collectors", func() { + r := getPluginConfigItem(port, &col, "", "") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + So(len(r1.Table()), ShouldEqual, 2) + }) + Convey("Gets the config for all plugins", func() { + r := getPluginConfigItem(port, nil, "", "") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + So(len(r1.Table()), ShouldEqual, 1) + }) + }) + }) + + Convey("Enable task - put - /v1/tasks/:id/enable", func() { + Convey("Enable a running task", func(c C) { + r := startAPI(getDefaultMockConfig()) + port := r.port + + uploadPlugin(MOCK_PLUGIN_PATH2, port) + uploadPlugin(FILE_PLUGIN_PATH, port) + + r1 := createTask("1.json", "yeti", "1s", true, port) + So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) + plr1 := r1.Body.(*rbody.AddScheduledTask) + + id := plr1.ID + + r2 := startTask(id, port) + So(r2.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskStarted)) + plr2 := r2.Body.(*rbody.ScheduledTaskStarted) + So(plr2.ID, ShouldEqual, id) + + r4 := enableTask(id, port) + So(r4.Body, ShouldHaveSameTypeAs, new(rbody.Error)) + plr4 := r4.Body.(*rbody.Error) + So(plr4.ErrorMessage, ShouldEqual, "Task must be disabled") + }) + }) + }) } func startV1API(cfg *mockConfig, testType string) *restAPIInstance { diff --git a/mgmt/rest/server.go b/mgmt/rest/server.go index 0ef54e0f5..66f581e59 100644 --- a/mgmt/rest/server.go +++ b/mgmt/rest/server.go @@ -20,15 +20,11 @@ limitations under the License. package rest import ( - "bytes" "crypto/tls" - "encoding/json" "errors" "fmt" "net" "net/http" - "net/http/pprof" - "strings" "sync" "time" @@ -36,32 +32,8 @@ import ( "github.com/julienschmidt/httprouter" "github.com/urfave/negroni" - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/core/cdata" - "github.com/intelsdi-x/snap/core/serror" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" - "github.com/intelsdi-x/snap/mgmt/tribe/agreement" - cschedule "github.com/intelsdi-x/snap/pkg/schedule" - "github.com/intelsdi-x/snap/pkg/stringutils" - "github.com/intelsdi-x/snap/scheduler/wmap" -) - -const ( - APIVersion = 1 -) - -// default configuration values -const ( - defaultEnable bool = true - defaultPort int = 8181 - defaultAddress string = "" - defaultHTTPS bool = false - defaultRestCertificate string = "" - defaultRestKey string = "" - defaultAuth bool = false - defaultAuthPassword string = "" - defaultPortSetByConfig bool = false - defaultPprof bool = false + "github.com/intelsdi-x/snap/mgmt/rest/api" + "github.com/intelsdi-x/snap/mgmt/rest/v1" ) var ( @@ -71,111 +43,8 @@ var ( protocolPrefix = "http" ) -// holds the configuration passed in through the SNAP config file -// Note: if this struct is modified, then the switch statement in the -// UnmarshalJSON method in this same file needs to be modified to -// match the field mapping that is defined here -type Config struct { - Enable bool `json:"enable"yaml:"enable"` - Port int `json:"port"yaml:"port"` - Address string `json:"addr"yaml:"addr"` - HTTPS bool `json:"https"yaml:"https"` - RestCertificate string `json:"rest_certificate"yaml:"rest_certificate"` - RestKey string `json:"rest_key"yaml:"rest_key"` - RestAuth bool `json:"rest_auth"yaml:"rest_auth"` - RestAuthPassword string `json:"rest_auth_password"yaml:"rest_auth_password"` - portSetByConfig bool `` - Pprof bool `json:"pprof"yaml:"pprof"` -} - -const ( - CONFIG_CONSTRAINTS = ` - "restapi" : { - "type": ["object", "null"], - "properties" : { - "enable": { - "type": "boolean" - }, - "https" : { - "type": "boolean" - }, - "rest_auth": { - "type": "boolean" - }, - "rest_auth_password": { - "type": "string" - }, - "rest_certificate": { - "type": "string" - }, - "rest_key" : { - "type": "string" - }, - "port" : { - "type": "integer", - "minimum": 1, - "maximum": 65535 - }, - "addr" : { - "type": "string" - }, - "pprof": { - "type": "boolean" - } - }, - "additionalProperties": false - } - ` -) - -type managesMetrics interface { - MetricCatalog() ([]core.CatalogedMetric, error) - FetchMetrics(core.Namespace, int) ([]core.CatalogedMetric, error) - GetMetricVersions(core.Namespace) ([]core.CatalogedMetric, error) - GetMetric(core.Namespace, int) (core.CatalogedMetric, error) - Load(*core.RequestedPlugin) (core.CatalogedPlugin, serror.SnapError) - Unload(core.Plugin) (core.CatalogedPlugin, serror.SnapError) - PluginCatalog() core.PluginCatalog - AvailablePlugins() []core.AvailablePlugin - GetAutodiscoverPaths() []string -} - -type managesTasks interface { - CreateTask(cschedule.Schedule, *wmap.WorkflowMap, bool, ...core.TaskOption) (core.Task, core.TaskErrors) - GetTasks() map[string]core.Task - GetTask(string) (core.Task, error) - StartTask(string) []serror.SnapError - StopTask(string) []serror.SnapError - RemoveTask(string) error - WatchTask(string, core.TaskWatcherHandler) (core.TaskWatcherCloser, error) - EnableTask(string) (core.Task, error) -} - -type managesTribe interface { - GetAgreement(name string) (*agreement.Agreement, serror.SnapError) - GetAgreements() map[string]*agreement.Agreement - AddAgreement(name string) serror.SnapError - RemoveAgreement(name string) serror.SnapError - JoinAgreement(agreementName, memberName string) serror.SnapError - LeaveAgreement(agreementName, memberName string) serror.SnapError - GetMembers() []string - GetMember(name string) *agreement.Member -} - -type managesConfig interface { - GetPluginConfigDataNode(core.PluginType, string, int) cdata.ConfigDataNode - GetPluginConfigDataNodeAll() cdata.ConfigDataNode - MergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) cdata.ConfigDataNode - MergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) cdata.ConfigDataNode - DeletePluginConfigDataNodeField(pluginType core.PluginType, name string, ver int, fields ...string) cdata.ConfigDataNode - DeletePluginConfigDataNodeFieldAll(fields ...string) cdata.ConfigDataNode -} - type Server struct { - mm managesMetrics - mt managesTasks - tr managesTribe - mc managesConfig + apis []api.API n *negroni.Negroni r *httprouter.Router snapTLS *snapTLS @@ -195,26 +64,26 @@ type Server struct { // New creates a REST API server with a given config func New(cfg *Config) (*Server, error) { // pull a few parameters from the configuration passed in by snapteld - https := cfg.HTTPS - cpath := cfg.RestCertificate - kpath := cfg.RestKey - pprof := cfg.Pprof s := &Server{ err: make(chan error), killChan: make(chan struct{}), addrString: cfg.Address, - pprof: pprof, + pprof: cfg.Pprof, } - if https { + if cfg.HTTPS { var err error - s.snapTLS, err = newtls(cpath, kpath) + s.snapTLS, err = newtls(cfg.RestCertificate, cfg.RestKey) if err != nil { return nil, err } protocolPrefix = "https" } + restLogger.Info(fmt.Sprintf("Configuring REST API with HTTPS set to: %v", cfg.HTTPS)) + + s.apis = []api.API{ + v1.New(&s.wg, s.killChan, protocolPrefix), + } - restLogger.Info(fmt.Sprintf("Configuring REST API with HTTPS set to: %v", https)) s.n = negroni.New( NewLogger(), negroni.NewRecovery(), @@ -226,84 +95,28 @@ func New(cfg *Config) (*Server, error) { return s, nil } -// GetDefaultConfig gets the default snapteld configuration -func GetDefaultConfig() *Config { - return &Config{ - Enable: defaultEnable, - Port: defaultPort, - Address: defaultAddress, - HTTPS: defaultHTTPS, - RestCertificate: defaultRestCertificate, - RestKey: defaultRestKey, - RestAuth: defaultAuth, - RestAuthPassword: defaultAuthPassword, - portSetByConfig: defaultPortSetByConfig, - Pprof: defaultPprof, +func (s *Server) BindMetricManager(m api.Metrics) { + for _, apiInstance := range s.apis { + apiInstance.BindMetricManager(m) } } -// define a method that can be used to determine if the port the RESTful -// API is listening on was set in the configuration file -func (c *Config) PortSetByConfigFile() bool { - return c.portSetByConfig +func (s *Server) BindTaskManager(t api.Tasks) { + for _, apiInstance := range s.apis { + apiInstance.BindTaskManager(t) + } } -// UnmarshalJSON unmarshals valid json into a Config. An example Config can be found -// at github.com/intelsdi-x/snap/blob/master/examples/configs/snap-config-sample.json -func (c *Config) UnmarshalJSON(data []byte) error { - // construct a map of strings to json.RawMessages (to defer the parsing of individual - // fields from the unmarshalled interface until later) and unmarshal the input - // byte array into that map - t := make(map[string]json.RawMessage) - if err := json.Unmarshal(data, &t); err != nil { - return err +func (s *Server) BindTribeManager(t api.Tribe) { + for _, apiInstance := range s.apis { + apiInstance.BindTribeManager(t) } - // loop through the individual map elements, parse each in turn, and set - // the appropriate field in this configuration - for k, v := range t { - switch k { - case "enable": - if err := json.Unmarshal(v, &(c.Enable)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::enable')", err) - } - case "port": - if err := json.Unmarshal(v, &(c.Port)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::port')", err) - } - c.portSetByConfig = true - case "addr": - if err := json.Unmarshal(v, &(c.Address)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::addr')", err) - } - case "https": - if err := json.Unmarshal(v, &(c.HTTPS)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::https')", err) - } - case "rest_certificate": - if err := json.Unmarshal(v, &(c.RestCertificate)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::rest_certificate')", err) - } - case "rest_key": - if err := json.Unmarshal(v, &(c.RestKey)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::rest_key')", err) - } - case "rest_auth": - if err := json.Unmarshal(v, &(c.RestAuth)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::rest_auth')", err) - } - case "rest_auth_password": - if err := json.Unmarshal(v, &(c.RestAuthPassword)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::rest_auth_password')", err) - } - case "pprof": - if err := json.Unmarshal(v, &(c.Pprof)); err != nil { - return fmt.Errorf("%v (while parsing 'restapi::pprof')", err) - } - default: - return fmt.Errorf("Unrecognized key '%v' in global config file while parsing 'restapi'", k) - } +} + +func (s *Server) BindConfigManager(c api.Config) { + for _, apiInstance := range s.apis { + apiInstance.BindConfigManager(c) } - return nil } // SetAPIAuth sets API authentication to enabled or disabled @@ -411,8 +224,8 @@ func (s *Server) serveTLS(ln net.Listener) { if err != nil { select { case <-s.closingChan: - // If we called Stop() then there will be a value in s.closingChan, so - // we'll get here and we can exit without showing the error. + // If we called Stop() then there will be a value in s.closingChan, so + // we'll get here and we can exit without showing the error. default: restLogger.Error(err) s.err <- err @@ -426,8 +239,8 @@ func (s *Server) serve(ln net.Listener) { if err != nil { select { case <-s.closingChan: - // If we called Stop() then there will be a value in s.closingChan, so - // we'll get here and we can exit without showing the error. + // If we called Stop() then there will be a value in s.closingChan, so + // we'll get here and we can exit without showing the error. default: restLogger.Error(err) s.err <- err @@ -435,6 +248,15 @@ func (s *Server) serve(ln net.Listener) { } } +func (s *Server) addRoutes() { + for _, apiInstance := range s.apis { + for _, route := range apiInstance.GetRoutes() { + s.r.Handle(route.Method, route.Path, route.Handle) + } + } + s.addPprofRoutes() +} + // Monkey patch ListenAndServe and TCP alive code from https://golang.org/src/net/http/server.go // The built in ListenAndServe and ListenAndServeTLS include TCP keepalive // At this point the Go team is not wanting to provide separate listen and serve methods @@ -452,142 +274,3 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { tc.SetKeepAlivePeriod(3 * time.Minute) return tc, nil } - -func (s *Server) BindMetricManager(m managesMetrics) { - s.mm = m -} - -func (s *Server) BindTaskManager(t managesTasks) { - s.mt = t -} - -func (s *Server) BindTribeManager(t managesTribe) { - s.tr = t -} - -func (s *Server) BindConfigManager(c managesConfig) { - s.mc = c -} - -func (s *Server) addRoutes() { - - // plugin routes - s.r.GET("/v1/plugins", s.getPlugins) - s.r.GET("/v1/plugins/:type", s.getPlugins) - s.r.GET("/v1/plugins/:type/:name", s.getPlugins) - s.r.GET("/v1/plugins/:type/:name/:version", s.getPlugin) - s.r.POST("/v1/plugins", s.loadPlugin) - s.r.DELETE("/v1/plugins/:type/:name/:version", s.unloadPlugin) - s.r.GET("/v1/plugins/:type/:name/:version/config", s.getPluginConfigItem) - s.r.PUT("/v1/plugins/:type/:name/:version/config", s.setPluginConfigItem) - s.r.DELETE("/v1/plugins/:type/:name/:version/config", s.deletePluginConfigItem) - - // metric routes - s.r.GET("/v1/metrics", s.getMetrics) - s.r.GET("/v2/metrics", s.getMetrics) - - s.r.GET("/v1/metrics/*namespace", s.getMetricsFromTree) - s.r.GET("/v2/metrics/*namespace", s.getMetricsFromTree) - - // task routes - s.r.GET("/v1/tasks", s.getTasks) - s.r.GET("/v2/tasks", s.getTasks) - - s.r.GET("/v1/tasks/:id", s.getTask) - s.r.GET("/v1/tasks/:id/watch", s.watchTask) - s.r.POST("/v1/tasks", s.addTask) - s.r.PUT("/v1/tasks/:id/start", s.startTask) - s.r.PUT("/v1/tasks/:id/stop", s.stopTask) - s.r.DELETE("/v1/tasks/:id", s.removeTask) - s.r.PUT("/v1/tasks/:id/enable", s.enableTask) - - // tribe routes - if s.tr != nil { - s.r.GET("/v1/tribe/agreements", s.getAgreements) - s.r.GET("/v2/tribes/agreements", s.getAgreements) - - s.r.POST("/v1/tribe/agreements", s.addAgreement) - s.r.POST("/v2/tribes/agreements", s.addAgreement) - - s.r.GET("/v1/tribe/agreements/:name", s.getAgreement) - s.r.GET("/v2/tribes/agreements/:name", s.getAgreement) - - s.r.DELETE("/v1/tribe/agreements/:name", s.deleteAgreement) - s.r.DELETE("/v2/tribes/agreements/:name", s.deleteAgreement) - - s.r.PUT("/v1/tribe/agreements/:name/join", s.joinAgreement) - s.r.PUT("/v2/tribes/agreements/:name/join", s.joinAgreement) - - s.r.DELETE("/v1/tribe/agreements/:name/leave", s.leaveAgreement) - s.r.DELETE("/v2/tribes/agreements/:name/leave", s.leaveAgreement) - - s.r.GET("/v1/tribe/members", s.getMembers) - s.r.GET("/v2/tribes/members", s.getMembers) - - s.r.GET("/v1/tribe/member/:name", s.getMember) - s.r.GET("/v2/tribes/members/:name", s.getMember) - } - - // profiling tools routes - if s.pprof { - s.r.GET("/debug/pprof/", s.index) - s.r.GET("/debug/pprof/block", s.index) - s.r.GET("/debug/pprof/goroutine", s.index) - s.r.GET("/debug/pprof/heap", s.index) - s.r.GET("/debug/pprof/threadcreate", s.index) - s.r.GET("/debug/pprof/cmdline", s.cmdline) - s.r.GET("/debug/pprof/profile", s.profile) - s.r.GET("/debug/pprof/symbol", s.symbol) - s.r.GET("/debug/pprof/trace", s.trace) - } -} - -// profiling tools handlers - -func (s *Server) index(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - pprof.Index(w, r) -} - -func (s *Server) cmdline(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - pprof.Cmdline(w, r) -} - -func (s *Server) profile(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - pprof.Profile(w, r) -} - -func (s *Server) symbol(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - pprof.Symbol(w, r) -} - -func (s *Server) trace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - pprof.Trace(w, r) -} - -func respond(code int, b rbody.Body, w http.ResponseWriter) { - resp := &rbody.APIResponse{ - Meta: &rbody.APIResponseMeta{ - Code: code, - Message: b.ResponseBodyMessage(), - Type: b.ResponseBodyType(), - Version: APIVersion, - }, - Body: b, - } - if !w.(negroni.ResponseWriter).Written() { - w.WriteHeader(code) - } - - j, err := json.MarshalIndent(resp, "", " ") - if err != nil { - panic(err) - } - j = bytes.Replace(j, []byte("\\u0026"), []byte("&"), -1) - fmt.Fprint(w, string(j)) -} - -func parseNamespace(ns string) []string { - fc := stringutils.GetFirstChar(ns) - ns = strings.Trim(ns, fc) - return strings.Split(ns, fc) -} diff --git a/mgmt/rest/server_test.go b/mgmt/rest/server_test.go index 7a2fbe8c8..f57a87ccf 100644 --- a/mgmt/rest/server_test.go +++ b/mgmt/rest/server_test.go @@ -1,4 +1,4 @@ -// +build legacy +// +build medium /* http://www.apache.org/licenses/LICENSE-2.0.txt @@ -28,6 +28,24 @@ import ( . "github.com/smartystreets/goconvey/convey" ) +const ( + MOCK_CONSTRAINTS = `{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "snapteld global config schema", + "type": ["object", "null"], + "properties": { + "control": { "$ref": "#/definitions/control" }, + "scheduler": { "$ref": "#/definitions/scheduler"}, + "restapi" : { "$ref": "#/definitions/restapi"}, + "tribe": { "$ref": "#/definitions/tribe"} + }, + "additionalProperties": true, + "definitions": { ` + + `"control": {}, "scheduler": {}, ` + CONFIG_CONSTRAINTS + `, "tribe":{}` + + `}` + + `}` +) + type mockRestAPIConfig struct { RestAPI *Config } @@ -145,44 +163,3 @@ func TestRestAPIDefaultConfig(t *testing.T) { }) }) } - -func TestParseNamespace(t *testing.T) { - tcs := getNsTestCases() - - Convey("Test parseNamespace", t, func() { - for _, c := range tcs { - Convey("Test parseNamespace "+c.input, func() { - So(c.output, ShouldResemble, parseNamespace(c.input)) - }) - } - }) -} - -type nsTestCase struct { - input string - output []string -} - -func getNsTestCases() []nsTestCase { - tcs := []nsTestCase{ - { - input: "小a小b小c", - output: []string{"a", "b", "c"}}, - { - input: "%a%b%c", - output: []string{"a", "b", "c"}}, - { - input: "-aヒ-b/-c|", - output: []string{"aヒ", "b/", "c|"}}, - { - input: ">a>b=>c=", - output: []string{"a", "b=", "c="}}, - { - input: ">a>b<>c<", - output: []string{"a", "b<", "c<"}}, - { - input: "㊽a㊽b%㊽c/|", - output: []string{"a", "b%", "c/|"}}, - } - return tcs -} diff --git a/mgmt/rest/tribe_test.go b/mgmt/rest/tribe_v1_test.go similarity index 99% rename from mgmt/rest/tribe_test.go rename to mgmt/rest/tribe_v1_test.go index c11c74ac5..a11cae485 100644 --- a/mgmt/rest/tribe_test.go +++ b/mgmt/rest/tribe_v1_test.go @@ -1,4 +1,4 @@ -// +build legacy +// +build medium /* http://www.apache.org/licenses/LICENSE-2.0.txt @@ -39,11 +39,17 @@ import ( "github.com/intelsdi-x/snap/control" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/tribe_event" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" "github.com/intelsdi-x/snap/mgmt/tribe" "github.com/intelsdi-x/snap/scheduler" ) +var ( + tribeLogger = restLogger.WithFields(log.Fields{ + "_module": "rest-tribe", + }) +) + func getMembers(port int) *rbody.APIResponse { resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/v1/tribe/members", port)) if err != nil { diff --git a/mgmt/rest/v1/api.go b/mgmt/rest/v1/api.go new file mode 100644 index 000000000..852156209 --- /dev/null +++ b/mgmt/rest/v1/api.go @@ -0,0 +1,92 @@ +package v1 + +import ( + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/mgmt/rest/api" +) + +const ( + version = "v1" + prefix = "/" + version +) + +var ( + restLogger = log.WithField("_module", "_mgmt-rest-v1") + protocolPrefix = "http" +) + +type apiV1 struct { + metricManager api.Metrics + taskManager api.Tasks + tribeManager api.Tribe + configManager api.Config + + wg *sync.WaitGroup + killChan chan struct{} +} + +func New(wg *sync.WaitGroup, killChan chan struct{}, protocol string) *apiV1 { + protocolPrefix = protocol + return &apiV1{wg: wg, killChan: killChan} +} + +func (s *apiV1) GetRoutes() []api.Route { + routes := []api.Route{ + // plugin routes + api.Route{Method: "GET", Path: prefix + "/plugins", Handle: s.getPlugins}, + api.Route{Method: "GET", Path: prefix + "/plugins/:type", Handle: s.getPlugins}, + api.Route{Method: "GET", Path: prefix + "/plugins/:type/:name", Handle: s.getPlugins}, + api.Route{Method: "GET", Path: prefix + "/plugins/:type/:name/:version", Handle: s.getPlugin}, + api.Route{Method: "POST", Path: prefix + "/plugins", Handle: s.loadPlugin}, + api.Route{Method: "DELETE", Path: prefix + "/plugins/:type/:name/:version", Handle: s.unloadPlugin}, + api.Route{Method: "GET", Path: prefix + "/plugins/:type/:name/:version/config", Handle: s.getPluginConfigItem}, + api.Route{Method: "PUT", Path: prefix + "/plugins/:type/:name/:version/config", Handle: s.setPluginConfigItem}, + api.Route{Method: "DELETE", Path: prefix + "/plugins/:type/:name/:version/config", Handle: s.deletePluginConfigItem}, + + // metric routes + api.Route{Method: "GET", Path: prefix + "/metrics", Handle: s.getMetrics}, + api.Route{Method: "GET", Path: prefix + "/metrics/*namespace", Handle: s.getMetricsFromTree}, + + // task routes + api.Route{Method: "GET", Path: prefix + "/tasks", Handle: s.getTasks}, + api.Route{Method: "GET", Path: prefix + "/tasks/:id", Handle: s.getTask}, + api.Route{Method: "GET", Path: prefix + "/tasks/:id/watch", Handle: s.watchTask}, + api.Route{Method: "POST", Path: prefix + "/tasks", Handle: s.addTask}, + api.Route{Method: "PUT", Path: prefix + "/tasks/:id/start", Handle: s.startTask}, + api.Route{Method: "PUT", Path: prefix + "/tasks/:id/stop", Handle: s.stopTask}, + api.Route{Method: "DELETE", Path: prefix + "/tasks/:id", Handle: s.removeTask}, + api.Route{Method: "PUT", Path: prefix + "/tasks/:id/enable", Handle: s.enableTask}, + } + // tribe routes + if s.tribeManager != nil { + routes = append(routes, []api.Route{ + api.Route{Method: "GET", Path: prefix + "/tribe/agreements", Handle: s.getAgreements}, + api.Route{Method: "POST", Path: prefix + "/tribe/agreements", Handle: s.addAgreement}, + api.Route{Method: "GET", Path: prefix + "/tribe/agreements/:name", Handle: s.getAgreement}, + api.Route{Method: "DELETE", Path: prefix + "/tribe/agreements/:name", Handle: s.deleteAgreement}, + api.Route{Method: "PUT", Path: prefix + "/tribe/agreements/:name/join", Handle: s.joinAgreement}, + api.Route{Method: "DELETE", Path: prefix + "/tribe/agreements/:name/leave", Handle: s.leaveAgreement}, + api.Route{Method: "GET", Path: prefix + "/tribe/members", Handle: s.getMembers}, + api.Route{Method: "GET", Path: prefix + "/tribe/member/:name", Handle: s.getMember}, + }...) + } + return routes +} + +func (s *apiV1) BindMetricManager(metricManager api.Metrics) { + s.metricManager = metricManager +} + +func (s *apiV1) BindTaskManager(taskManager api.Tasks) { + s.taskManager = taskManager +} + +func (s *apiV1) BindTribeManager(tribeManager api.Tribe) { + s.tribeManager = tribeManager +} + +func (s *apiV1) BindConfigManager(configManager api.Config) { + s.configManager = configManager +} diff --git a/mgmt/rest/v1/config.go b/mgmt/rest/v1/config.go new file mode 100644 index 000000000..23215b658 --- /dev/null +++ b/mgmt/rest/v1/config.go @@ -0,0 +1,158 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "net/http" + "strconv" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/cdata" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" + "github.com/julienschmidt/httprouter" +) + +func (s *apiV1) getPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + styp := p.ByName("type") + if styp == "" { + cdn := s.configManager.GetPluginConfigDataNodeAll() + item := &rbody.PluginConfigItem{ConfigDataNode: cdn} + rbody.Write(200, item, w) + return + } + + typ, err := getPluginType(styp) + if err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + + name := p.ByName("name") + sver := p.ByName("version") + var iver int + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + } else { + iver = -2 + } + + cdn := s.configManager.GetPluginConfigDataNode(typ, name, iver) + item := &rbody.PluginConfigItem{ConfigDataNode: cdn} + rbody.Write(200, item, w) +} + +func (s *apiV1) deletePluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + var typ core.PluginType + styp := p.ByName("type") + if styp != "" { + typ, err = getPluginType(styp) + if err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + } + + name := p.ByName("name") + sver := p.ByName("version") + var iver int + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + } else { + iver = -2 + } + + src := []string{} + errCode, err := core.UnmarshalBody(&src, r.Body) + if errCode != 0 && err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + + var res cdata.ConfigDataNode + if styp == "" { + res = s.configManager.DeletePluginConfigDataNodeFieldAll(src...) + } else { + res = s.configManager.DeletePluginConfigDataNodeField(typ, name, iver, src...) + } + + item := &rbody.DeletePluginConfigItem{ConfigDataNode: res} + rbody.Write(200, item, w) +} + +func (s *apiV1) setPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + var typ core.PluginType + styp := p.ByName("type") + if styp != "" { + typ, err = getPluginType(styp) + if err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + } + + name := p.ByName("name") + sver := p.ByName("version") + var iver int + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + } else { + iver = -2 + } + + src := cdata.NewNode() + errCode, err := core.UnmarshalBody(src, r.Body) + if errCode != 0 && err != nil { + rbody.Write(400, rbody.FromError(err), w) + return + } + + var res cdata.ConfigDataNode + if styp == "" { + res = s.configManager.MergePluginConfigDataNodeAll(src) + } else { + res = s.configManager.MergePluginConfigDataNode(typ, name, iver, src) + } + + item := &rbody.SetPluginConfigItem{ConfigDataNode: res} + rbody.Write(200, item, w) +} + +func getPluginType(t string) (core.PluginType, error) { + if ityp, err := strconv.Atoi(t); err == nil { + return core.PluginType(ityp), nil + } + ityp, err := core.ToPluginType(t) + if err != nil { + return core.PluginType(-1), err + } + return ityp, nil +} diff --git a/mgmt/rest/fixtures/mock_config_manager.go b/mgmt/rest/v1/fixtures/mock_config_manager.go similarity index 100% rename from mgmt/rest/fixtures/mock_config_manager.go rename to mgmt/rest/v1/fixtures/mock_config_manager.go diff --git a/mgmt/rest/fixtures/mock_metric_manager.go b/mgmt/rest/v1/fixtures/mock_metric_manager.go similarity index 100% rename from mgmt/rest/fixtures/mock_metric_manager.go rename to mgmt/rest/v1/fixtures/mock_metric_manager.go diff --git a/mgmt/rest/fixtures/mock_task_manager.go b/mgmt/rest/v1/fixtures/mock_task_manager.go similarity index 100% rename from mgmt/rest/fixtures/mock_task_manager.go rename to mgmt/rest/v1/fixtures/mock_task_manager.go diff --git a/mgmt/rest/fixtures/mock_tribe_manager.go b/mgmt/rest/v1/fixtures/mock_tribe_manager.go similarity index 100% rename from mgmt/rest/fixtures/mock_tribe_manager.go rename to mgmt/rest/v1/fixtures/mock_tribe_manager.go diff --git a/mgmt/rest/metric.go b/mgmt/rest/v1/metric.go similarity index 69% rename from mgmt/rest/metric.go rename to mgmt/rest/v1/metric.go index 3c0c40b68..1c0ced306 100644 --- a/mgmt/rest/metric.go +++ b/mgmt/rest/v1/metric.go @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rest +package v1 import ( "fmt" @@ -27,24 +27,19 @@ import ( "strconv" "strings" - "github.com/julienschmidt/httprouter" - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" - apiV2 "github.com/intelsdi-x/snap/mgmt/rest/v2" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" "github.com/intelsdi-x/snap/pkg/stringutils" + "github.com/julienschmidt/httprouter" ) -func (s *Server) getMetrics(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { +func (s *apiV1) getMetrics(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { ver := 0 // 0: get all metrics // If we are provided a parameter with the name 'ns' we need to // perform a query q := r.URL.Query() v := q.Get("ver") - // apiVersion should be "v1" or "v2". - // If apiVersion != "v2" calls default to V1 functionality - apiVersion := r.URL.Path[1:3] ns_query := q.Get("ns") if ns_query != "" { ver = 0 // 0: get all versions @@ -52,7 +47,7 @@ func (s *Server) getMetrics(w http.ResponseWriter, r *http.Request, _ httprouter var err error ver, err = strconv.Atoi(v) if err != nil { - respond(400, rbody.FromError(err), w) + rbody.Write(400, rbody.FromError(err), w) return } } @@ -63,24 +58,24 @@ func (s *Server) getMetrics(w http.ResponseWriter, r *http.Request, _ httprouter ns = ns[:len(ns)-1] } - mts, err := s.mm.FetchMetrics(core.NewNamespace(ns...), ver) + mts, err := s.metricManager.FetchMetrics(core.NewNamespace(ns...), ver) if err != nil { - respond(404, rbody.FromError(err), w) + rbody.Write(404, rbody.FromError(err), w) return } - respondWithMetrics(r.Host, mts, w, apiVersion) + respondWithMetrics(r.Host, mts, w) return } - mts, err := s.mm.MetricCatalog() + mts, err := s.metricManager.MetricCatalog() if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } - respondWithMetrics(r.Host, mts, w, apiVersion) + respondWithMetrics(r.Host, mts, w) } -func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, params httprouter.Params) { +func (s *apiV1) getMetricsFromTree(w http.ResponseWriter, r *http.Request, params httprouter.Params) { namespace := params.ByName("namespace") // we land here if the request contains a trailing slash, because it matches the tree @@ -100,9 +95,6 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para ) q := r.URL.Query() v := q.Get("ver") - // apiVersion should be "v1" or "v2". - // If apiVersion != "v2" calls default to V1 functionality - apiVersion := r.URL.Path[1:3] if ns[len(ns)-1] == "*" { if v == "" { @@ -110,40 +102,40 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para } else { ver, err = strconv.Atoi(v) if err != nil { - respond(400, rbody.FromError(err), w) + rbody.Write(400, rbody.FromError(err), w) return } } - mts, err := s.mm.FetchMetrics(core.NewNamespace(ns[:len(ns)-1]...), ver) + mts, err := s.metricManager.FetchMetrics(core.NewNamespace(ns[:len(ns)-1]...), ver) if err != nil { - respond(404, rbody.FromError(err), w) + rbody.Write(404, rbody.FromError(err), w) return } - respondWithMetrics(r.Host, mts, w, apiVersion) + respondWithMetrics(r.Host, mts, w) return } // If no version was given, get all version that fall at this namespace. if v == "" { - mts, err := s.mm.FetchMetrics(core.NewNamespace(ns...), 0) + mts, err := s.metricManager.FetchMetrics(core.NewNamespace(ns...), 0) if err != nil { - respond(404, rbody.FromError(err), w) + rbody.Write(404, rbody.FromError(err), w) return } - respondWithMetrics(r.Host, mts, w, apiVersion) + respondWithMetrics(r.Host, mts, w) return } // if an explicit version is given, get that single one. ver, err = strconv.Atoi(v) if err != nil { - respond(400, rbody.FromError(err), w) + rbody.Write(400, rbody.FromError(err), w) return } - mt, err := s.mm.GetMetric(core.NewNamespace(ns...), ver) + mt, err := s.metricManager.GetMetric(core.NewNamespace(ns...), ver) if err != nil { - respond(404, rbody.FromError(err), w) + rbody.Write(404, rbody.FromError(err), w) return } @@ -161,19 +153,15 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para Description: mt.Description(), Unit: mt.Unit(), LastAdvertisedTimestamp: mt.LastAdvertisedTime().Unix(), - Href: catalogedMetricURI(r.Host, mt), + Href: catalogedMetricURI(r.Host, version, mt), } policies := rbody.PolicyTableSlice(mt.Policy().RulesAsTable()) mb.Policy = policies b.Metric = mb - respond(200, b, w) + rbody.Write(200, b, w) } -func respondWithMetrics(host string, mts []core.CatalogedMetric, w http.ResponseWriter, version string) { - if version == "v2" { - apiV2.RespondWithMetrics(host, mts, w) - return - } +func respondWithMetrics(host string, mts []core.CatalogedMetric, w http.ResponseWriter) { b := rbody.NewMetricsReturned() for _, m := range mts { policies := rbody.PolicyTableSlice(m.Policy().RulesAsTable()) @@ -191,15 +179,11 @@ func respondWithMetrics(host string, mts []core.CatalogedMetric, w http.Response DynamicElements: dynamicElements, Unit: m.Unit(), Policy: policies, - Href: catalogedMetricURI(host, m), + Href: catalogedMetricURI(host, version, m), }) } sort.Sort(b) - respond(200, b, w) -} - -func catalogedMetricURI(host string, mt core.CatalogedMetric) string { - return fmt.Sprintf("%s://%s/v1/metrics?ns=%s&ver=%d", protocolPrefix, host, url.QueryEscape(mt.Namespace().String()), mt.Version()) + rbody.Write(200, b, w) } func getDynamicElements(ns core.Namespace, indexes []int) []rbody.DynamicElement { @@ -214,3 +198,13 @@ func getDynamicElements(ns core.Namespace, indexes []int) []rbody.DynamicElement } return elements } + +func catalogedMetricURI(host, version string, mt core.CatalogedMetric) string { + return fmt.Sprintf("%s://%s/%s/metrics?ns=%s&ver=%d", protocolPrefix, host, version, url.QueryEscape(mt.Namespace().String()), mt.Version()) +} + +func parseNamespace(ns string) []string { + fc := stringutils.GetFirstChar(ns) + ns = strings.Trim(ns, fc) + return strings.Split(ns, fc) +} diff --git a/mgmt/rest/v1/metric_test.go b/mgmt/rest/v1/metric_test.go new file mode 100644 index 000000000..d847621e8 --- /dev/null +++ b/mgmt/rest/v1/metric_test.go @@ -0,0 +1,69 @@ +// +build small + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestParseNamespace(t *testing.T) { + tcs := getNsTestCases() + + Convey("Test parseNamespace", t, func() { + for _, c := range tcs { + Convey("Test parseNamespace "+c.input, func() { + So(c.output, ShouldResemble, parseNamespace(c.input)) + }) + } + }) +} + +type nsTestCase struct { + input string + output []string +} + +func getNsTestCases() []nsTestCase { + tcs := []nsTestCase{ + { + input: "小a小b小c", + output: []string{"a", "b", "c"}}, + { + input: "%a%b%c", + output: []string{"a", "b", "c"}}, + { + input: "-aヒ-b/-c|", + output: []string{"aヒ", "b/", "c|"}}, + { + input: ">a>b=>c=", + output: []string{"a", "b=", "c="}}, + { + input: ">a>b<>c<", + output: []string{"a", "b<", "c<"}}, + { + input: "㊽a㊽b%㊽c/|", + output: []string{"a", "b%", "c/|"}}, + } + return tcs +} diff --git a/mgmt/rest/plugin.go b/mgmt/rest/v1/plugin.go similarity index 79% rename from mgmt/rest/plugin.go rename to mgmt/rest/v1/plugin.go index e07e0c087..6451084b0 100644 --- a/mgmt/rest/plugin.go +++ b/mgmt/rest/v1/plugin.go @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rest +package v1 import ( "compress/gzip" @@ -37,11 +37,11 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/julienschmidt/httprouter" - "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/serror" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/api" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" + "github.com/julienschmidt/httprouter" ) const PluginAlreadyLoaded = "plugin is already loaded" @@ -69,10 +69,10 @@ func (p *plugin) TypeName() string { return p.pluginType } -func (s *Server) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { +func (s *apiV1) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } if strings.HasPrefix(mediaType, "multipart/") { @@ -90,25 +90,25 @@ func (s *Server) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter break } if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } if r.Header.Get("Plugin-Compression") == "gzip" { g, err := gzip.NewReader(p) defer g.Close() if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } b, err = ioutil.ReadAll(g) if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } } else { b, err = ioutil.ReadAll(p) if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } } @@ -125,11 +125,11 @@ func (s *Server) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter case i == 0: if filepath.Ext(p.FileName()) == ".asc" { e := errors.New("Error: first file passed to load plugin api can not be signature file") - respond(500, rbody.FromError(e), w) + rbody.Write(500, rbody.FromError(e), w) return } if pluginPath, err = writeFile(p.FileName(), b); err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } checkSum = sha256.Sum256(b) @@ -138,19 +138,19 @@ func (s *Server) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter signature = b } else { e := errors.New("Error: second file passed was not a signature file") - respond(500, rbody.FromError(e), w) + rbody.Write(500, rbody.FromError(e), w) return } case i == 2: e := errors.New("Error: More than two files passed to the load plugin api") - respond(500, rbody.FromError(e), w) + rbody.Write(500, rbody.FromError(e), w) return } i++ } rp, err := core.NewRequestedPlugin(pluginPath) if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } rp.SetAutoLoaded(false) @@ -158,12 +158,12 @@ func (s *Server) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter // as after it is written to disk. if rp.CheckSum() != checkSum { e := errors.New("Error: CheckSum mismatch on requested plugin to load") - respond(500, rbody.FromError(e), w) + rbody.Write(500, rbody.FromError(e), w) return } rp.SetSignature(signature) restLogger.Info("Loading plugin: ", rp.Path()) - pl, err := s.mm.Load(rp) + pl, err := s.metricManager.Load(rp) if err != nil { var ec int restLogger.Error(err) @@ -179,11 +179,11 @@ func (s *Server) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter default: ec = 500 } - respond(ec, rb, w) + rbody.Write(ec, rb, w) return } - lp.LoadedPlugins = append(lp.LoadedPlugins, *catalogedPluginToLoaded(r.Host, pl)) - respond(201, lp, w) + lp.LoadedPlugins = append(lp.LoadedPlugins, catalogedPluginToLoaded(r.Host, pl)) + rbody.Write(201, lp, w) } } @@ -197,6 +197,9 @@ func writeFile(filename string, b []byte) (string, error) { if err != nil { return "", err } + // Close before load + defer f.Close() + n, err := f.Write(b) log.Debugf("wrote %v to %v", n, f.Name()) if err != nil { @@ -208,12 +211,10 @@ func writeFile(filename string, b []byte) (string, error) { return "", err } } - // Close before load - f.Close() return f.Name(), nil } -func (s *Server) unloadPlugin(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) unloadPlugin(w http.ResponseWriter, r *http.Request, p httprouter.Params) { plName := p.ByName("name") plType := p.ByName("type") plVersion, iErr := strconv.ParseInt(p.ByName("version"), 10, 0) @@ -226,30 +227,30 @@ func (s *Server) unloadPlugin(w http.ResponseWriter, r *http.Request, p httprout if iErr != nil { se := serror.New(errors.New("invalid version")) se.SetFields(f) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } if plName == "" { se := serror.New(errors.New("missing plugin name")) se.SetFields(f) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } if plType == "" { se := serror.New(errors.New("missing plugin type")) se.SetFields(f) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } - up, se := s.mm.Unload(&plugin{ + up, se := s.metricManager.Unload(&plugin{ name: plName, version: int(plVersion), pluginType: plType, }) if se != nil { se.SetFields(f) - respond(500, rbody.FromSnapError(se), w) + rbody.Write(500, rbody.FromSnapError(se), w) return } pr := &rbody.PluginUnloaded{ @@ -257,10 +258,10 @@ func (s *Server) unloadPlugin(w http.ResponseWriter, r *http.Request, p httprout Version: up.Version(), Type: up.TypeName(), } - respond(200, pr, w) + rbody.Write(200, pr, w) } -func (s *Server) getPlugins(w http.ResponseWriter, r *http.Request, params httprouter.Params) { +func (s *apiV1) getPlugins(w http.ResponseWriter, r *http.Request, params httprouter.Params) { var detail bool for k := range r.URL.Query() { if k == "details" { @@ -269,10 +270,10 @@ func (s *Server) getPlugins(w http.ResponseWriter, r *http.Request, params httpr } plName := params.ByName("name") plType := params.ByName("type") - respond(200, getPlugins(s.mm, detail, r.Host, plName, plType), w) + rbody.Write(200, getPlugins(s.metricManager, detail, r.Host, plName, plType), w) } -func getPlugins(mm managesMetrics, detail bool, h string, plName string, plType string) *rbody.PluginList { +func getPlugins(mm api.Metrics, detail bool, h string, plName string, plType string) *rbody.PluginList { plCatalog := mm.PluginCatalog() @@ -280,7 +281,7 @@ func getPlugins(mm managesMetrics, detail bool, h string, plName string, plType plugins.LoadedPlugins = make([]rbody.LoadedPlugin, len(plCatalog)) for i, p := range plCatalog { - plugins.LoadedPlugins[i] = *catalogedPluginToLoaded(h, p) + plugins.LoadedPlugins[i] = catalogedPluginToLoaded(h, p) } if detail { @@ -294,7 +295,7 @@ func getPlugins(mm managesMetrics, detail bool, h string, plName string, plType HitCount: p.HitCount(), LastHitTimestamp: p.LastHit().Unix(), ID: p.ID(), - Href: pluginURI(h, p), + Href: pluginURI(h, version, p), PprofPort: p.Port(), } } @@ -337,19 +338,19 @@ func getPlugins(mm managesMetrics, detail bool, h string, plName string, plType return &plugins } -func catalogedPluginToLoaded(host string, c core.CatalogedPlugin) *rbody.LoadedPlugin { - return &rbody.LoadedPlugin{ +func catalogedPluginToLoaded(host string, c core.CatalogedPlugin) rbody.LoadedPlugin { + return rbody.LoadedPlugin{ Name: c.Name(), Version: c.Version(), Type: c.TypeName(), Signed: c.IsSigned(), Status: c.Status(), LoadedTimestamp: c.LoadedTimestamp().Unix(), - Href: pluginURI(host, c), + Href: pluginURI(host, version, c), } } -func (s *Server) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter.Params) { plName := p.ByName("name") plType := p.ByName("type") plVersion, iErr := strconv.ParseInt(p.ByName("version"), 10, 0) @@ -362,24 +363,24 @@ func (s *Server) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter. if iErr != nil { se := serror.New(errors.New("invalid version")) se.SetFields(f) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } if plName == "" { se := serror.New(errors.New("missing plugin name")) se.SetFields(f) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } if plType == "" { se := serror.New(errors.New("missing plugin type")) se.SetFields(f) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } - pluginCatalog := s.mm.PluginCatalog() + pluginCatalog := s.metricManager.PluginCatalog() var plugin core.CatalogedPlugin for _, item := range pluginCatalog { if item.Name() == plName && @@ -391,7 +392,7 @@ func (s *Server) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter. } if plugin == nil { se := serror.New(ErrPluginNotFound, f) - respond(404, rbody.FromSnapError(se), w) + rbody.Write(404, rbody.FromSnapError(se), w) return } @@ -421,7 +422,7 @@ func (s *Server) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter. if err != nil { f["plugin-path"] = plugin.PluginPath() se := serror.New(err, f) - respond(500, rbody.FromSnapError(se), w) + rbody.Write(500, rbody.FromSnapError(se), w) return } @@ -432,7 +433,7 @@ func (s *Server) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter. if err != nil { f["plugin-path"] = plugin.PluginPath() se := serror.New(err, f) - respond(500, rbody.FromSnapError(se), w) + rbody.Write(500, rbody.FromSnapError(se), w) return } return @@ -444,13 +445,13 @@ func (s *Server) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter. Signed: plugin.IsSigned(), Status: plugin.Status(), LoadedTimestamp: plugin.LoadedTimestamp().Unix(), - Href: pluginURI(r.Host, plugin), + Href: pluginURI(r.Host, version, plugin), ConfigPolicy: configPolicy, } - respond(200, pluginRet, w) + rbody.Write(200, pluginRet, w) } } -func pluginURI(host string, c core.Plugin) string { - return fmt.Sprintf("%s://%s/v1/plugins/%s/%s/%d", protocolPrefix, host, c.TypeName(), c.Name(), c.Version()) +func pluginURI(host, version string, c core.Plugin) string { + return fmt.Sprintf("%s://%s/%s/plugins/%s/%s/%d", protocolPrefix, host, version, c.TypeName(), c.Name(), c.Version()) } diff --git a/mgmt/rest/plugin_test.go b/mgmt/rest/v1/plugin_test.go similarity index 97% rename from mgmt/rest/plugin_test.go rename to mgmt/rest/v1/plugin_test.go index 772a3edce..0e8f4d245 100644 --- a/mgmt/rest/plugin_test.go +++ b/mgmt/rest/v1/plugin_test.go @@ -1,4 +1,4 @@ -// +build legacy +// +build medium /* http://www.apache.org/licenses/LICENSE-2.0.txt @@ -19,12 +19,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rest +package v1 import ( "testing" - "github.com/intelsdi-x/snap/mgmt/rest/fixtures" + "github.com/intelsdi-x/snap/mgmt/rest/v1/fixtures" . "github.com/smartystreets/goconvey/convey" ) diff --git a/mgmt/rest/rbody/body.go b/mgmt/rest/v1/rbody/body.go similarity index 88% rename from mgmt/rest/rbody/body.go rename to mgmt/rest/v1/rbody/body.go index 7d26981dc..1827a8cd6 100644 --- a/mgmt/rest/rbody/body.go +++ b/mgmt/rest/v1/rbody/body.go @@ -23,7 +23,13 @@ import ( "encoding/json" "errors" + "bytes" + "fmt" + "net/http" + + "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap/core/cdata" + "github.com/urfave/negroni" ) type Body interface { @@ -33,6 +39,29 @@ type Body interface { ResponseBodyType() string } +func Write(code int, b Body, w http.ResponseWriter) { + w.Header().Set("Deprecated", "true") + resp := &APIResponse{ + Meta: &APIResponseMeta{ + Code: code, + Message: b.ResponseBodyMessage(), + Type: b.ResponseBodyType(), + Version: 1, + }, + Body: b, + } + if !w.(negroni.ResponseWriter).Written() { + w.WriteHeader(code) + } + + j, err := json.MarshalIndent(resp, "", " ") + if err != nil { + logrus.Fatalln(err) + } + j = bytes.Replace(j, []byte("\\u0026"), []byte("&"), -1) + fmt.Fprint(w, string(j)) +} + var ( ErrCannotUnmarshalBody = errors.New("Cannot unmarshal body: invalid type") ) diff --git a/mgmt/rest/rbody/config.go b/mgmt/rest/v1/rbody/config.go similarity index 100% rename from mgmt/rest/rbody/config.go rename to mgmt/rest/v1/rbody/config.go diff --git a/mgmt/rest/rbody/error.go b/mgmt/rest/v1/rbody/error.go similarity index 100% rename from mgmt/rest/rbody/error.go rename to mgmt/rest/v1/rbody/error.go diff --git a/mgmt/rest/rbody/metric.go b/mgmt/rest/v1/rbody/metric.go similarity index 100% rename from mgmt/rest/rbody/metric.go rename to mgmt/rest/v1/rbody/metric.go diff --git a/mgmt/rest/rbody/plugin.go b/mgmt/rest/v1/rbody/plugin.go similarity index 100% rename from mgmt/rest/rbody/plugin.go rename to mgmt/rest/v1/rbody/plugin.go diff --git a/mgmt/rest/rbody/task.go b/mgmt/rest/v1/rbody/task.go similarity index 100% rename from mgmt/rest/rbody/task.go rename to mgmt/rest/v1/rbody/task.go diff --git a/mgmt/rest/rbody/tribe.go b/mgmt/rest/v1/rbody/tribe.go similarity index 100% rename from mgmt/rest/rbody/tribe.go rename to mgmt/rest/v1/rbody/tribe.go diff --git a/mgmt/rest/task.go b/mgmt/rest/v1/task.go similarity index 70% rename from mgmt/rest/task.go rename to mgmt/rest/v1/task.go index a524a63b0..90da2182d 100644 --- a/mgmt/rest/task.go +++ b/mgmt/rest/v1/task.go @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rest +package v1 import ( "errors" @@ -28,11 +28,9 @@ import ( "time" log "github.com/Sirupsen/logrus" - apiV2 "github.com/intelsdi-x/snap/mgmt/rest/v2" - "github.com/julienschmidt/httprouter" - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" + "github.com/julienschmidt/httprouter" ) var ( @@ -42,28 +40,23 @@ var ( ErrStreamingUnsupported = errors.New("Streaming unsupported") ErrTaskNotFound = errors.New("Task not found") ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started") + ErrNoActionSpecified = errors.New("No action was specified in the request") + ErrWrongAction = errors.New("Wrong action requested") ) -func (s *Server) addTask(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - task, err := core.CreateTaskFromContent(r.Body, nil, s.mt.CreateTask) +func (s *apiV1) addTask(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + task, err := core.CreateTaskFromContent(r.Body, nil, s.taskManager.CreateTask) if err != nil { - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } taskB := rbody.AddSchedulerTaskFromTask(task) - taskB.Href = taskURI(r.Host, task) - respond(201, taskB, w) + taskB.Href = taskURI(r.Host, version, task) + rbody.Write(201, taskB, w) } -func (s *Server) getTasks(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - sts := s.mt.GetTasks() - // apiVersion should be "v1" or "v2". - // If apiVersion != "v2" calls default to V1 functionality - apiVersion := r.URL.Path[1:3] - if apiVersion == "v2" { - apiV2.GetTasks(w, r, sts) - return - } +func (s *apiV1) getTasks(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + sts := s.taskManager.GetTasks() tasks := &rbody.ScheduledTaskListReturned{} tasks.ScheduledTasks = make([]rbody.ScheduledTask, len(sts)) @@ -71,27 +64,27 @@ func (s *Server) getTasks(w http.ResponseWriter, r *http.Request, _ httprouter.P i := 0 for _, t := range sts { tasks.ScheduledTasks[i] = *rbody.SchedulerTaskFromTask(t) - tasks.ScheduledTasks[i].Href = taskURI(r.Host, t) + tasks.ScheduledTasks[i].Href = taskURI(r.Host, version, t) i++ } sort.Sort(tasks) - respond(200, tasks, w) + rbody.Write(200, tasks, w) } -func (s *Server) getTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) getTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { id := p.ByName("id") - t, err1 := s.mt.GetTask(id) + t, err1 := s.taskManager.GetTask(id) if err1 != nil { - respond(404, rbody.FromError(err1), w) + rbody.Write(404, rbody.FromError(err1), w) return } task := &rbody.ScheduledTaskReturned{} task.AddScheduledTask = *rbody.AddSchedulerTaskFromTask(t) - task.Href = taskURI(r.Host, t) - respond(200, task, w) + task.Href = taskURI(r.Host, version, t) + rbody.Write(200, task, w) } -func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { s.wg.Add(1) defer s.wg.Done() logger := log.WithFields(log.Fields{ @@ -109,13 +102,13 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. alive: true, mChan: make(chan rbody.StreamedTaskEvent), } - tc, err1 := s.mt.WatchTask(id, tw) + tc, err1 := s.taskManager.WatchTask(id, tw) if err1 != nil { if strings.Contains(err1.Error(), ErrTaskNotFound.Error()) { - respond(404, rbody.FromError(err1), w) + rbody.Write(404, rbody.FromError(err1), w) return } - respond(500, rbody.FromError(err1), w) + rbody.Write(500, rbody.FromError(err1), w) return } @@ -129,7 +122,7 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. flusher, ok := w.(http.Flusher) if !ok { // This only works on ResponseWriters that support streaming - respond(500, rbody.FromError(ErrStreamingUnsupported), w) + rbody.Write(500, rbody.FromError(ErrStreamingUnsupported), w) return } // send initial stream open event @@ -164,7 +157,7 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. // Close out watcher removing it from the scheduler tc.Close() // exit since this client is no longer listening - respond(200, &rbody.ScheduledTaskWatchingEnded{}, w) + rbody.Write(200, &rbody.ScheduledTaskWatchingEnded{}, w) } // If we are at least above our minimum buffer time we flush to send if time.Now().Sub(t).Seconds() > StreamingBufferWindow { @@ -180,7 +173,7 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. // Close out watcher removing it from the scheduler tc.Close() // exit since this client is no longer listening - respond(200, &rbody.ScheduledTaskWatchingEnded{}, w) + rbody.Write(200, &rbody.ScheduledTaskWatchingEnded{}, w) return case <-s.killChan: logger.WithFields(log.Fields{ @@ -191,74 +184,74 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. // Close out watcher removing it from the scheduler tc.Close() // exit since this client is no longer listening - respond(200, &rbody.ScheduledTaskWatchingEnded{}, w) + rbody.Write(200, &rbody.ScheduledTaskWatchingEnded{}, w) return } } } -func (s *Server) startTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) startTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { id := p.ByName("id") - errs := s.mt.StartTask(id) + errs := s.taskManager.StartTask(id) if errs != nil { if strings.Contains(errs[0].Error(), ErrTaskNotFound.Error()) { - respond(404, rbody.FromSnapErrors(errs), w) + rbody.Write(404, rbody.FromSnapErrors(errs), w) return } if strings.Contains(errs[0].Error(), ErrTaskDisabledNotRunnable.Error()) { - respond(409, rbody.FromSnapErrors(errs), w) + rbody.Write(409, rbody.FromSnapErrors(errs), w) return } - respond(500, rbody.FromSnapErrors(errs), w) + rbody.Write(500, rbody.FromSnapErrors(errs), w) return } // TODO should return resource - respond(200, &rbody.ScheduledTaskStarted{ID: id}, w) + rbody.Write(200, &rbody.ScheduledTaskStarted{ID: id}, w) } -func (s *Server) stopTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) stopTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { id := p.ByName("id") - errs := s.mt.StopTask(id) + errs := s.taskManager.StopTask(id) if errs != nil { if strings.Contains(errs[0].Error(), ErrTaskNotFound.Error()) { - respond(404, rbody.FromSnapErrors(errs), w) + rbody.Write(404, rbody.FromSnapErrors(errs), w) return } - respond(500, rbody.FromSnapErrors(errs), w) + rbody.Write(500, rbody.FromSnapErrors(errs), w) return } - respond(200, &rbody.ScheduledTaskStopped{ID: id}, w) + rbody.Write(200, &rbody.ScheduledTaskStopped{ID: id}, w) } -func (s *Server) removeTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) removeTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { id := p.ByName("id") - err := s.mt.RemoveTask(id) + err := s.taskManager.RemoveTask(id) if err != nil { if strings.Contains(err.Error(), ErrTaskNotFound.Error()) { - respond(404, rbody.FromError(err), w) + rbody.Write(404, rbody.FromError(err), w) return } - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } - respond(200, &rbody.ScheduledTaskRemoved{ID: id}, w) + rbody.Write(200, &rbody.ScheduledTaskRemoved{ID: id}, w) } //enableTask changes the task state from Disabled to Stopped -func (s *Server) enableTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) enableTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { id := p.ByName("id") - tsk, err := s.mt.EnableTask(id) + tsk, err := s.taskManager.EnableTask(id) if err != nil { if strings.Contains(err.Error(), ErrTaskNotFound.Error()) { - respond(404, rbody.FromError(err), w) + rbody.Write(404, rbody.FromError(err), w) return } - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } task := &rbody.ScheduledTaskEnabled{} task.AddScheduledTask = *rbody.AddSchedulerTaskFromTask(tsk) - respond(200, task, w) + rbody.Write(200, task, w) } type TaskWatchHandler struct { @@ -303,6 +296,6 @@ func (t *TaskWatchHandler) CatchTaskDisabled(why string) { } } -func taskURI(host string, t core.Task) string { - return fmt.Sprintf("%s://%s/v1/tasks/%s", protocolPrefix, host, t.ID()) +func taskURI(host, version string, t core.Task) string { + return fmt.Sprintf("%s://%s/%s/tasks/%s", protocolPrefix, host, version, t.ID()) } diff --git a/mgmt/rest/tribe.go b/mgmt/rest/v1/tribe.go similarity index 63% rename from mgmt/rest/tribe.go rename to mgmt/rest/v1/tribe.go index a9073247d..2ad32a1d1 100644 --- a/mgmt/rest/tribe.go +++ b/mgmt/rest/v1/tribe.go @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rest +package v1 import ( "encoding/json" @@ -28,7 +28,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap/core/serror" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" + "github.com/intelsdi-x/snap/mgmt/rest/v1/rbody" "github.com/julienschmidt/httprouter" ) @@ -42,75 +42,75 @@ var ( ErrMemberNotFound = errors.New("Member not found") ) -func (s *Server) getAgreements(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { +func (s *apiV1) getAgreements(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { res := &rbody.TribeListAgreement{} - res.Agreements = s.tr.GetAgreements() - respond(200, res, w) + res.Agreements = s.tribeManager.GetAgreements() + rbody.Write(200, res, w) } -func (s *Server) getAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) getAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { tribeLogger = tribeLogger.WithField("_block", "getAgreement") name := p.ByName("name") - if _, ok := s.tr.GetAgreements()[name]; !ok { + if _, ok := s.tribeManager.GetAgreements()[name]; !ok { fields := map[string]interface{}{ "agreement_name": name, } tribeLogger.WithFields(fields).Error(ErrAgreementDoesNotExist) - respond(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) + rbody.Write(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) return } a := &rbody.TribeGetAgreement{} var serr serror.SnapError - a.Agreement, serr = s.tr.GetAgreement(name) + a.Agreement, serr = s.tribeManager.GetAgreement(name) if serr != nil { tribeLogger.Error(serr) - respond(400, rbody.FromSnapError(serr), w) + rbody.Write(400, rbody.FromSnapError(serr), w) return } - respond(200, a, w) + rbody.Write(200, a, w) } -func (s *Server) deleteAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) deleteAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { tribeLogger = tribeLogger.WithField("_block", "deleteAgreement") name := p.ByName("name") - if _, ok := s.tr.GetAgreements()[name]; !ok { + if _, ok := s.tribeManager.GetAgreements()[name]; !ok { fields := map[string]interface{}{ "agreement_name": name, } tribeLogger.WithFields(fields).Error(ErrAgreementDoesNotExist) - respond(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) + rbody.Write(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) return } var serr serror.SnapError - serr = s.tr.RemoveAgreement(name) + serr = s.tribeManager.RemoveAgreement(name) if serr != nil { tribeLogger.Error(serr) - respond(400, rbody.FromSnapError(serr), w) + rbody.Write(400, rbody.FromSnapError(serr), w) return } a := &rbody.TribeDeleteAgreement{} - a.Agreements = s.tr.GetAgreements() - respond(200, a, w) + a.Agreements = s.tribeManager.GetAgreements() + rbody.Write(200, a, w) } -func (s *Server) joinAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) joinAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { tribeLogger = tribeLogger.WithField("_block", "joinAgreement") name := p.ByName("name") - if _, ok := s.tr.GetAgreements()[name]; !ok { + if _, ok := s.tribeManager.GetAgreements()[name]; !ok { fields := map[string]interface{}{ "agreement_name": name, } tribeLogger.WithFields(fields).Error(ErrAgreementDoesNotExist) - respond(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) + rbody.Write(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) return } b, err := ioutil.ReadAll(r.Body) if err != nil { tribeLogger.Error(err) - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } @@ -125,37 +125,37 @@ func (s *Server) joinAgreement(w http.ResponseWriter, r *http.Request, p httprou } se := serror.New(ErrInvalidJSON, fields) tribeLogger.WithFields(fields).Error(ErrInvalidJSON) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } - serr := s.tr.JoinAgreement(name, m.MemberName) + serr := s.tribeManager.JoinAgreement(name, m.MemberName) if serr != nil { tribeLogger.Error(serr) - respond(400, rbody.FromSnapError(serr), w) + rbody.Write(400, rbody.FromSnapError(serr), w) return } - agreement, _ := s.tr.GetAgreement(name) - respond(200, &rbody.TribeJoinAgreement{Agreement: agreement}, w) + agreement, _ := s.tribeManager.GetAgreement(name) + rbody.Write(200, &rbody.TribeJoinAgreement{Agreement: agreement}, w) } -func (s *Server) leaveAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) leaveAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { tribeLogger = tribeLogger.WithField("_block", "leaveAgreement") name := p.ByName("name") - if _, ok := s.tr.GetAgreements()[name]; !ok { + if _, ok := s.tribeManager.GetAgreements()[name]; !ok { fields := map[string]interface{}{ "agreement_name": name, } tribeLogger.WithFields(fields).Error(ErrAgreementDoesNotExist) - respond(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) + rbody.Write(400, rbody.FromSnapError(serror.New(ErrAgreementDoesNotExist, fields)), w) return } b, err := ioutil.ReadAll(r.Body) if err != nil { tribeLogger.Error(err) - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } @@ -170,35 +170,35 @@ func (s *Server) leaveAgreement(w http.ResponseWriter, r *http.Request, p httpro } se := serror.New(ErrInvalidJSON, fields) tribeLogger.WithFields(fields).Error(ErrInvalidJSON) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } - serr := s.tr.LeaveAgreement(name, m.MemberName) + serr := s.tribeManager.LeaveAgreement(name, m.MemberName) if serr != nil { tribeLogger.Error(serr) - respond(400, rbody.FromSnapError(serr), w) + rbody.Write(400, rbody.FromSnapError(serr), w) return } - agreement, _ := s.tr.GetAgreement(name) - respond(200, &rbody.TribeLeaveAgreement{Agreement: agreement}, w) + agreement, _ := s.tribeManager.GetAgreement(name) + rbody.Write(200, &rbody.TribeLeaveAgreement{Agreement: agreement}, w) } -func (s *Server) getMembers(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - members := s.tr.GetMembers() - respond(200, &rbody.TribeMemberList{Members: members}, w) +func (s *apiV1) getMembers(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + members := s.tribeManager.GetMembers() + rbody.Write(200, &rbody.TribeMemberList{Members: members}, w) } -func (s *Server) getMember(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) getMember(w http.ResponseWriter, r *http.Request, p httprouter.Params) { tribeLogger = tribeLogger.WithField("_block", "getMember") name := p.ByName("name") - member := s.tr.GetMember(name) + member := s.tribeManager.GetMember(name) if member == nil { fields := map[string]interface{}{ "name": name, } tribeLogger.WithFields(fields).Error(ErrMemberNotFound) - respond(404, rbody.FromSnapError(serror.New(ErrMemberNotFound, fields)), w) + rbody.Write(404, rbody.FromSnapError(serror.New(ErrMemberNotFound, fields)), w) return } resp := &rbody.TribeMemberShow{ @@ -213,15 +213,15 @@ func (s *Server) getMember(w http.ResponseWriter, r *http.Request, p httprouter. resp.TaskAgreements = append(resp.TaskAgreements, k) } } - respond(200, resp, w) + rbody.Write(200, resp, w) } -func (s *Server) addAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { +func (s *apiV1) addAgreement(w http.ResponseWriter, r *http.Request, p httprouter.Params) { tribeLogger = tribeLogger.WithField("_block", "addAgreement") b, err := ioutil.ReadAll(r.Body) if err != nil { tribeLogger.Error(err) - respond(500, rbody.FromError(err), w) + rbody.Write(500, rbody.FromError(err), w) return } @@ -234,7 +234,7 @@ func (s *Server) addAgreement(w http.ResponseWriter, r *http.Request, p httprout } se := serror.New(ErrInvalidJSON, fields) tribeLogger.WithFields(fields).Error(ErrInvalidJSON) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } @@ -244,19 +244,19 @@ func (s *Server) addAgreement(w http.ResponseWriter, r *http.Request, p httprout } se := serror.New(ErrInvalidJSON, fields) tribeLogger.WithFields(fields).Error(ErrInvalidJSON) - respond(400, rbody.FromSnapError(se), w) + rbody.Write(400, rbody.FromSnapError(se), w) return } - err = s.tr.AddAgreement(a.Name) + err = s.tribeManager.AddAgreement(a.Name) if err != nil { tribeLogger.WithField("agreement-name", a.Name).Error(err) - respond(400, rbody.FromError(err), w) + rbody.Write(400, rbody.FromError(err), w) return } res := &rbody.TribeAddAgreement{} - res.Agreements = s.tr.GetAgreements() + res.Agreements = s.tribeManager.GetAgreements() - respond(200, res, w) + rbody.Write(200, res, w) } diff --git a/mgmt/rest/v2/metric.go b/mgmt/rest/v2/metric.go deleted file mode 100644 index be8360644..000000000 --- a/mgmt/rest/v2/metric.go +++ /dev/null @@ -1,81 +0,0 @@ -package v2 - -import ( - "bytes" - "encoding/json" - "fmt" - "net/http" - "net/url" - "sort" - - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" - "github.com/intelsdi-x/snap/mgmt/rest/rbody/v2" - "github.com/urfave/negroni" -) - -func RespondWithMetrics(host string, mts []core.CatalogedMetric, w http.ResponseWriter) { - b := v2.NewMetricsReturned() - for _, m := range mts { - var policies rbody.PolicyTableSlice - p := m.Policy().RulesAsTable() - policies = rbody.PolicyTableSlice(p) - dyn, indexes := m.Namespace().IsDynamic() - var dynamicElements []rbody.DynamicElement - if dyn { - dynamicElements = getDynamicElements(m.Namespace(), indexes) - } - b.Metrics = append(b.Metrics, rbody.Metric{ - Namespace: m.Namespace().String(), - Version: m.Version(), - LastAdvertisedTimestamp: m.LastAdvertisedTime().Unix(), - Description: m.Description(), - Dynamic: dyn, - DynamicElements: dynamicElements, - Unit: m.Unit(), - Policy: policies, - Href: catalogedMetricURI(host, m), - }) - } - sort.Sort(b) - respond(200, b, w) -} - -func catalogedMetricURI(host string, mt core.CatalogedMetric) string { - return fmt.Sprintf("%s://%s/v1/metrics?ns=%s&ver=%d", "http", host, url.QueryEscape(mt.Namespace().String()), mt.Version()) -} - -func getDynamicElements(ns core.Namespace, indexes []int) []rbody.DynamicElement { - elements := make([]rbody.DynamicElement, 0, len(indexes)) - for _, v := range indexes { - e := ns.Element(v) - elements = append(elements, rbody.DynamicElement{ - Index: v, - Name: e.Name, - Description: e.Description, - }) - } - return elements -} - -func respond(code int, b rbody.Body, w http.ResponseWriter) { - resp := &rbody.APIResponse{ - Meta: &rbody.APIResponseMeta{ - Code: code, - Message: b.ResponseBodyMessage(), - Type: b.ResponseBodyType(), - Version: 2, - }, - Body: b, - } - if !w.(negroni.ResponseWriter).Written() { - w.WriteHeader(code) - } - - j, err := json.MarshalIndent(resp, "", " ") - if err != nil { - panic(err) - } - j = bytes.Replace(j, []byte("\\u0026"), []byte("&"), -1) - fmt.Fprint(w, string(j)) -} diff --git a/mgmt/rest/v2/task.go b/mgmt/rest/v2/task.go deleted file mode 100644 index 328ea9828..000000000 --- a/mgmt/rest/v2/task.go +++ /dev/null @@ -1,29 +0,0 @@ -package v2 - -import ( - "fmt" - "net/http" - "sort" - - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/mgmt/rest/rbody" - "github.com/intelsdi-x/snap/mgmt/rest/rbody/v2" -) - -func GetTasks(w http.ResponseWriter, r *http.Request, sts map[string]core.Task) { - tasks := &v2.ScheduledTaskListReturned{} - tasks.ScheduledTasks = make([]rbody.ScheduledTask, len(sts)) - - i := 0 - for _, t := range sts { - tasks.ScheduledTasks[i] = *rbody.SchedulerTaskFromTask(t) - tasks.ScheduledTasks[i].Href = taskURI(r.Host, t) - i++ - } - sort.Sort(tasks) - respond(200, tasks, w) -} - -func taskURI(host string, t core.Task) string { - return fmt.Sprintf("%s://%s/v1/tasks/%s", "http", host, t.ID()) -} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 74805c267..1288555cf 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -211,9 +211,9 @@ func autoDiscoverTasks(taskFiles []os.FileInfo, fullPath string, continue } //TODO: see if the following is really mandatory - //in which case mgmt/rest/rbody/task.go contents might also + //in which case mgmt/rest/response/task.go contents might also //move into pkg/task - //rbody.AddSchedulerTaskFromTask(task) + //response.AddSchedulerTaskFromTask(task) log.WithFields(log.Fields{ "_block": "autoDiscoverTasks", "_module": "scheduler",