diff --git a/Dockerfile b/Dockerfile index 82a5b9b..a94956b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,4 +13,4 @@ COPY --from=0 /go/src/github.com/nemosupremo/vault-gatekeeper/dist/gatekeeper / WORKDIR /tmp WORKDIR / ENTRYPOINT ["/gatekeeper"] -CMD ["server"] \ No newline at end of file +CMD ["server"] diff --git a/Gopkg.lock b/Gopkg.lock index 1fadc58..bbdd117 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,38 +3,7 @@ [[projects]] name = "github.com/aws/aws-sdk-go" - packages = [ - "aws", - "aws/awserr", - "aws/awsutil", - "aws/client", - "aws/client/metadata", - "aws/corehandlers", - "aws/credentials", - "aws/credentials/ec2rolecreds", - "aws/credentials/endpointcreds", - "aws/credentials/stscreds", - "aws/csm", - "aws/defaults", - "aws/ec2metadata", - "aws/endpoints", - "aws/request", - "aws/session", - "aws/signer/v4", - "internal/sdkio", - "internal/sdkrand", - "internal/sdkuri", - "internal/shareddefaults", - "private/protocol", - "private/protocol/json/jsonutil", - "private/protocol/jsonrpc", - "private/protocol/query", - "private/protocol/query/queryutil", - "private/protocol/rest", - "private/protocol/xml/xmlutil", - "service/ecs", - "service/sts" - ] + packages = ["aws","aws/awserr","aws/awsutil","aws/client","aws/client/metadata","aws/corehandlers","aws/credentials","aws/credentials/ec2rolecreds","aws/credentials/endpointcreds","aws/credentials/stscreds","aws/csm","aws/defaults","aws/ec2metadata","aws/endpoints","aws/request","aws/session","aws/signer/v4","internal/sdkio","internal/sdkrand","internal/sdkuri","internal/shareddefaults","private/protocol","private/protocol/json/jsonutil","private/protocol/jsonrpc","private/protocol/query","private/protocol/query/queryutil","private/protocol/rest","private/protocol/xml/xmlutil","service/ecs","service/sts"] revision = "bc3f534c19ffdf835e524e11f0f825b3eaf541c3" version = "v1.14.31" @@ -44,17 +13,16 @@ revision = "2ea60e5f094469f9e65adb9cd103795b73ae743e" version = "v2.0.0" +[[projects]] + name = "github.com/dgrijalva/jwt-go" + packages = ["."] + revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" + version = "v3.2.0" + [[projects]] branch = "master" name = "github.com/dsnet/compress" - packages = [ - ".", - "bzip2", - "bzip2/internal/sais", - "internal", - "internal/errors", - "internal/prefix" - ] + packages = [".","bzip2","bzip2/internal/sais","internal","internal/errors","internal/prefix"] revision = "cc9eb1d7ad760af14e8f918698f745e80377af4f" [[projects]] @@ -71,10 +39,7 @@ [[projects]] name = "github.com/go-chi/chi" - packages = [ - ".", - "middleware" - ] + packages = [".","middleware"] revision = "e83ac2304db3c50cf03d96a2fcd39009d458bc35" version = "v3.3.2" @@ -99,18 +64,7 @@ [[projects]] branch = "master" name = "github.com/hashicorp/hcl" - packages = [ - ".", - "hcl/ast", - "hcl/parser", - "hcl/printer", - "hcl/scanner", - "hcl/strconv", - "hcl/token", - "json/parser", - "json/scanner", - "json/token" - ] + packages = [".","hcl/ast","hcl/parser","hcl/printer","hcl/scanner","hcl/strconv","hcl/token","json/parser","json/scanner","json/token"] revision = "ef8a98b0bbce4a65b5aa4c368430a80ddc533168" [[projects]] @@ -180,10 +134,7 @@ [[projects]] name = "github.com/spf13/afero" - packages = [ - ".", - "mem" - ] + packages = [".","mem"] revision = "787d034dfe70e44075ccc060d346146ef53270ad" version = "v1.1.1" @@ -224,30 +175,20 @@ revision = "a2144134853fc9a27a7b1e3eb4f19f1a76df13c9" [[projects]] - branch = "master" + branch = "release-branch.go1.10" name = "golang.org/x/net" packages = ["context"] - revision = "49bb7cea24b1df9410e1712aa6433dae904ff66a" + revision = "0ed95abb35c445290478a5348a7b38bb154135fd" [[projects]] branch = "master" name = "golang.org/x/sys" - packages = [ - "unix", - "windows" - ] + packages = ["unix","windows"] revision = "ac767d655b305d4e9612f5f6e33120b9176c4ad4" [[projects]] name = "golang.org/x/text" - packages = [ - "internal/gen", - "internal/triegen", - "internal/ucd", - "transform", - "unicode/cldr", - "unicode/norm" - ] + packages = ["internal/gen","internal/triegen","internal/ucd","transform","unicode/cldr","unicode/norm"] revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" version = "v0.3.0" @@ -266,6 +207,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "bed642cfe4467d987157c0e31e00dd96397c58ee4d4ab12a0836578341817471" + inputs-digest = "5aa3462ab40fbec732e1c10b42709cccee1db78c0ba607427f56d4c26fb7604e" solver-name = "gps-cdcl" solver-version = 1 diff --git a/cmd/gatekeeper/main.go b/cmd/gatekeeper/main.go index 3be110b..affb7d8 100644 --- a/cmd/gatekeeper/main.go +++ b/cmd/gatekeeper/main.go @@ -4,6 +4,7 @@ import ( "runtime" // import schedulers before cmd + _ "github.com/nemosupremo/vault-gatekeeper/scheduler/dcosee_mesos" _ "github.com/nemosupremo/vault-gatekeeper/scheduler/ecs" _ "github.com/nemosupremo/vault-gatekeeper/scheduler/mesos" diff --git a/cmd/policy.go b/cmd/policy.go index c2d2cf5..6239dd5 100644 --- a/cmd/policy.go +++ b/cmd/policy.go @@ -8,11 +8,12 @@ import ( "net/url" "os" - "github.com/franela/goreq" "github.com/nemosupremo/vault-gatekeeper" "github.com/nemosupremo/vault-gatekeeper/policy" "github.com/nemosupremo/vault-gatekeeper/scheduler" "github.com/nemosupremo/vault-gatekeeper/vault" + + "github.com/franela/goreq" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" diff --git a/cmd/root.go b/cmd/root.go index b4bf4d3..85967a3 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -7,6 +7,7 @@ import ( "time" "github.com/nemosupremo/vault-gatekeeper/scheduler" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" diff --git a/cmd/server.go b/cmd/server.go index ad95af6..a9fcbbd 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -9,6 +9,7 @@ import ( "github.com/nemosupremo/vault-gatekeeper" "github.com/nemosupremo/vault-gatekeeper/scheduler" "github.com/nemosupremo/vault-gatekeeper/vault/unsealer" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" diff --git a/cmd/unseal.go b/cmd/unseal.go index c77dfc3..033924a 100644 --- a/cmd/unseal.go +++ b/cmd/unseal.go @@ -3,8 +3,9 @@ package cmd import ( "net/url" - "github.com/franela/goreq" "github.com/nemosupremo/vault-gatekeeper/scheduler" + + "github.com/franela/goreq" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" diff --git a/gatekeeper.go b/gatekeeper.go index 739ef7a..ecf00af 100644 --- a/gatekeeper.go +++ b/gatekeeper.go @@ -14,14 +14,15 @@ import ( "sync/atomic" "time" - "github.com/cenkalti/backoff" - "github.com/franela/goreq" gkClient "github.com/nemosupremo/vault-gatekeeper/gatekeeper" "github.com/nemosupremo/vault-gatekeeper/policy" "github.com/nemosupremo/vault-gatekeeper/scheduler" "github.com/nemosupremo/vault-gatekeeper/usagestore" "github.com/nemosupremo/vault-gatekeeper/vault" "github.com/nemosupremo/vault-gatekeeper/vault/unsealer" + + "github.com/cenkalti/backoff" + "github.com/franela/goreq" "github.com/segmentio/ksuid" log "github.com/sirupsen/logrus" ) @@ -480,6 +481,7 @@ func (g *Gatekeeper) RequestToken(providerKey string, taskId string, requestedRo currentPolicies := g.Policies g.RUnlock() + log.Debugf("Find Policy: %s\n", policyKey) if policy, ok := currentPolicies.Get(policyKey); ok && len(policy.Roles) > 0 { if err := g.Store.Acquire(g.Token, providerKey+":"+task.Id(), policy.NumUses, g.config.MaxTaskLife+1*time.Minute); err == nil { roleName := policy.Roles[0] diff --git a/gatekeeper/gatekeeper.go b/gatekeeper/gatekeeper.go index bf4b04b..1be087b 100644 --- a/gatekeeper/gatekeeper.go +++ b/gatekeeper/gatekeeper.go @@ -155,9 +155,9 @@ func (c *Client) requestTempToken(taskID string) (string, error) { return "", err } - if !gkTokResp.OK { - return "", errors.New(gkTokResp.Error) - } + // if !gkTokResp.OK { + // return "", errors.New(gkTokResp.Error) + // } return gkTokResp.Token, nil } diff --git a/gatekeeper/types.go b/gatekeeper/types.go index fc39d58..b0a4233 100644 --- a/gatekeeper/types.go +++ b/gatekeeper/types.go @@ -34,10 +34,10 @@ type gkTokenReq struct { } type gkTokenResp struct { - OK bool `json:"ok"` - Token string `json:"token"` - Status string `json:"status"` - Error string `json:"error"` + Unsealed bool `json:"unsealed"` + Token string `json:"token"` + Ttl string `json:"ttl"` + VaultAddr string `json:"vault_addr"` } type vaultWrappedResponse struct { diff --git a/gatekeeper_test.go b/gatekeeper_test.go index ada557f..8f5ac21 100644 --- a/gatekeeper_test.go +++ b/gatekeeper_test.go @@ -6,12 +6,13 @@ import ( "testing" "time" - "github.com/franela/goreq" "github.com/nemosupremo/vault-gatekeeper/policy" "github.com/nemosupremo/vault-gatekeeper/scheduler" "github.com/nemosupremo/vault-gatekeeper/scheduler/mock" "github.com/nemosupremo/vault-gatekeeper/vault" "github.com/nemosupremo/vault-gatekeeper/vault/unsealer" + + "github.com/franela/goreq" "github.com/segmentio/ksuid" "github.com/spf13/viper" ) diff --git a/policy.go b/policy.go index 0a7c82e..0df6ecf 100644 --- a/policy.go +++ b/policy.go @@ -4,12 +4,14 @@ import ( "encoding/json" "errors" "fmt" - log "github.com/sirupsen/logrus" "strings" - "github.com/franela/goreq" + log "github.com/sirupsen/logrus" + "github.com/nemosupremo/vault-gatekeeper/policy" "github.com/nemosupremo/vault-gatekeeper/vault" + + "github.com/franela/goreq" ) var policyNotFound = errors.New("No policy saved at configured location.") @@ -33,21 +35,28 @@ func (g *Gatekeeper) loadPolicies() (*policy.Policies, error) { func (g *Gatekeeper) GetPolicyConfig() ([]byte, error) { initialPolicyDir := g.config.PolicyPath policies := make(map[string]policy.Policy) + log.Debugf("Policy Dir: %s\n", initialPolicyDir) if policyDirectories, err := g.getNestedPolicyDirs(initialPolicyDir, g.Token); err == nil { for _, dir := range policyDirectories { + log.Debugf("Dir: %s\n", dir) if policy, err := getPolicy(dir, g.Token); err == nil { + log.Debugf("Policies: %+v\n", policy) for k, v := range policy { policies[k] = v } } else if err == policyNotFound { + log.Warnf("%v\n", policyNotFound) continue } else { return nil, err } } } else { + log.Errorf("Failed to find direcotries in %s\n", initialPolicyDir) return nil, err } + + log.Debugf("Policies: %+v\n", policies) if len(policies) == 0 { return nil, policyNotFound } @@ -64,6 +73,7 @@ func (g *Gatekeeper) getNestedPolicyDirs(initialPolicyDir string, authToken stri err := g.getDirList(initialPolicyDir, authToken, &nestedPolicyDirs, &subDirs) if err != nil { + log.Errorf("Error getting dir list: %v\n", err) return nestedPolicyDirs, err } @@ -84,6 +94,9 @@ func (g *Gatekeeper) getNestedPolicyDirs(initialPolicyDir string, authToken stri } } } + + log.Debugf("Nested Policy Dirs: %+v\n", nestedPolicyDirs) + return nestedPolicyDirs, err } @@ -111,6 +124,7 @@ func (g *Gatekeeper) getDirList(path string, authToken string, nestedPolicies *[ Renewable bool `json:"renewable"` } if err := r.Body.FromJsonTo(&scrts); err == nil { + log.Debugf("Secrets List Data: %+v\n", scrts) for i := range scrts.Data.Keys { //add to sub dir list when "/" suffix if strings.HasSuffix(scrts.Data.Keys[i], "/") { @@ -121,10 +135,12 @@ func (g *Gatekeeper) getDirList(path string, authToken string, nestedPolicies *[ } return nil } else { + log.Errorf("Get Dir List: %+v\n", err) return err } case 404: /* A 404 is returned when no sub directories exist below the current directory which is ok. */ + log.Infof("Get Dir List is empty\n") return nil case 403: @@ -160,6 +176,7 @@ func getPolicy(path string, authToken string) (map[string]policy.Policy, error) } `json:"data"` } if err := r.Body.FromJsonTo(&resp); err == nil { + log.Debugf("Response Body: %+v\n", resp) return resp.Data.Data, nil } else { return nil, policyLoadError{fmt.Errorf("There was an error decoding policy from vault. This can occur " + diff --git a/policy/policy.go b/policy/policy.go index 32d41c5..ff4c800 100644 --- a/policy/policy.go +++ b/policy/policy.go @@ -6,6 +6,8 @@ import ( "errors" "strings" + log "github.com/sirupsen/logrus" + "github.com/hashicorp/go-immutable-radix" ) @@ -95,6 +97,8 @@ func (p *Policies) Get(path string) (*Policy, bool) { } p.Tree.Root().WalkPath([]byte(path), walkFn) + + log.Debugf("Get Policy: Found: %t Ret: %+v\n", foundPolicy, ret) return ret, foundPolicy } diff --git a/routes.go b/routes.go index bf832c3..eb2e226 100644 --- a/routes.go +++ b/routes.go @@ -7,10 +7,11 @@ import ( "strconv" "time" - "github.com/franela/goreq" - "github.com/go-chi/chi" "github.com/nemosupremo/vault-gatekeeper/scheduler" "github.com/nemosupremo/vault-gatekeeper/vault/unsealer" + + "github.com/franela/goreq" + "github.com/go-chi/chi" ) func (g *Gatekeeper) OkResponse(w http.ResponseWriter, message string) { @@ -161,6 +162,7 @@ func (g *Gatekeeper) requestToken(w http.ResponseWriter, r *http.Request) { if g.IsUnsealed() { if err := json.NewDecoder(r.Body).Decode(&body); err == nil { if token, ttl, err := g.RequestToken(body.Scheduler, body.TaskId, body.Role, r.RemoteAddr); err == nil { + log.Debugf("Reponse Token: %s\n", token) resp := struct { Unsealed bool `json:"unsealed"` Token string `json:"token"` @@ -175,6 +177,7 @@ func (g *Gatekeeper) requestToken(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) } else { + log.Debugf("Reponse Token Error: %v\n", err) switch err { case scheduler.ErrTaskNotFound, ErrHostMismatch: g.ErrorResponse(w, http.StatusUnauthorized, err.Error()) diff --git a/scheduler/dcosee_mesos/args.go b/scheduler/dcosee_mesos/args.go new file mode 100644 index 0000000..848dcde --- /dev/null +++ b/scheduler/dcosee_mesos/args.go @@ -0,0 +1,40 @@ +package dcosee_mesos + +import ( + "os" + + "github.com/nemosupremo/vault-gatekeeper/scheduler" + + "github.com/spf13/viper" +) + +func init() { + scheduler.RegisterScheduler("dcosee-mesos", newMesosScheduler, Args()) +} + +type args struct { + Name string + Default interface{} + Description string +} + +func Args() []scheduler.Args { + return []scheduler.Args{ + // {"mesos-master", "http://localhost:5050", "Address to mesos masters in either zookeeper (zk://zoo1:2181,zoo2:2181/path) or http format http://master:5050,master:5050/."}, + {"dcos-uid", "vgm", "DCOS Enterprise requires all calls to mesos be protected with a userid and privatekey. The is the userid of the service account for VGM to use."}, + {"dcos-privatekey", "/certs/vgm-privatekey.pem", "DCOS Enterprise requires all calls to mesos be protected with a userid and privatekey. The is the privatekey to sign the jwt login with for the service"}, + {"mesos-skipverify", false, "If mesos is using TLS then do we need to veify the tls cert."}, + } +} + +func newMesosScheduler() (scheduler.Scheduler, error) { + master := viper.GetString("mesos-master") + uid := viper.GetString("dcos-uid") + privateKeyFile := viper.GetString("dcos-privatekey") + pkRdr, err := os.Open(privateKeyFile) + if err != nil { + return nil, err + } + + return NewMesosScheduler(master, uid, pkRdr) +} diff --git a/scheduler/dcosee_mesos/mesos.go b/scheduler/dcosee_mesos/mesos.go new file mode 100644 index 0000000..2bbbfda --- /dev/null +++ b/scheduler/dcosee_mesos/mesos.go @@ -0,0 +1,632 @@ +package dcosee_mesos + +import ( + "bytes" + "crypto/rsa" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "sort" + "strings" + "time" + + "github.com/spf13/viper" + + "github.com/nemosupremo/vault-gatekeeper/scheduler" + + jwt "github.com/dgrijalva/jwt-go" + "github.com/mesos/mesos-go/api/v0/upid" + "github.com/samuel/go-zookeeper/zk" + log "github.com/sirupsen/logrus" +) + +const defaultLoginExp time.Duration = time.Minute * 5 + +type defaultLogger struct{} + +func (d defaultLogger) Printf(s string, a ...interface{}) { + log.Debugf(s, a...) +} + +type mesosMaster struct { + Address struct { + Hostname string `json:"hostname"` + IP string `json:"ip"` + Port int `json:"port"` + } `json:"address"` + Hostname string `json:"hostname"` + ID string `json:"id"` + IP int64 `json:"ip"` + Pid string `json:"pid"` + Port int `json:"port"` + Version string `json:"version"` +} + +type mesosFramework struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type mesosTask struct { + ID string `json:"id"` + Name string `json:"name"` + State string `json:"state"` + SlaveID string `json:"slave_id"` + FrameworkID string `json:"framework_id"` + Resources struct { + Cpus float64 `json:"cpus"` + Disk float64 `json:"disk"` + Mem float64 `json:"mem"` + Ports string `json:"ports"` + } `json:"resources"` + Statuses []struct { + State string `json:"state"` + Timestamp float64 `json:"timestamp"` + } `json:"statuses"` + Container struct { + Type string `json:"type"` + Docker struct { + Image string `json:"image"` + } `json:"docker"` + } `json:"container"` +} + +type mesosSlave struct { + ID string `json:"id"` + Hostname string `json:"hostname"` + PID string `json:"pid"` +} + +type mesosSlaves struct { + Slaves []*mesosSlave `json:"slaves"` +} + +type mesosTasks struct { + Tasks []*mesosTask `json:"tasks"` +} + +type mesosFrameworks struct { + Frameworks []*mesosFramework `json:"frameworks"` +} + +var errMesosNoPath = errors.New("No path specified for mesos zk lookup.") +var errMesosParseError = errors.New("Error parsing mesos master data in zk.") +var errMesosNoMaster = errors.New("Error finding mesos master.") +var errUnknownScheme = errors.New("Unknown mesos scheme.") +var errZKNotSupportedScheme = errors.New("zk or zks is not supported for the enterprise edition of DCOS. Please add the list of master nodes directly.") +var errMesosUnreachable = errors.New("No reachable mesos masters.") + +type mesosScheduler struct { + master string + uid string + privateKey *rsa.PrivateKey + authorizationToken string + expiration time.Time + client *http.Client +} + +//Returned Task information from the lookup +type task struct { + id string + name string + group string + image string + ip net.IP + startTime time.Time +} + +// Return the Task ID +func (t task) Id() string { + return t.id +} + +// Return the Task Group +func (t task) Group() string { + return t.group +} + +// Return the Task Image +func (t task) Image() string { + return t.image +} + +// Return the Task Name +func (t task) Name() string { + return t.name +} + +// Return the Task IP +func (t task) IP() net.IP { + return t.ip +} + +//Return the Task Start Time +func (t task) StartTime() time.Time { + return t.startTime +} + +//NewMesosScheduler creates the object for talking to mesos. In this case it +// will create the object to talk to DCOS EE Mesos which needs some authentication +// pieces setup +func NewMesosScheduler(master string, uid string, privateKeyFile io.Reader) (scheduler.Scheduler, error) { + //Create the object + m := &mesosScheduler{ + master: master, + uid: uid, + } + + //Parse and validate the private key + if err := m.parsePrivateKey(privateKeyFile); err != nil { + return nil, err + } + + //Build the transport + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + //If requested ignore the TLS verify + if viper.GetBool("mesos-skipverify") { + if tr.TLSClientConfig == nil { + tr.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } else { + tr.TLSClientConfig.InsecureSkipVerify = true + } + } + client := http.DefaultClient + client.Transport = tr + m.client = client + + //Check to see if you can communicate with the Mesos Master + if _, _, err := m.getMesosMaster(); err != nil { + return nil, err + } + + //Create initial authorization token + if err := m.createAuthorizationToken(); err != nil { + return nil, err + } + + return m, nil +} + +//Parse the private Key for the JWT +func (m *mesosScheduler) parsePrivateKey(privateKeyFile io.Reader) error { + //Read the Data from the private key file + data, err := ioutil.ReadAll(privateKeyFile) + if err != nil { + return err + } + + //Parse the data in to the Private Key object + //If we need a password object for this we will have to use a different function + pk, err := jwt.ParseRSAPrivateKeyFromPEM(data) + if err != nil { + return err + } + + //Validate the private key + if err := pk.Validate(); err != nil { + return err + } + + //Set the key in the scheduler object + m.privateKey = pk + return nil +} + +//Create the DCOS login token based on the private key and uid +func (m *mesosScheduler) createLoginToken() ([]byte, error) { + // create a signer for rsa 256 + sm := jwt.GetSigningMethod("RS256") + mc := jwt.MapClaims{} + t := jwt.NewWithClaims(sm, mc) + + // set our claims + mc["uid"] = m.uid + + // set the expire time + // see http://tools.ietf.org/html/draft-ietf-oauth-json-web-token-20#section-4.1.4 + mc["exp"] = time.Now().Add(defaultLoginExp).Unix() + + tokenString, err := t.SignedString(m.privateKey) + if err != nil { + log.Errorf("Token Signing error: %v\n", err) + return nil, err + } + + log.Debugf("Login Token: %s\n", tokenString) + + return []byte(tokenString), nil +} + +type loginStruct struct { + UID string `json:"uid"` + Exp uint `json:"exp"` + Token string `json:"token"` +} + +func (m *mesosScheduler) createAuthorizationToken() error { + dur := m.expiration.Sub(time.Now()) + if len(m.authorizationToken) > 0 && dur.Hours() > 1 { + return nil + } + + loginToken, err := m.createLoginToken() + if err != nil { + return err + } + + if masterHosts, protocol, err := m.getMesosMaster(); err == nil { + for _, host := range masterHosts { + hostport := strings.Split(host, ":") + + login := &loginStruct{ + UID: m.uid, + Exp: 0, + Token: string(loginToken), + } + + data, _ := json.Marshal(login) + + log.Debugf("Login Data: %s\n", data) + + bodyRdr := bytes.NewReader(data) + req, _ := http.NewRequest("POST", protocol+"://"+hostport[0]+"/acs/api/v1/auth/login", bodyRdr) + req.Header.Add("content-type", "application/json") + + resp, err := m.client.Do(req) + if err == nil && resp.StatusCode == 200 { + defer resp.Body.Close() + + jsonMap := make(map[string]string) + + derr := json.NewDecoder(resp.Body).Decode(&jsonMap) + if derr != nil { + return derr + } + + log.Debugf("Authorization Response: %+v\n", jsonMap) + token, ok := jsonMap["token"] + if ok { + m.authorizationToken = token + exp := time.Duration(login.Exp) * time.Second + if login.Exp == 0 { + exp = time.Duration(5*24) * time.Hour + } + m.expiration = time.Now().Add(exp) + } + + } else if err != nil { + return err + } else { + defer resp.Body.Close() + msg, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("Error Code: %d Message: %s", resp.StatusCode, msg) + } + } + } else { + return err + } + + return nil +} + +//LookupTask takes the provided taskId and looks up the task in Mesos +// to verify the task in fact does exist +func (m *mesosScheduler) LookupTask(taskID string) (scheduler.Task, error) { + + //Try to fetch the Mesos Task ID Information Keep trying for a short period. + mesosTask, slaveHost, framework, err := m.getMesosTask(taskID) + for i := time.Duration(0); i < 3 && err == nil && len(mesosTask.Statuses) == 0; i++ { + time.Sleep((500 + 250*i) * time.Millisecond) + mesosTask, slaveHost, framework, err = m.getMesosTask(taskID) + } + + //See how long the task has been running + runningTime := time.Unix(0, 0) + if len(mesosTask.Statuses) > 0 { + // https://github.com/apache/mesos/blob/a61074586d778d432ba991701c9c4de9459db897/src/webui/master/static/js/controllers.js#L148 + runningTime = time.Unix(0, int64(mesosTask.Statuses[0].Timestamp*1000000000)) + } + + //Find the task IP by looking up the slave it is running on + var ip net.IP + if err == nil { + if ips, err := net.LookupIP(slaveHost); err == nil { + ip = ips[0] + } + } + + log.Debugf("Return Looked Up Value as %s@%s:%s\n", ip, framework, mesosTask.Name) + + return &task{ + id: mesosTask.ID, + name: mesosTask.Name, + image: mesosTask.Container.Docker.Image, + group: framework, + startTime: runningTime, + ip: ip, + }, err +} + +func (m *mesosScheduler) getMesosMaster() ([]string, string, error) { + var masterHosts []string + protocol := "http" + if path, err := url.Parse(m.master); err == nil { + switch path.Scheme { + case "zks": + protocol = "https" + fallthrough + case "zk": + masterHosts, err = m.parseZK(path) + if err != nil { + return nil, protocol, err + } + // return nil, protocol, errZKNotSupportedScheme + case "https": + protocol = "https" + fallthrough + case "http": + masterHosts = strings.Split(path.Host, ",") + default: + return nil, protocol, errUnknownScheme + } + } else { + masterHosts = strings.Split(m.master, ",") + } + + if len(masterHosts) == 0 { + return nil, protocol, errMesosUnreachable + } + return masterHosts, protocol, nil +} + +//parse the zookeeper data to find the master nodes +func (m *mesosScheduler) parseZK(path *url.URL) ([]string, error) { + //No path provided in url for the root location + if path.Path == "" || path.Path == "/" { + return nil, errMesosNoPath + } + + //Get the path for zookeeper to look into it + zookeeperPath := path.Path + if zookeeperPath[0] != '/' { + zookeeperPath = "/" + zookeeperPath + } + + //Connect to zookeeper and find the master objects to parse + var masterHosts []string + if zoo, _, err := zk.Connect(zk.FormatServers(strings.Split(path.Host, ",")), 10*time.Second, zk.WithLogger(&defaultLogger{})); err == nil { + defer zoo.Close() + if children, _, err := zoo.Children(zookeeperPath); err == nil { + sort.Strings(children) + for _, child := range children { + if strings.HasPrefix(child, "json.info_") { + if data, _, err := zoo.Get(zookeeperPath + "/" + child); err == nil { + var masterInfo mesosMaster + if err := json.Unmarshal(data, &masterInfo); err == nil { + masterHosts = []string{fmt.Sprintf("%s:%d", masterInfo.Address.Hostname, masterInfo.Address.Port)} + break + } else { + return nil, errMesosParseError + } + } + } + } + } else { + return nil, errMesosNoMaster + } + } + + return masterHosts, nil +} + +func (m *mesosScheduler) getSlaveInfo(slaveId, protocol, host string) (*mesosSlave, error) { + var slaves mesosSlaves + + log.Debugf("Get Slave information. \n") + slavesReq, err := http.NewRequest("GET", protocol+"://"+host+"/slaves", nil) + if err != nil { + return nil, err + } + + slavesReq.Header.Add("content-type", "application/json") + slavesReq.Header.Add("authorization", "token="+m.authorizationToken) + resp, err := m.client.Do(slavesReq) + if err == nil && resp != nil && resp.StatusCode == 200 { + defer resp.Body.Close() + derr := json.NewDecoder(resp.Body).Decode(&slaves) + if derr != nil { + return nil, derr + } + log.Debugf("Slave Info: %+v\n", slaves) + } else { + sc := -1 + if resp != nil { + sc = resp.StatusCode + } + log.Errorf("Error Slave Req. Error Code: %d Error: %v\n", sc, err) + return nil, err + } + + for _, slave := range slaves.Slaves { + if slave.ID == slaveId { + return slave, nil + } + } + + return nil, fmt.Errorf("Could not find the slaveId %s in the master.", slaveId) +} + +func (m *mesosScheduler) getFrameworkInfo(frameworkId, protocol, host string) (*mesosFramework, error) { + var frameworks mesosFrameworks + + log.Debugf("Get Framework Info %s\n", frameworkId) + frameworkReq, err := http.NewRequest("GET", protocol+"://"+host+"/frameworks?framwork_id="+frameworkId, nil) + if err != nil { + return nil, err + } + frameworkReq.Header.Add("content-type", "application/json") + frameworkReq.Header.Add("authorization", "token="+m.authorizationToken) + + //Execute the request + fresp, err := m.client.Do(frameworkReq) + if err == nil && fresp != nil && fresp.StatusCode == 200 { + defer fresp.Body.Close() + derr := json.NewDecoder(fresp.Body).Decode(&frameworks) + if derr != nil { + return nil, derr + } + log.Debugf("Framework Info: %+v\n", frameworks) + } else { + sc := -1 + if fresp != nil { + sc = fresp.StatusCode + } + + log.Errorf("Error Task Req. Error Code: %d Error: %v\n", sc, err) + return nil, err + } + + for _, framework := range frameworks.Frameworks { + if framework.ID == frameworkId { + return framework, nil + } + } + + return nil, fmt.Errorf("Could not find frameworkId %s in mesos.", frameworkId) +} + +func (m *mesosScheduler) getTaskInfo(taskId, protocol, host string) (*mesosTask, error) { + var tasks mesosTasks + + log.Debugf("Get Task Info %s\n", taskId) + taskReq, err := http.NewRequest("GET", protocol+"://"+host+"/tasks?task_id="+taskId, nil) + if err != nil { + return nil, err + } + taskReq.Header.Add("content-type", "application/json") + taskReq.Header.Add("authorization", "token="+m.authorizationToken) + + //Execute the request + tresp, err := m.client.Do(taskReq) + if err == nil && tresp != nil && tresp.StatusCode == 200 { + defer tresp.Body.Close() + derr := json.NewDecoder(tresp.Body).Decode(&tasks) + if derr != nil { + return nil, derr + } + log.Debugf("Task Info: %+v\n", tasks) + } else { + sc := -1 + if tresp != nil { + sc = tresp.StatusCode + } + + log.Errorf("Error Task Req. Error Code: %d Error: %v\n", sc, err) + return nil, err + } + + for _, task := range tasks.Tasks { + if task.ID == taskId { + return task, nil + } + } + + return nil, fmt.Errorf("Could not find taskId %s in mesos.", taskId) +} + +func (m *mesosScheduler) getMesosTask(taskId string) (*mesosTask, string, string, error) { + + var slaveHost string + + //Get the master hosts to try + if masterHosts, protocol, err := m.getMesosMaster(); err == nil { + //Check the Authorization Token and create if needed + m.createAuthorizationToken() + + var masterErr error + + //Loop over the master hosts trying to get a response + var task *mesosTask + for _, host := range masterHosts { + var err error + task, err = m.getTaskInfo(taskId, protocol, host) + if err != nil { + masterErr = err + continue + } else if task == nil { + masterErr = fmt.Errorf("Could not find taskId %s in mesos.", taskId) + continue + } else { + masterErr = nil + } + } + + if masterErr != nil { + return &mesosTask{}, "", "", masterErr + } + + var slave *mesosSlave + for _, host := range masterHosts { + var err error + slave, err = m.getSlaveInfo(task.SlaveID, protocol, host) + if err != nil { + masterErr = err + continue + } else if slave == nil { + masterErr = fmt.Errorf("Could not find slaveId %s in mesos.", task.SlaveID) + continue + } else { + masterErr = nil + } + } + + if masterErr != nil { + log.Warnf("Mesos: Task ID %v was running on Slave %v, but no information about that slave was found.", task.ID, task.SlaveID) + } else { + if pid, err := upid.Parse(slave.PID); err == nil { + slaveHost = pid.Host + } else { + log.Warnf("Mesos: Failed to parse PID %v.", slave.PID) + } + } + + var framework *mesosFramework + for _, host := range masterHosts { + var err error + framework, err = m.getFrameworkInfo(task.FrameworkID, protocol, host) + if err != nil { + masterErr = err + continue + } else if framework == nil { + masterErr = fmt.Errorf("Could not find frameworkId %s in mesos.", task.FrameworkID) + continue + } else { + masterErr = nil + } + } + + return task, slaveHost, framework.Name, nil + } else { + return &mesosTask{}, "", "", err + } +} diff --git a/scheduler/dcosee_mesos/mesos_test.go b/scheduler/dcosee_mesos/mesos_test.go new file mode 100644 index 0000000..77c95fa --- /dev/null +++ b/scheduler/dcosee_mesos/mesos_test.go @@ -0,0 +1,18 @@ +package dcosee_mesos + +import ( + "os" + "testing" +) + +var testMesosMaster = os.Getenv("MESOS_MASTER") + +func TestGetMesos(t *testing.T) { + if len(testMesosMaster) == 0 { + t.Skip() + } else { + if _, err := NewMesosScheduler(testMesosMaster, "", nil); err != nil { + t.Fatalf("Failed to get mesos masters: %v", err) + } + } +} diff --git a/scheduler/ecs/ecs.go b/scheduler/ecs/ecs.go index f876aae..e2fee58 100644 --- a/scheduler/ecs/ecs.go +++ b/scheduler/ecs/ecs.go @@ -5,12 +5,13 @@ import ( "strings" "time" + "github.com/nemosupremo/vault-gatekeeper/scheduler" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ecs" "github.com/franela/goreq" - "github.com/nemosupremo/vault-gatekeeper/scheduler" ) type ecsScheduler struct { diff --git a/scheduler/mesos/args.go b/scheduler/mesos/args.go index d89677e..a2b3b7a 100644 --- a/scheduler/mesos/args.go +++ b/scheduler/mesos/args.go @@ -2,6 +2,7 @@ package mesos import ( "github.com/nemosupremo/vault-gatekeeper/scheduler" + "github.com/spf13/viper" ) diff --git a/scheduler/mesos/mesos.go b/scheduler/mesos/mesos.go index 60597ca..c713543 100644 --- a/scheduler/mesos/mesos.go +++ b/scheduler/mesos/mesos.go @@ -11,8 +11,9 @@ import ( "strings" "time" - "github.com/mesos/mesos-go/api/v0/upid" "github.com/nemosupremo/vault-gatekeeper/scheduler" + + "github.com/mesos/mesos-go/api/v0/upid" "github.com/samuel/go-zookeeper/zk" "github.com/sirupsen/logrus" ) diff --git a/usagestore/vault.go b/usagestore/vault.go index 471c38d..ba016f7 100644 --- a/usagestore/vault.go +++ b/usagestore/vault.go @@ -6,8 +6,9 @@ import ( "path" "time" - "github.com/franela/goreq" "github.com/nemosupremo/vault-gatekeeper/vault" + + "github.com/franela/goreq" log "github.com/sirupsen/logrus" ) diff --git a/usagestore/vault_test.go b/usagestore/vault_test.go index bf4e475..06827c1 100644 --- a/usagestore/vault_test.go +++ b/usagestore/vault_test.go @@ -8,8 +8,9 @@ import ( "testing" "time" - "github.com/franela/goreq" "github.com/nemosupremo/vault-gatekeeper/vault" + + "github.com/franela/goreq" "github.com/segmentio/ksuid" "github.com/spf13/viper" ) diff --git a/vault/unsealer/unsealer.go b/vault/unsealer/unsealer.go index e52391a..9ff6469 100644 --- a/vault/unsealer/unsealer.go +++ b/vault/unsealer/unsealer.go @@ -7,6 +7,7 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/franela/goreq" + "github.com/nemosupremo/vault-gatekeeper/vault" ) diff --git a/vault/unsealer/unsealer_test.go b/vault/unsealer/unsealer_test.go index c01e229..5f75636 100644 --- a/vault/unsealer/unsealer_test.go +++ b/vault/unsealer/unsealer_test.go @@ -4,8 +4,9 @@ import ( "os" "testing" - "github.com/franela/goreq" "github.com/nemosupremo/vault-gatekeeper/vault" + + "github.com/franela/goreq" "github.com/segmentio/ksuid" "github.com/spf13/viper" )