Skip to content

Commit

Permalink
Add gke package, add kubenetes.Dialer type.
Browse files Browse the repository at this point in the history
Updates golang/go#18817

Change-Id: Ifee53384486b0692899b77be2eaa42ca9006ef8e
Reviewed-on: https://go-review.googlesource.com/36016
Reviewed-by: Chris Broadfoot <[email protected]>
  • Loading branch information
bradfitz committed Jan 31, 2017
1 parent 310c021 commit adc161a
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 59 deletions.
68 changes: 9 additions & 59 deletions cmd/coordinator/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"net/http"
"sort"
"strconv"
"strings"
Expand All @@ -24,7 +20,7 @@ import (
"golang.org/x/build/dashboard"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
"golang.org/x/oauth2"
"golang.org/x/build/kubernetes/gke"
container "google.golang.org/api/container/v1"
)

Expand Down Expand Up @@ -56,64 +52,18 @@ func initKube() error {
if !hasCloudPlatformScope() {
return errors.New("coordinator not running with access to the Cloud Platform scope.")
}
httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
var err error

containerService, err = container.New(httpClient)
if err != nil {
return fmt.Errorf("could not create client for Google Container Engine: %v", err)
}

kubeCluster, err = containerService.Projects.Zones.Clusters.Get(buildEnv.ProjectName, buildEnv.Zone, clusterName).Do()
if err != nil {
return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, buildEnv.ProjectName, buildEnv.Zone, err)
}

// Decode certs
decode := func(which string, cert string) []byte {
if err != nil {
return nil
}
s, decErr := base64.StdEncoding.DecodeString(cert)
if decErr != nil {
err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
}
return []byte(s)
}
clientCert := decode("client cert", kubeCluster.MasterAuth.ClientCertificate)
clientKey := decode("client key", kubeCluster.MasterAuth.ClientKey)
caCert := decode("cluster cert", kubeCluster.MasterAuth.ClusterCaCertificate)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel() // ctx is only used for discovery and connect; not retained.
kc, err := gke.NewClient(ctx,
clusterName,
gke.OptZone(buildEnv.Zone),
gke.OptProject(buildEnv.ProjectName),
gke.OptTokenSource(tokenSource))
if err != nil {
return err
}

// HTTPS client
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return fmt.Errorf("x509 client key pair could not be generated: %v", err)
}

// CA Cert from kube master
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCert))

// Setup TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
tlsConfig.BuildNameToCertificate()

kubeHTTPClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}

kubeClient, err = kubernetes.NewClient("https://"+kubeCluster.Endpoint, kubeHTTPClient)
if err != nil {
return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
}
kubeClient = kc

go kubePool.pollCapacityLoop()
return nil
Expand Down
8 changes: 8 additions & 0 deletions kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func NewClient(baseURL string, client *http.Client) (*Client, error) {
}, nil
}

// Close closes any idle HTTP connections still connected to the Kubernetes master.
func (c *Client) Close() error {
if tr, ok := c.httpClient.Transport.(*http.Transport); ok {
tr.CloseIdleConnections()
}
return nil
}

// RunLongLivedPod creates a new pod resource in the default pod namespace with
// the given pod API specification. It assumes the pod runs a
// long-lived server (i.e. if the container exit quickly quickly, even
Expand Down
35 changes: 35 additions & 0 deletions kubernetes/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package kubernetes

import (
"context"
"fmt"
"net"
"strconv"
)

// Dialer dials Kubernetes pods.
//
// TODO: services also.
type Dialer struct {
kc *Client
}

func NewDialer(kc *Client) *Dialer {
return &Dialer{kc: kc}
}

func (d *Dialer) Dial(ctx context.Context, podName string, port int) (net.Conn, error) {
status, err := d.kc.PodStatus(ctx, podName)
if err != nil {
return nil, fmt.Errorf("PodStatus of %q: %v", podName, err)
}
if status.Phase != "Running" {
return nil, fmt.Errorf("pod %q in state %q", podName, status.Phase)
}
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", net.JoinHostPort(status.PodIP, strconv.Itoa(port)))
}
174 changes: 174 additions & 0 deletions kubernetes/gke/gke.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package gke contains code for interacting with Google Container Engine (GKE),
// the hosted version of Kubernetes on Google Cloud Platform.
//
// The API is not subject to the Go 1 compatibility promise and may change at
// any time. Users should vendor this package and deal with API changes.
package gke

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"net/http"

