Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry app access requests in different servers when there is a connection failure #9288

Merged
merged 1 commit into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 171 additions & 69 deletions integration/app_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -633,6 +632,92 @@ func TestAppAuditEvents(t *testing.T) {
})
}

func TestAppServersHA(t *testing.T) {
testCases := map[string]struct {
publicAddr func(pack *pack) string
makeRequest func(pack *pack, inCookie string) (status int, err error)
}{
"HTTPApp": {
publicAddr: func(pack *pack) string { return pack.rootAppPublicAddr },
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"WebSocketApp": {
publicAddr: func(pack *pack) string { return pack.rootWSPublicAddr },
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
},
},
}

// asserts that the response has error.
responseWithError := func(t *testing.T, status int, err error) {
if status > 0 {
require.NoError(t, err)
require.Equal(t, http.StatusInternalServerError, status)
return
}

require.Error(t, err)
}
// asserts that the response has no errors.
responseWithoutError := func(t *testing.T, status int, err error) {
if status > 0 {
require.NoError(t, err)
require.Equal(t, http.StatusOK, status)
return
}

require.NoError(t, err)
}

for name, test := range testCases {
t.Run(name, func(t *testing.T) {
pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3})
inCookie := pack.createAppSession(t, test.publicAddr(pack), pack.rootAppClusterName)

status, err := test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Stop all root app servers.
for i, appServer := range pack.rootAppServers {
appServer.Close()

// issue a request right after a server is gone.
status, err = test.makeRequest(pack, inCookie)
if i == len(pack.rootAppServers)-1 {
// fails only when the last one is closed.
responseWithError(t, status, err)
} else {
// otherwise the request should be handled by another
// server.
responseWithoutError(t, status, err)
}
}

servers := pack.startRootAppServers(t, 3, []service.App{})
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Start an additional app server and stop all current running
// ones.
pack.startRootAppServers(t, 1, []service.App{})
for _, appServer := range servers {
appServer.Close()

// Everytime a app server stops we issue a request to
// guarantee that the requests are going to be resolved by
// the remaining app servers.
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)
}
})
}
}

// pack contains identity as well as initialized Teleport clusters and instances.
type pack struct {
username string
Expand All @@ -645,9 +730,9 @@ type pack struct {
webCookie string
webToken string

rootCluster *TeleInstance
rootAppServer *service.TeleportProcess
rootCertPool *x509.CertPool
rootCluster *TeleInstance
rootAppServers []*service.TeleportProcess
rootCertPool *x509.CertPool

rootAppName string
rootAppPublicAddr string
Expand Down Expand Up @@ -701,12 +786,13 @@ type pack struct {
}

type appTestOptions struct {
extraRootApps []service.App
extraLeafApps []service.App
userLogins []string
userTraits map[string][]string
rootClusterPorts *InstancePorts
leafClusterPorts *InstancePorts
extraRootApps []service.App
extraLeafApps []service.App
userLogins []string
userTraits map[string][]string
rootClusterPorts *InstancePorts
leafClusterPorts *InstancePorts
rootAppServersCount int

rootConfig func(config *service.Config)
leafConfig func(config *service.Config)
Expand Down Expand Up @@ -871,8 +957,7 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
rcConf := service.MakeDefaultConfig()
rcConf.Console = nil
rcConf.Log = log
rcConf.DataDir, err = ioutil.TempDir("", "cluster-"+p.rootCluster.Secrets.SiteName)
require.NoError(t, err)
rcConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(rcConf.DataDir) })
rcConf.Auth.Enabled = true
rcConf.Auth.Preference.SetSecondFactor("off")
Expand All @@ -888,8 +973,7 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
lcConf := service.MakeDefaultConfig()
lcConf.Console = nil
lcConf.Log = log
lcConf.DataDir, err = ioutil.TempDir("", "cluster-"+p.leafCluster.Secrets.SiteName)
require.NoError(t, err)
lcConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(lcConf.DataDir) })
lcConf.Auth.Enabled = true
lcConf.Auth.Preference.SetSecondFactor("off")
Expand Down Expand Up @@ -918,64 +1002,17 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
p.rootCluster.StopAll()
})

