Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport v8] force http2 kubernetes #9294 #9796

Merged
merged 5 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 81 additions & 10 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,37 @@ import (
"testing"
"time"

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/profile"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib"
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/events"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/testlog"

v1 "k8s.io/api/core/v1"
apidefaults "github.com/gravitational/teleport/api/defaults"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"

"github.com/gravitational/trace"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
streamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport"
"k8s.io/client-go/transport/spdy"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
streamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
)

type KubeSuite struct {
Expand Down Expand Up @@ -156,6 +158,7 @@ func TestKube(t *testing.T) {
t.Run("Exec", suite.bind(testKubeExec))
t.Run("Deny", suite.bind(testKubeDeny))
t.Run("PortForward", suite.bind(testKubePortForward))
t.Run("TransportProtocol", suite.bind(testKubeTransportProtocol))
t.Run("TrustedClustersClientCert", suite.bind(testKubeTrustedClustersClientCert))
t.Run("TrustedClustersSNI", suite.bind(testKubeTrustedClustersSNI))
t.Run("Disconnect", suite.bind(testKubeDisconnect))
Expand Down Expand Up @@ -979,7 +982,6 @@ loop:
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*", err.Error())

}

// TestKubeDisconnect tests kubernetes session disconnects
Expand Down Expand Up @@ -1094,6 +1096,75 @@ func runKubeDisconnectTest(t *testing.T, suite *KubeSuite, tc disconnectTestCase
}
}

// testKubeTransportProtocol tests the proxy transport protocol capabilities
func testKubeTransportProtocol(t *testing.T, suite *KubeSuite) {
tconf := suite.teleKubeConfig(Host)

teleport := NewInstance(InstanceConfig{
ClusterName: Site,
HostID: HostID,
NodeName: Host,
Priv: suite.priv,
Pub: suite.pub,
log: suite.log,
})

username := suite.me.Username
kubeGroups := []string{testImpersonationGroup}
role, err := types.NewRole("kubemaster", types.RoleSpecV4{
Allow: types.RoleConditions{
Logins: []string{username},
KubeGroups: kubeGroups,
},
})
require.NoError(t, err)
teleport.AddUserWithRole(username, role)

err = teleport.CreateEx(t, nil, tconf)
require.NoError(t, err)

err = teleport.Start()
require.NoError(t, err)
defer teleport.StopAll()

// set up kube configuration using proxy
_, proxyClientConfig, err := kubeProxyClient(kubeProxyConfig{
t: teleport,
username: username,
kubeGroups: kubeGroups,
})
require.NoError(t, err)

u, err := url.Parse(proxyClientConfig.Host)
require.NoError(t, err)

u.Scheme = "https"
u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v", testNamespace, testPod)

tlsConfig, err := tlsClientConfig(proxyClientConfig)
require.NoError(t, err)

trans := &http.Transport{
TLSClientConfig: tlsConfig,
}

// call proxy with an HTTP1 client
client := &http.Client{Transport: trans}
resp1, err := client.Get(u.String())
require.NoError(t, err)
defer resp1.Body.Close()
require.Equal(t, resp1.Proto, "HTTP/1.1")

// call proxy with an HTTP2 client
err = http2.ConfigureTransport(trans)
require.NoError(t, err)

resp2, err := client.Get(u.String())
require.NoError(t, err)
defer resp2.Body.Close()
require.Equal(t, resp2.Proto, "HTTP/2.0")
}

// teleKubeConfig sets up teleport with kubernetes turned on
func (s *KubeSuite) teleKubeConfig(hostname string) *service.Config {
tconf := service.MakeDefaultConfig()
Expand Down
34 changes: 28 additions & 6 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"golang.org/x/net/http2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
Expand Down Expand Up @@ -1440,7 +1441,11 @@ func (f *Forwarder) newClusterSessionRemoteCluster(ctx authContext) (*clusterSes
tlsConfig: tlsConfig,
}

transport := f.newTransport(sess.Dial, sess.tlsConfig)
transport, err := f.newTransport(sess.Dial, sess.tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}

sess.forwarder, err = forward.New(
forward.FlushInterval(100*time.Millisecond),
forward.RoundTripper(transport),
Expand Down Expand Up @@ -1512,13 +1517,18 @@ func (f *Forwarder) newClusterSessionLocal(ctx authContext) (*clusterSession, er
tlsConfig: creds.tlsConfig,
}

t, err := f.newTransport(sess.Dial, sess.tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}

// When running inside Kubernetes cluster or using auth/exec providers,
// kubeconfig provides a transport wrapper that adds a bearer token to
// requests
//
// When forwarding request to a remote cluster, this is not needed
// as the proxy uses client cert auth to reach out to remote proxy.
transport, err := creds.wrapTransport(f.newTransport(sess.Dial, sess.tlsConfig))
transport, err := creds.wrapTransport(t)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1558,7 +1568,11 @@ func (f *Forwarder) newClusterSessionDirect(ctx authContext, endpoints []kubeClu
noAuditEvents: true,
}

transport := f.newTransport(sess.Dial, sess.tlsConfig)
transport, err := f.newTransport(sess.Dial, sess.tlsConfig)
if err != nil {
return nil, trace.Wrap(err)
}

sess.forwarder, err = forward.New(
forward.FlushInterval(100*time.Millisecond),
forward.RoundTripper(transport),
Expand All @@ -1575,8 +1589,8 @@ func (f *Forwarder) newClusterSessionDirect(ctx authContext, endpoints []kubeClu
// DialFunc is a network dialer function that returns a network connection
type DialFunc func(string, string) (net.Conn, error)

func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) *http.Transport {
return &http.Transport{
func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) (*http.Transport, error) {
t := &http.Transport{
Dial: dial,
TLSClientConfig: tlsConfig,
// Increase the size of the connection pool. This substantially improves the
Expand All @@ -1587,8 +1601,16 @@ func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) *http.Tra
// IdleConnTimeout defines the maximum amount of time before idle connections
// are closed. Leaving this unset will lead to connections open forever and
// will cause memory leaks in a long running process.
IdleConnTimeout: defaults.HTTPIdleTimeout,
IdleConnTimeout: defaults.HTTPIdleTimeout,
ForceAttemptHTTP2: true,
}

err := http2.ConfigureTransport(t)
if err != nil {
return nil, trace.Wrap(err)
}

return t, nil
}

// getOrCreateRequestContext creates a new certificate request for a given context,
Expand Down
6 changes: 6 additions & 0 deletions lib/kube/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/net/http2"
)

// TLSServerConfig is a configuration for TLS server
Expand Down Expand Up @@ -124,6 +125,7 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) {
Server: &http.Server{
Handler: limiter,
ReadHeaderTimeout: apidefaults.DefaultDialTimeout * 2,
TLSConfig: cfg.TLS,
},
}
server.TLS.GetConfigForClient = server.GetConfigForClient
Expand Down Expand Up @@ -173,11 +175,15 @@ func (t *TLSServer) Serve(listener net.Listener) error {
if err != nil {
return trace.Wrap(err)
}

go mux.Serve()
defer mux.Close()

t.mu.Lock()
t.listener = mux.TLS()
if err = http2.ConfigureServer(t.Server, &http2.Server{}); err != nil {
return trace.Wrap(err)
}
t.mu.Unlock()

if t.heartbeat != nil {
Expand Down