Skip to content

Commit

Permalink
[backport v8] force http2 kubernetes #9294 (#9796)
Browse files Browse the repository at this point in the history
* [cloud#1043] force kubernetes proxy to use http2

* typo

* typo x2

* test k8 proxy http2 capabilities

* linting
NajiObeid authored Jan 18, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 8f40a7c commit 50e79c8
Showing 3 changed files with 115 additions and 16 deletions.
91 changes: 81 additions & 10 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
@@ -33,35 +33,37 @@ import (
"testing"
"time"

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/profile"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib"
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/events"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/testlog"

v1 "k8s.io/api/core/v1"
apidefaults "github.com/gravitational/teleport/api/defaults"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"

"github.com/gravitational/trace"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
streamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport"
"k8s.io/client-go/transport/spdy"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
streamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
)

type KubeSuite struct {
@@ -156,6 +158,7 @@ func TestKube(t *testing.T) {
t.Run("Exec", suite.bind(testKubeExec))
t.Run("Deny", suite.bind(testKubeDeny))
t.Run("PortForward", suite.bind(testKubePortForward))
t.Run("TransportProtocol", suite.bind(testKubeTransportProtocol))
t.Run("TrustedClustersClientCert", suite.bind(testKubeTrustedClustersClientCert))
t.Run("TrustedClustersSNI", suite.bind(testKubeTrustedClustersSNI))
t.Run("Disconnect", suite.bind(testKubeDisconnect))
@@ -979,7 +982,6 @@ loop:
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*", err.Error())

}

// TestKubeDisconnect tests kubernetes session disconnects
@@ -1094,6 +1096,75 @@ func runKubeDisconnectTest(t *testing.T, suite *KubeSuite, tc disconnectTestCase
}
}

// testKubeTransportProtocol tests the proxy transport protocol capabilities
func testKubeTransportProtocol(t *testing.T, suite *KubeSuite) {
tconf := suite.teleKubeConfig(Host)

teleport := NewInstance(InstanceConfig{
ClusterName: Site,
HostID: HostID,
NodeName: Host,
Priv: suite.priv,
Pub: suite.pub,
log: suite.log,
})

username := suite.me.Username
kubeGroups := []string{testImpersonationGroup}
role, err := types.NewRole("kubemaster", types.RoleSpecV4{
Allow: types.RoleConditions{
Logins: []string{username},
KubeGroups: kubeGroups,
},
})
require.NoError(t, err)
teleport.AddUserWithRole(username, role)

err = teleport.CreateEx(t, nil, tconf)
require.NoError(t, err)

err = teleport.Start()
require.NoError(t, err)
defer teleport.StopAll()

// set up kube configuration using proxy
_, proxyClientConfig, err := kubeProxyClient(kubeProxyConfig{
t: teleport,
username: username,
kubeGroups: kubeGroups,
})
require.NoError(t, err)

u, err := url.Parse(proxyClientConfig.Host)
require.NoError(t, err)

u.Scheme = "https"
u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v", testNamespace, testPod)

tlsConfig, err := tlsClientConfig(proxyClientConfig)
require.NoError(t, err)

trans := &http.Transport{
TLSClientConfig: tlsConfig,
}

// call proxy with an HTTP1 client
client := &http.Client{Transport: trans}
resp1, err := client.Get(u.String())
require.NoError(t, err)
defer resp1.Body.Close()
require.Equal(t, resp1.Proto, "HTTP/1.1")

// call proxy with an HTTP2 client
err = http2.ConfigureTransport(trans)
require.NoError(t, err)

resp2, err := client.Get(u.String())
require.NoError(t, err)
defer resp2.Body.Close()
require.Equal(t, resp2.Proto, "HTTP/2.0")
}

// teleKubeConfig sets up teleport with kubernetes turned on
func (s *KubeSuite) teleKubeConfig(hostname string) *service.Config {
tconf := service.MakeDefaultConfig()
34 changes: 28 additions & 6 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ import (
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"golang.org/x/net/http2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
@@ -1440,7 +1441,11 @@ func (f *Forwarder) newClusterSessionRemoteCluster(ctx authContext) (*clusterSes
tlsConfig: tlsConfig,
}

transport := f.newTransport(sess.Dial, sess.tlsConfig)
transport, err := f.newTransport(sess.Dial, sess.tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}

sess.forwarder, err = forward.New(
forward.FlushInterval(100*time.Millisecond),
forward.RoundTripper(transport),
@@ -1512,13 +1517,18 @@ func (f *Forwarder) newClusterSessionLocal(ctx authContext) (*clusterSession, er
tlsConfig: creds.tlsConfig,
}

t, err := f.newTransport(sess.Dial, sess.tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}

// When running inside Kubernetes cluster or using auth/exec providers,
// kubeconfig provides a transport wrapper that adds a bearer token to
// requests
//
// When forwarding request to a remote cluster, this is not needed
// as the proxy uses client cert auth to reach out to remote proxy.
transport, err := creds.wrapTransport(f.newTransport(sess.Dial, sess.tlsConfig))
transport, err := creds.wrapTransport(t)
if err != nil {
return nil, trace.Wrap(err)
}
@@ -1558,7 +1568,11 @@ func (f *Forwarder) newClusterSessionDirect(ctx authContext, endpoints []kubeClu
noAuditEvents: true,
}

transport := f.newTransport(sess.Dial, sess.tlsConfig)
transport, err := f.newTransport(sess.Dial, sess.tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}

sess.forwarder, err = forward.New(
forward.FlushInterval(100*time.Millisecond),
forward.RoundTripper(transport),
@@ -1575,8 +1589,8 @@ func (f *Forwarder) newClusterSessionDirect(ctx authContext, endpoints []kubeClu
// DialFunc is a network dialer function that returns a network connection
type DialFunc func(string, string) (net.Conn, error)

func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) *http.Transport {
return &http.Transport{
func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) (*http.Transport, error) {
t := &http.Transport{
Dial: dial,
TLSClientConfig: tlsConfig,
// Increase the size of the connection pool. This substantially improves the
@@ -1587,8 +1601,16 @@ func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) *http.Tra
// IdleConnTimeout defines the maximum amount of time before idle connections
// are closed. Leaving this unset will lead to connections open forever and
// will cause memory leaks in a long running process.
IdleConnTimeout: defaults.HTTPIdleTimeout,
IdleConnTimeout: defaults.HTTPIdleTimeout,
ForceAttemptHTTP2: true,
}

err := http2.ConfigureTransport(t)
if err != nil {
return nil, trace.Wrap(err)
}

return t, nil
}

// getOrCreateRequestContext creates a new certificate request for a given context,
6 changes: 6 additions & 0 deletions lib/kube/proxy/server.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ import (

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/net/http2"
)

// TLSServerConfig is a configuration for TLS server
@@ -124,6 +125,7 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) {
Server: &http.Server{
Handler: limiter,
ReadHeaderTimeout: apidefaults.DefaultDialTimeout * 2,
TLSConfig: cfg.TLS,
},
}
server.TLS.GetConfigForClient = server.GetConfigForClient
@@ -173,11 +175,15 @@ func (t *TLSServer) Serve(listener net.Listener) error {
if err != nil {
return trace.Wrap(err)
}

go mux.Serve()
defer mux.Close()

t.mu.Lock()
t.listener = mux.TLS()
if err = http2.ConfigureServer(t.Server, &http2.Server{}); err != nil {
return trace.Wrap(err)
}
t.mu.Unlock()

if t.heartbeat != nil {

0 comments on commit 50e79c8

Please sign in to comment.