raConf := service.MakeDefaultConfig()
raConf.Console = nil
raConf.Log = log
raConf.DataDir, err = ioutil.TempDir("", "app-server-"+p.rootCluster.Secrets.SiteName)
require.NoError(t, err)
t.Cleanup(func() { os.RemoveAll(raConf.DataDir) })
raConf.Token = "static-token-value"
raConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.rootCluster.GetPortWeb()),
},
// At least one rootAppServer should start during the setup
rootAppServersCount := 1
if opts.rootAppServersCount > 0 {
rootAppServersCount = opts.rootAppServersCount
}
raConf.Auth.Enabled = false
raConf.Proxy.Enabled = false
raConf.SSH.Enabled = false
raConf.Apps.Enabled = true
raConf.Apps.Apps = append([]service.App{
{
Name: p.rootAppName,
URI: rootServer.URL,
PublicAddr: p.rootAppPublicAddr,
},
{
Name: p.rootWSAppName,
URI: rootWSServer.URL,
PublicAddr: p.rootWSPublicAddr,
},
{
Name: p.rootWSSAppName,
URI: rootWSSServer.URL,
PublicAddr: p.rootWSSPublicAddr,
},
{
Name: p.jwtAppName,
URI: jwtServer.URL,
PublicAddr: p.jwtAppPublicAddr,
},
{
Name: p.headerAppName,
URI: headerServer.URL,
PublicAddr: p.headerAppPublicAddr,
},
{
Name: p.flushAppName,
URI: flushServer.URL,
PublicAddr: p.flushAppPublicAddr,
},
}, opts.extraRootApps...)
p.rootAppServer, err = p.rootCluster.StartApp(raConf)
require.NoError(t, err)
t.Cleanup(func() { p.rootAppServer.Close() })
p.rootAppServers = p.startRootAppServers(t, rootAppServersCount, opts.extraRootApps)

laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir, err = ioutil.TempDir("", "app-server-"+p.leafCluster.Secrets.SiteName)
require.NoError(t, err)
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
Expand Down Expand Up @@ -1323,7 +1360,7 @@ func (p *pack) makeWebsocketRequest(sessionCookie, endpoint string) (string, err
return "", trace.Wrap(err)
}
defer conn.Close()
data, err := ioutil.ReadAll(conn)
data, err := io.ReadAll(conn)
if err != nil {
return "", trace.Wrap(err)
}
Expand Down Expand Up @@ -1364,7 +1401,7 @@ func (p *pack) sendRequest(req *http.Request, tlsConfig *tls.Config) (int, strin
defer resp.Body.Close()

// Read in response body.
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, "", trace.Wrap(err)
}
Expand Down Expand Up @@ -1396,6 +1433,71 @@ func (p *pack) waitForLogout(appCookie string) (int, error) {
}
}

func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess {
log := utils.NewLoggerForTests()

servers := make([]*service.TeleportProcess, count)

for i := 0; i < count; i++ {
raConf := service.MakeDefaultConfig()
raConf.Console = nil
raConf.Log = log
raConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(raConf.DataDir) })
raConf.Token = "static-token-value"
raConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.rootCluster.GetPortWeb()),
},
}
raConf.Auth.Enabled = false
raConf.Proxy.Enabled = false
raConf.SSH.Enabled = false
raConf.Apps.Enabled = true
raConf.Apps.Apps = append([]service.App{
{
Name: p.rootAppName,
URI: p.rootAppURI,
PublicAddr: p.rootAppPublicAddr,
},
{
Name: p.rootWSAppName,
URI: p.rootWSAppURI,
PublicAddr: p.rootWSPublicAddr,
},
{
Name: p.rootWSSAppName,
URI: p.rootWSSAppURI,
PublicAddr: p.rootWSSPublicAddr,
},
{
Name: p.jwtAppName,
URI: p.jwtAppURI,
PublicAddr: p.jwtAppPublicAddr,
},
{
Name: p.headerAppName,
URI: p.headerAppURI,
PublicAddr: p.headerAppPublicAddr,
},
{
Name: p.flushAppName,
URI: p.flushAppURI,
PublicAddr: p.flushAppPublicAddr,
},
}, extraApps...)

appServer, err := p.rootCluster.StartApp(raConf)
require.NoError(t, err)
t.Cleanup(func() { appServer.Close() })

servers[i] = appServer
}

return servers
}

var forwardedHeaderNames = []string{
teleport.AppJWTHeader,
teleport.AppCFHeader,
Expand Down
Loading