From adc161a2433b5e74799672a1ce894b6fec95a0b8 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 31 Jan 2017 20:31:07 +0000 Subject: [PATCH] Add gke package, add kubenetes.Dialer type. Updates golang/go#18817 Change-Id: Ifee53384486b0692899b77be2eaa42ca9006ef8e Reviewed-on: https://go-review.googlesource.com/36016 Reviewed-by: Chris Broadfoot --- cmd/coordinator/kube.go | 68 ++------------- kubernetes/client.go | 8 ++ kubernetes/dialer.go | 35 ++++++++ kubernetes/gke/gke.go | 174 +++++++++++++++++++++++++++++++++++++ kubernetes/gke/gke_test.go | 89 +++++++++++++++++++ 5 files changed, 315 insertions(+), 59 deletions(-) create mode 100644 kubernetes/dialer.go create mode 100644 kubernetes/gke/gke.go create mode 100644 kubernetes/gke/gke_test.go diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go index b57677d8b3..1f184e0445 100644 --- a/cmd/coordinator/kube.go +++ b/cmd/coordinator/kube.go @@ -6,14 +6,10 @@ package main import ( "context" - "crypto/tls" - "crypto/x509" - "encoding/base64" "errors" "fmt" "io" "log" - "net/http" "sort" "strconv" "strings" @@ -24,7 +20,7 @@ import ( "golang.org/x/build/dashboard" "golang.org/x/build/kubernetes" "golang.org/x/build/kubernetes/api" - "golang.org/x/oauth2" + "golang.org/x/build/kubernetes/gke" container "google.golang.org/api/container/v1" ) @@ -56,64 +52,18 @@ func initKube() error { if !hasCloudPlatformScope() { return errors.New("coordinator not running with access to the Cloud Platform scope.") } - httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource) - var err error - containerService, err = container.New(httpClient) - if err != nil { - return fmt.Errorf("could not create client for Google Container Engine: %v", err) - } - - kubeCluster, err = containerService.Projects.Zones.Clusters.Get(buildEnv.ProjectName, buildEnv.Zone, clusterName).Do() - if err != nil { - return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, buildEnv.ProjectName, buildEnv.Zone, 
err) - } - - // Decode certs - decode := func(which string, cert string) []byte { - if err != nil { - return nil - } - s, decErr := base64.StdEncoding.DecodeString(cert) - if decErr != nil { - err = fmt.Errorf("error decoding %s cert: %v", which, decErr) - } - return []byte(s) - } - clientCert := decode("client cert", kubeCluster.MasterAuth.ClientCertificate) - clientKey := decode("client key", kubeCluster.MasterAuth.ClientKey) - caCert := decode("cluster cert", kubeCluster.MasterAuth.ClusterCaCertificate) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() // ctx is only used for discovery and connect; not retained. + kc, err := gke.NewClient(ctx, + clusterName, + gke.OptZone(buildEnv.Zone), + gke.OptProject(buildEnv.ProjectName), + gke.OptTokenSource(tokenSource)) if err != nil { return err } - - // HTTPS client - cert, err := tls.X509KeyPair(clientCert, clientKey) - if err != nil { - return fmt.Errorf("x509 client key pair could not be generated: %v", err) - } - - // CA Cert from kube master - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM([]byte(caCert)) - - // Setup TLS config - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - } - tlsConfig.BuildNameToCertificate() - - kubeHTTPClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsConfig, - }, - } - - kubeClient, err = kubernetes.NewClient("https://"+kubeCluster.Endpoint, kubeHTTPClient) - if err != nil { - return fmt.Errorf("kubernetes HTTP client could not be created: %v", err) - } + kubeClient = kc go kubePool.pollCapacityLoop() return nil diff --git a/kubernetes/client.go b/kubernetes/client.go index 5aa82d6bbb..80604d586d 100644 --- a/kubernetes/client.go +++ b/kubernetes/client.go @@ -51,6 +51,14 @@ func NewClient(baseURL string, client *http.Client) (*Client, error) { }, nil } +// Close closes any idle HTTP connections still connected to the Kubernetes master. 
+func (c *Client) Close() error {
+	if tr, ok := c.httpClient.Transport.(*http.Transport); ok {
+		tr.CloseIdleConnections()
+	}
+	return nil
+}
+
 // RunLongLivedPod creates a new pod resource in the default pod namespace with
 // the given pod API specification. It assumes the pod runs a
 // long-lived server (i.e. if the container exit quickly quickly, even