"cloud.google.com/go/compute/metadata"

"golang.org/x/build/kubernetes"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/container/v1"
)

// ClientOpt represents an option that can be passed to the Client function.
type ClientOpt interface {
modify(*clientOpt)
}

type clientOpt struct {
Project string
TokenSource oauth2.TokenSource
Zone string
}

type clientOptFunc func(*clientOpt)

func (f clientOptFunc) modify(o *clientOpt) { f(o) }

// OptProject returns an option setting the GCE Project ID to projectName.
// This is the named project ID, not the numeric ID.
// If unspecified, the current active project ID is used, if the program is running
// on a GCE intance.
func OptProject(projectName string) ClientOpt {
return clientOptFunc(func(o *clientOpt) {
o.Project = projectName
})
}

// OptZone specifies the GCP zone the cluster is located in.
// This is necessary if and only if there are multiple GKE clusters with
// the same name in different zones.
func OptZone(zoneName string) ClientOpt {
return clientOptFunc(func(o *clientOpt) {
o.Zone = zoneName
})
}

// OptTokenSource sets the oauth2 token source for making
// authenticated requests to the GKE API. If unset, the default token
// source is used (https://godoc.org/golang.org/x/oauth2/google#DefaultTokenSource).
func OptTokenSource(ts oauth2.TokenSource) ClientOpt {
return clientOptFunc(func(o *clientOpt) {
o.TokenSource = ts
})
}

// NewClient returns an Kubernetes client to a GKE cluster.
func NewClient(ctx context.Context, clusterName string, opts ...ClientOpt) (*kubernetes.Client, error) {
var opt clientOpt
for _, o := range opts {
o.modify(&opt)
}
if opt.TokenSource == nil {
var err error
opt.TokenSource, err = google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
if err != nil {
return nil, fmt.Errorf("failed to get a token source: %v", err)
}
}
if opt.Project == "" {
proj, err := metadata.ProjectID()
if err != nil {
return nil, fmt.Errorf("metadata.ProjectID: %v", err)
}
opt.Project = proj
}

httpClient := oauth2.NewClient(ctx, opt.TokenSource)
containerService, err := container.New(httpClient)
if err != nil {
return nil, fmt.Errorf("could not create client for Google Container Engine: %v", err)
}

var cluster *container.Cluster
if opt.Zone == "" {
clusters, err := containerService.Projects.Zones.Clusters.List(opt.Project, "-").Context(ctx).Do()
if err != nil {
return nil, err
}
if len(clusters.MissingZones) > 0 {
return nil, fmt.Errorf("GKE cluster list response contains missing zones: %v", clusters.MissingZones)
}
matches := 0
for _, cl := range clusters.Clusters {
if cl.Name == clusterName {
cluster = cl
matches++
}
}
if matches == 0 {
return nil, fmt.Errorf("cluster %q not found in any zone", clusterName)
}
if matches > 1 {
return nil, fmt.Errorf("cluster %q is ambiguous without using gke.OptZone to specify a zone", clusterName)
}
} else {
cluster, err = containerService.Projects.Zones.Clusters.Get(opt.Project, opt.Zone, clusterName).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, opt.Project, opt.Zone, err)
}
}

// Decode certs
decode := func(which string, cert string) []byte {
if err != nil {
return nil
}
s, decErr := base64.StdEncoding.DecodeString(cert)
if decErr != nil {
err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
}
return []byte(s)
}
clientCert := decode("client cert", cluster.MasterAuth.ClientCertificate)
clientKey := decode("client key", cluster.MasterAuth.ClientKey)
caCert := decode("cluster cert", cluster.MasterAuth.ClusterCaCertificate)
if err != nil {
return nil, err
}

// HTTPS client
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return nil, fmt.Errorf("x509 client key pair could not be generated: %v", err)
}

// CA Cert from kube master
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCert))

// Setup TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
tlsConfig.BuildNameToCertificate()

kubeHTTPClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}

kubeClient, err := kubernetes.NewClient("https://"+cluster.Endpoint, kubeHTTPClient)
if err != nil {
return nil, fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
}
return kubeClient, nil
}
Loading

0 comments on commit adc161a

Please sign in to comment.