Skip to content

Commit

Permalink
Allow node to handle old and new way of client IP propagation on the …
Browse files Browse the repository at this point in the history
…same listener (#22572)

* Allow node to handle old and new way of client IP propagation on same listener

With addition of signed PROXY headers, node was listening on multiplexer, but because
 of that it couldn't processing incoming connection from older proxies
 when ProxyHelloSignature was used, because
 both ends were waiting for the other side to send data first.
 Here we integrate ability to handle PROXY headers into connection itself,
 so we can start ssh server without waiting for multiplexer to detect connection

* Remove unneeded code

* Improve comment's wording

Co-authored-by: Michael Wilson <[email protected]>

* Improve comment's wording

Co-authored-by: Michael Wilson <[email protected]>

* Fix imports

* Unexport function

* Add godoc

* Remove unneeded comment

* Move ProxyHelloSignature to constants

* Check that proxyline is verified

* Use Warn() instead of Warnf()

* Move ProxyHelloSignature to api/constants

* Add timeout for getting host CA during proxyline verification.

* Rearrange conditions

Co-authored-by: Edoardo Spadolini <[email protected]>

* Clarify comment.

---------

Co-authored-by: Michael Wilson <[email protected]>
Co-authored-by: Edoardo Spadolini <[email protected]>
  • Loading branch information
3 people authored Mar 7, 2023
1 parent 0d6f7a4 commit a5370d5
Show file tree
Hide file tree
Showing 14 changed files with 374 additions and 146 deletions.
8 changes: 8 additions & 0 deletions api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ const (
// allowed GCP service accounts.
TraitGCPServiceAccounts = "gcp_service_accounts"
)
const (
// ProxyHelloSignature is a string which Teleport proxy will send
// right after the initial SSH "handshake/version" message if it detects
// talking to a Teleport server.
//
// This is also leveraged by tsh to propagate its tracing span ID.
ProxyHelloSignature = "Teleport-Proxy"
)

const (
// TimeoutGetClusterAlerts is the timeout for grabbing cluster alerts from tctl and tsh
Expand Down
3 changes: 2 additions & 1 deletion api/observability/tracing/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport/api/constants"
"github.com/gravitational/teleport/api/observability/tracing"
"github.com/gravitational/teleport/api/utils/sshutils"
)
Expand Down Expand Up @@ -136,7 +137,7 @@ func NewClientConn(ctx context.Context, conn net.Conn, addr string, config *ssh.
if len(hp.TracingContext) > 0 {
payloadJSON, err := json.Marshal(hp)
if err == nil {
payload := fmt.Sprintf("%s%s\x00", sshutils.ProxyHelloSignature, payloadJSON)
payload := fmt.Sprintf("%s%s\x00", constants.ProxyHelloSignature, payloadJSON)
if _, err := conn.Write([]byte(payload)); err != nil {
log.WithError(err).Warnf("Failed to pass along tracing context to proxy %v", addr)
}
Expand Down
9 changes: 0 additions & 9 deletions api/utils/sshutils/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ import (
"github.com/gravitational/teleport/api/defaults"
)

const (
// ProxyHelloSignature is a string which Teleport proxy will send
// right after the initial SSH "handshake/version" message if it detects
// talking to a Teleport server.
//
// This is also leveraged by tsh to propagate its tracing span ID.
ProxyHelloSignature = "Teleport-Proxy"
)

// HandshakePayload structure is sent as a JSON blob by the teleport
// proxy to every SSH server who identifies itself as Teleport server
//
Expand Down
5 changes: 4 additions & 1 deletion integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4201,8 +4201,11 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) {
instance := suite.NewTeleportWithConfig(makeConfig())
defer instance.StopAll()

caGetter := func(ctx context.Context, id types.CertAuthID, loadKeys bool) (types.CertAuthority, error) {
return instance.Process.GetAuthServer().Cache.GetCertAuthority(ctx, id, loadKeys)
}
proxyEnabledListener, err := helpers.CreatePROXYEnabledListener(context.Background(), t, net.JoinHostPort(Host, strconv.Itoa(nodePort)),
instance.Process.GetAuthServer().Cache, instance.Secrets.SiteName)
caGetter, instance.Secrets.SiteName)
require.NoError(t, err)

sshNode, err := helpers.NewDiscardServer(hostSigner, proxyEnabledListener)
Expand Down
20 changes: 9 additions & 11 deletions lib/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ import (
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/constants"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/sshutils"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/jwt"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
)
Expand All @@ -54,9 +53,8 @@ var (
)

// CertAuthorityGetter allows to get cluster's host CA for verification of signed PROXY headers.
type CertAuthorityGetter interface {
GetCertAuthority(ctx context.Context, id types.CertAuthID, loadKeys bool, opts ...services.MarshalOption) (types.CertAuthority, error)
}
// We define our own version to not create dependency on the 'services' package, which causes circular references
type CertAuthorityGetter = func(ctx context.Context, id types.CertAuthID, loadKeys bool) (types.CertAuthority, error)

// Config is a multiplexer config
type Config struct {
Expand Down Expand Up @@ -422,7 +420,7 @@ func (m *Mux) detect(conn net.Conn) (*Conn, error) {
continue
}

if m.CertAuthorityGetter != nil && newProxyLine.isSigned() && !newProxyLine.IsVerified {
if m.CertAuthorityGetter != nil && newProxyLine.IsSigned() && !newProxyLine.IsVerified {
return nil, trace.BadParameter("could not verify proxy line signature")
}

Expand Down Expand Up @@ -501,10 +499,10 @@ func (p Protocol) String() string {

var (
proxyPrefix = []byte{'P', 'R', 'O', 'X', 'Y'}
proxyV2Prefix = []byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}
ProxyV2Prefix = []byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}
sshPrefix = []byte{'S', 'S', 'H'}
tlsPrefix = []byte{0x16}
proxyHelloPrefix = []byte(sshutils.ProxyHelloSignature)
proxyHelloPrefix = []byte(constants.ProxyHelloSignature)
)

// This section defines Postgres wire protocol messages detected by Teleport:
Expand Down Expand Up @@ -564,15 +562,15 @@ func detectProto(r *bufio.Reader) (Protocol, error) {
switch {
case bytes.HasPrefix(in, proxyPrefix):
return ProtoProxy, nil
case bytes.HasPrefix(in, proxyV2Prefix[:8]):
case bytes.HasPrefix(in, ProxyV2Prefix[:8]):
// if the first 8 bytes matches the first 8 bytes of the proxy
// protocol v2 magic bytes, read more of the connection so we can
// ensure all magic bytes match
in, err = r.Peek(len(proxyV2Prefix))
in, err = r.Peek(len(ProxyV2Prefix))
if err != nil {
return ProtoUnknown, trace.Wrap(err, "failed to peek connection")
}
if bytes.HasPrefix(in, proxyV2Prefix) {
if bytes.HasPrefix(in, ProxyV2Prefix) {
return ProtoProxyV2, nil
}
case bytes.HasPrefix(in, proxyHelloPrefix[:8]):
Expand Down
132 changes: 62 additions & 70 deletions lib/multiplexer/multiplexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"io"
Expand All @@ -36,6 +37,7 @@ import (

"github.com/jackc/pgproto3/v2"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"
"google.golang.org/grpc"
Expand All @@ -49,8 +51,6 @@ import (
"github.com/gravitational/teleport/lib/httplib"
"github.com/gravitational/teleport/lib/jwt"
"github.com/gravitational/teleport/lib/multiplexer/test"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/sshutils"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/cert"
Expand Down Expand Up @@ -93,36 +93,20 @@ func TestMux(t *testing.T) {
backend1.StartTLS()
defer backend1.Close()

called := false
sshHandler := sshutils.NewChanHandlerFunc(func(_ context.Context, _ *sshutils.ConnectionContext, nch ssh.NewChannel) {
called = true
err := nch.Reject(ssh.Prohibited, "nothing to see here")
require.NoError(t, err)
})
go startSSHServer(t, mux.SSH())

srv, err := sshutils.NewServer(
"test",
utils.NetAddr{AddrNetwork: "tcp", Addr: "localhost:0"},
sshHandler,
[]ssh.Signer{signer},
sshutils.AuthMethods{Password: pass("abc123")},
)
require.NoError(t, err)
go srv.Serve(mux.SSH())
defer srv.Close()
clt, err := ssh.Dial("tcp", listener.Addr().String(), &ssh.ClientConfig{
Auth: []ssh.AuthMethod{ssh.Password("abc123")},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: time.Second,
HostKeyCallback: ssh.FixedHostKey(signer.PublicKey()),
})
require.NoError(t, err)
defer clt.Close()

// call new session to initiate opening new channel
_, err = clt.NewSession()
require.NotNil(t, err)
// make sure the channel handler was called OK
require.True(t, called)
// Make sure the SSH connection works correctly
ok, response, err := clt.SendRequest("echo", true, []byte("beep"))
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, "beep", string(response))

client := testClient(backend1)
re, err := client.Get(backend1.URL)
Expand Down Expand Up @@ -424,36 +408,20 @@ func TestMux(t *testing.T) {
backend1.StartTLS()
defer backend1.Close()

called := false
sshHandler := sshutils.NewChanHandlerFunc(func(_ context.Context, _ *sshutils.ConnectionContext, nch ssh.NewChannel) {
called = true
err := nch.Reject(ssh.Prohibited, "nothing to see here")
require.NoError(t, err)
})
go startSSHServer(t, mux.SSH())

srv, err := sshutils.NewServer(
"test",
utils.NetAddr{AddrNetwork: "tcp", Addr: "localhost:0"},
sshHandler,
[]ssh.Signer{signer},
sshutils.AuthMethods{Password: pass("abc123")},
)
require.NoError(t, err)
go srv.Serve(mux.SSH())
defer srv.Close()
clt, err := ssh.Dial("tcp", listener.Addr().String(), &ssh.ClientConfig{
Auth: []ssh.AuthMethod{ssh.Password("abc123")},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: time.Second,
HostKeyCallback: ssh.FixedHostKey(signer.PublicKey()),
})
require.NoError(t, err)
defer clt.Close()

// call new session to initiate opening new channel
_, err = clt.NewSession()
require.NotNil(t, err)
// make sure the channel handler was called OK
require.Equal(t, called, true)
// Make sure the SSH connection works correctly
ok, response, err := clt.SendRequest("echo", true, []byte("beep"))
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, "beep", string(response))

client := testClient(backend1)
re, err := client.Get(backend1.URL)
Expand Down Expand Up @@ -973,14 +941,6 @@ func TestMux(t *testing.T) {
})
}

type mockCAsGetter struct {
HostCA types.CertAuthority
}

func (m *mockCAsGetter) GetCertAuthority(ctx context.Context, id types.CertAuthID, loadKeys bool, opts ...services.MarshalOption) (types.CertAuthority, error) {
return m.HostCA, nil
}

func TestProtocolString(t *testing.T) {
for i := -1; i < len(protocolStrings)+1; i++ {
got := Protocol(i).String()
Expand Down Expand Up @@ -1027,15 +987,6 @@ func testClient(srv *httptest.Server) *http.Client {
}
}

func pass(need string) sshutils.PasswordFunc {
return func(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
if string(password) == need {
return nil, nil
}
return nil, fmt.Errorf("passwords don't match")
}
}

type noopListener struct {
addr net.Addr
}
Expand Down Expand Up @@ -1096,8 +1047,10 @@ func getTestCertCAsGetterAndSigner(t testing.TB, clusterName string) ([]byte, Ce
},
})
require.NoError(t, err)
mockCAsGetter := &mockCAsGetter{HostCA: ca}

mockCAGetter := func(ctx context.Context, id types.CertAuthID, loadKeys bool) (types.CertAuthority, error) {
return ca, nil
}
proxyPriv, err := rsa.GenerateKey(rand.Reader, constants.RSAKeySize)
require.NoError(t, err)

Expand All @@ -1120,22 +1073,61 @@ func getTestCertCAsGetterAndSigner(t testing.TB, clusterName string) ([]byte, Ce
tlsProxyCertPEM, err := tlsCA.GenerateCertificate(certReq)
require.NoError(t, err)
clock := clockwork.NewFakeClockAt(time.Now())
jwtSigner, err := services.GetJWTSigner(proxyPriv, clusterName, clock)
jwtSigner, err := jwt.New(&jwt.Config{
Clock: clock,
Algorithm: defaults.ApplicationTokenAlgorithm,
ClusterName: clusterName,
PrivateKey: proxyPriv,
})
require.NoError(t, err)

tlsProxyCertDER, err := tlsca.ParseCertificatePEM(tlsProxyCertPEM)
require.NoError(t, err)

return tlsProxyCertDER.Raw, mockCAsGetter, jwtSigner
return tlsProxyCertDER.Raw, mockCAGetter, jwtSigner
}

func startSSHServer(t *testing.T, listener net.Listener) {
nConn, err := listener.Accept()
assert.NoError(t, err)

t.Cleanup(func() { nConn.Close() })

block, _ := pem.Decode(fixtures.LocalhostKey)
pkey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
assert.NoError(t, err)

signer, err := ssh.NewSignerFromKey(pkey)
assert.NoError(t, err)

config := &ssh.ServerConfig{NoClientAuth: true}
config.AddHostKey(signer)

conn, _, reqs, err := ssh.NewServerConn(nConn, config)
assert.NoError(t, err)
if err != nil {
return
}
t.Cleanup(func() { conn.Close() })

go func() {
for newReq := range reqs {
if newReq.Type == "echo" {
newReq.Reply(true, newReq.Payload)
}
err := newReq.Reply(false, nil)
assert.NoError(t, err)
}
}()
}

func BenchmarkMux_ProxyV2Signature(b *testing.B) {
const clusterName = "test-teleport"

clock := clockwork.NewFakeClockAt(time.Now())
tlsProxyCert, casGetter, jwtSigner := getTestCertCAsGetterAndSigner(b, clusterName)
tlsProxyCert, caGetter, jwtSigner := getTestCertCAsGetterAndSigner(b, clusterName)

ca, err := casGetter.GetCertAuthority(context.Background(), types.CertAuthID{
ca, err := caGetter(context.Background(), types.CertAuthID{
Type: types.HostCA,
DomainName: clusterName,
}, false)
Expand Down
Loading

0 comments on commit a5370d5

Please sign in to comment.