diff --git a/kubernetes/dialer.go b/kubernetes/dialer.go
new file mode 100644
index 0000000000..6439635c0a
--- /dev/null
+++ b/kubernetes/dialer.go
@@ -0,0 +1,47 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strconv"
+)
+
+// Dialer dials Kubernetes pods.
+//
+// TODO: services also.
+type Dialer struct {
+	kc *Client
+}
+
+// NewDialer returns a Dialer that looks up pods using kc.
+func NewDialer(kc *Client) *Dialer {
+	return &Dialer{kc: kc}
+}
+
+// Dial opens a TCP connection to the given port of the pod named podName.
+//
+// It looks up the pod's status through the Dialer's Client and returns an
+// error if the lookup fails, if the pod is not in the "Running" phase, or
+// if the pod reports no IP address. ctx is passed to both the status
+// lookup and the dial.
+func (d *Dialer) Dial(ctx context.Context, podName string, port int) (net.Conn, error) {
+	status, err := d.kc.PodStatus(ctx, podName)
+	if err != nil {
+		return nil, fmt.Errorf("PodStatus of %q: %v", podName, err)
+	}
+	if status.Phase != "Running" {
+		return nil, fmt.Errorf("pod %q in state %q", podName, status.Phase)
+	}
+	if status.PodIP == "" {
+		// An empty host would make the net package dial the local
+		// system instead of the pod.
+		return nil, fmt.Errorf("pod %q has no IP address", podName)
+	}
+	var dialer net.Dialer
+	return dialer.DialContext(ctx, "tcp", net.JoinHostPort(status.PodIP, strconv.Itoa(port)))
+}
diff --git a/kubernetes/gke/gke.go b/kubernetes/gke/gke.go
new file mode 100644
index 0000000000..0811b532a6
--- /dev/null
+++ b/kubernetes/gke/gke.go
@@ -0,0 +1,174 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package gke contains code for interacting with Google Container Engine (GKE),
+// the hosted version of Kubernetes on Google Cloud Platform.
+//
+// The API is not subject to the Go 1 compatibility promise and may change at
+// any time.
+// Users should vendor this package and deal with API changes.
+package gke
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/base64"
+	"fmt"
+	"net/http"
+
+	"cloud.google.com/go/compute/metadata"
+
+	"golang.org/x/build/kubernetes"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	compute "google.golang.org/api/compute/v1"
+	"google.golang.org/api/container/v1"
+)
+
+// ClientOpt is an option that can be passed to NewClient.
+type ClientOpt interface {
+	modify(*clientOpt)
+}
+
+// clientOpt accumulates the settings chosen through ClientOpt values.
+type clientOpt struct {
+	Project     string
+	TokenSource oauth2.TokenSource
+	Zone        string
+}
+
+// clientOptFunc adapts a plain function to the ClientOpt interface.
+type clientOptFunc func(*clientOpt)
+
+// modify applies fn to the settings in co.
+func (fn clientOptFunc) modify(co *clientOpt) {
+	fn(co)
+}
+
+// OptProject returns an option that sets the GCE project ID to projectName.
+// This is the named project ID, not the numeric ID.
+// If unspecified, the current active project ID is used, if the program is
+// running on a GCE instance.
+func OptProject(projectName string) ClientOpt {
+	setProject := func(co *clientOpt) { co.Project = projectName }
+	return clientOptFunc(setProject)
+}
+
+// OptZone returns an option naming the GCP zone the cluster is located in.
+// It is necessary if and only if there are multiple GKE clusters with
+// the same name in different zones.
+func OptZone(zoneName string) ClientOpt {
+	setZone := func(co *clientOpt) { co.Zone = zoneName }
+	return clientOptFunc(setZone)
+}
+
+// OptTokenSource returns an option that sets the oauth2 token source used
+// for authenticated requests to the GKE API. If unset, the default token
+// source is used (https://godoc.org/golang.org/x/oauth2/google#DefaultTokenSource).
+func OptTokenSource(ts oauth2.TokenSource) ClientOpt {
+	setTokenSource := func(co *clientOpt) { co.TokenSource = ts }
+	return clientOptFunc(setTokenSource)
+}
+
+// NewClient returns a Kubernetes client to a GKE cluster.
+//
+// Options may supply the project, zone and token source. When omitted,
+// the project is read from the GCE metadata service, the token source
+// defaults to google.DefaultTokenSource with the Cloud Platform scope,
+// and every zone of the project is searched for a cluster named
+// clusterName; that search fails if it matches no cluster or more than
+// one. ctx is used for the GKE API requests made while locating the
+// cluster.
+//
+// The returned client talks to the cluster endpoint over HTTPS,
+// presenting the cluster's client certificate and trusting only the
+// cluster's CA certificate.
+func NewClient(ctx context.Context, clusterName string, opts ...ClientOpt) (*kubernetes.Client, error) {
+	var opt clientOpt
+	for _, o := range opts {
+		o.modify(&opt)
+	}
+	if opt.TokenSource == nil {
+		ts, err := google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get a token source: %v", err)
+		}
+		opt.TokenSource = ts
+	}
+	if opt.Project == "" {
+		proj, err := metadata.ProjectID()
+		if err != nil {
+			return nil, fmt.Errorf("metadata.ProjectID: %v", err)
+		}
+		opt.Project = proj
+	}
+
+	httpClient := oauth2.NewClient(ctx, opt.TokenSource)
+	containerService, err := container.New(httpClient)
+	if err != nil {
+		return nil, fmt.Errorf("could not create client for Google Container Engine: %v", err)
+	}
+
+	var cluster *container.Cluster
+	if opt.Zone == "" {
+		// No zone given: list the project's clusters across zones ("-")
+		// and require exactly one with the requested name.
+		clusters, err := containerService.Projects.Zones.Clusters.List(opt.Project, "-").Context(ctx).Do()
+		if err != nil {
+			return nil, fmt.Errorf("listing GKE clusters in project %q: %v", opt.Project, err)
+		}
+		if len(clusters.MissingZones) > 0 {
+			return nil, fmt.Errorf("GKE cluster list response contains missing zones: %v", clusters.MissingZones)
+		}
+		matches := 0
+		for _, cl := range clusters.Clusters {
+			if cl.Name == clusterName {
+				cluster = cl
+				matches++
+			}
+		}
+		if matches == 0 {
+			return nil, fmt.Errorf("cluster %q not found in any zone", clusterName)
+		}
+		if matches > 1 {
+			return nil, fmt.Errorf("cluster %q is ambiguous without using gke.OptZone to specify a zone", clusterName)
+		}
+	} else {
+		cluster, err = containerService.Projects.Zones.Clusters.Get(opt.Project, opt.Zone, clusterName).Context(ctx).Do()
+		if err != nil {
+			return nil, fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, opt.Project, opt.Zone, err)
+		}
+	}
+
+	// Guard against a response without credentials instead of panicking
+	// on a nil dereference below.
+	auth := cluster.MasterAuth
+	if auth == nil {
+		return nil, fmt.Errorf("cluster %q has no master auth credentials", clusterName)
+	}
+	tlsConfig, err := newTLSConfig(auth.ClientCertificate, auth.ClientKey, auth.ClusterCaCertificate)
+	if err != nil {
+		return nil, err
+	}
+
+	kubeHTTPClient := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: tlsConfig,
+		},
+	}
+
+	kubeClient, err := kubernetes.NewClient("https://"+cluster.Endpoint, kubeHTTPClient)
+	if err != nil {
+		return nil, fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
+	}
+	return kubeClient, nil
+}
+
+// newTLSConfig builds a TLS client configuration from three base64-encoded
+// PEM blobs: the client certificate, the client key, and the CA
+// certificate that server certificates must chain to.
+func newTLSConfig(clientCertB64, clientKeyB64, caCertB64 string) (*tls.Config, error) {
+	clientCert, err := decodeCredential("client cert", clientCertB64)
+	if err != nil {
+		return nil, err
+	}
+	clientKey, err := decodeCredential("client key", clientKeyB64)
+	if err != nil {
+		return nil, err
+	}
+	caCert, err := decodeCredential("cluster cert", caCertB64)
+	if err != nil {
+		return nil, err
+	}
+
+	cert, err := tls.X509KeyPair(clientCert, clientKey)
+	if err != nil {
+		return nil, fmt.Errorf("x509 client key pair could not be generated: %v", err)
+	}
+
+	// An empty pool would make every later handshake fail with a less
+	// helpful verification error, so report an unusable CA here.
+	caCertPool := x509.NewCertPool()
+	if !caCertPool.AppendCertsFromPEM(caCert) {
+		return nil, fmt.Errorf("no certificates could be parsed from the cluster CA certificate")
+	}
+
+	return &tls.Config{
+		Certificates: []tls.Certificate{cert},
+		RootCAs:      caCertPool,
+	}, nil
+}
+
+// decodeCredential base64-decodes one credential field. which names the
+// field in the returned error.
+func decodeCredential(which, encoded string) ([]byte, error) {
+	decoded, err := base64.StdEncoding.DecodeString(encoded)
+	if err != nil {
+		return nil, fmt.Errorf("error decoding %s: %v", which, err)
+	}
+	return decoded, nil
+}
diff --git a/kubernetes/gke/gke_test.go b/kubernetes/gke/gke_test.go
new file mode 100644
index 0000000000..c86efa4ad6
--- /dev/null
+++ b/kubernetes/gke/gke_test.go
@@ -0,0 +1,89 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gke_test
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"cloud.google.com/go/compute/metadata"
+	"golang.org/x/build/kubernetes"
+	"golang.org/x/build/kubernetes/gke"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	compute "google.golang.org/api/compute/v1"
+	container "google.golang.org/api/container/v1"
+)
+
+// Tests NewClient and also Dialer.
+//
+// It only runs on GCE: it lists the project's GKE clusters, connects to
+// each with gke.NewClient, and passes as soon as one running pod accepts
+// a TCP connection on one of its container ports.
+func TestNewClient(t *testing.T) {
+	if !metadata.OnGCE() {
+		t.Skip("not on GCE; skipping")
+	}
+	ctx := context.Background()
+	ts, err := google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
+	if err != nil {
+		t.Fatal(err)
+	}
+	httpClient := oauth2.NewClient(ctx, ts)
+	containerService, err := container.New(httpClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+	proj, err := metadata.ProjectID()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	clusters, err := containerService.Projects.Zones.Clusters.List(proj, "-").Context(ctx).Do()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(clusters.Clusters) == 0 {
+		t.Skip("no GKE clusters")
+	}
+	var candidates int
+	for _, cl := range clusters.Clusters {
+		tried, dialed := dialAnyPod(ctx, t, cl.Name, cl.Zone)
+		candidates += tried
+		if dialed {
+			return
+		}
+	}
+	if candidates == 0 {
+		t.Skip("no pods to dial")
+	}
+	t.Errorf("dial failures")
+}
+
+// dialAnyPod connects to the named cluster and tries to dial each non-UDP
+// container port of each running pod, stopping at the first success. It
+// reports how many ports it tried and whether a dial succeeded. Doing the
+// per-cluster work in its own function lets the deferred Close run once
+// per cluster rather than piling up until the whole test returns.
+func dialAnyPod(ctx context.Context, t *testing.T, clusterName, zone string) (candidates int, dialed bool) {
+	kc, err := gke.NewClient(ctx, clusterName, gke.OptZone(zone))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer kc.Close()
+
+	pods, err := kc.GetPods(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	d := kubernetes.NewDialer(kc)
+	for _, pod := range pods {
+		if pod.Status.Phase != "Running" {
+			continue
+		}
+		// ctr, not container, so the imported container package is not
+		// shadowed.
+		for _, ctr := range pod.Spec.Containers {
+			for _, port := range ctr.Ports {
+				if strings.ToLower(string(port.Protocol)) == "udp" || port.ContainerPort == 0 {
+					continue
+				}
+				candidates++
+				// Dial takes a pod name, not a container name.
+				c, err := d.Dial(ctx, pod.Name, port.ContainerPort)
+				if err != nil {
+					t.Logf("Dial %q/%q/%d: %v", clusterName, pod.Name, port.ContainerPort, err)
+					continue
+				}
+				c.Close()
+				t.Logf("Dialed %q/%q/%d.", clusterName, pod.Name, port.ContainerPort)
+				return candidates, true
+			}
+		}
+	}
+	return candidates, false
+}