Skip to content

Commit

Permalink
feat: app server requests failover
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielcorado committed Jan 13, 2022
1 parent c7797fc commit ae08bc4
Show file tree
Hide file tree
Showing 7 changed files with 503 additions and 155 deletions.
240 changes: 171 additions & 69 deletions integration/app_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -633,6 +632,92 @@ func TestAppAuditEvents(t *testing.T) {
})
}

func TestAppServersHA(t *testing.T) {
testCases := map[string]struct {
publicAddr func(pack *pack) string
makeRequest func(pack *pack, inCookie string) (status int, err error)
}{
"HTTPApp": {
publicAddr: func(pack *pack) string { return pack.rootAppPublicAddr },
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"WebSocketApp": {
publicAddr: func(pack *pack) string { return pack.rootWSPublicAddr },
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
},
},
}

// asserts that the response has error.
responseWithError := func(t *testing.T, status int, err error) {
if status > 0 {
require.NoError(t, err)
require.Equal(t, http.StatusInternalServerError, status)
return
}

require.Error(t, err)
}
// asserts that the response has no errors.
responseWithoutError := func(t *testing.T, status int, err error) {
if status > 0 {
require.NoError(t, err)
require.Equal(t, http.StatusOK, status)
return
}

require.NoError(t, err)
}

for name, test := range testCases {
t.Run(name, func(t *testing.T) {
pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3})
inCookie := pack.createAppSession(t, test.publicAddr(pack), pack.rootAppClusterName)

status, err := test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Stop all root app servers.
for i, appServer := range pack.rootAppServers {
appServer.Close()

// issue a request right after a server is gone.
status, err = test.makeRequest(pack, inCookie)
if i == len(pack.rootAppServers)-1 {
// fails only when the last one is closed.
responseWithError(t, status, err)
} else {
// otherwise the request should be handled by another
// server.
responseWithoutError(t, status, err)
}
}

servers := pack.startRootAppServers(t, 3, []service.App{})
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Start an additional app server and stop all current running
// ones.
pack.startRootAppServers(t, 1, []service.App{})
for _, appServer := range servers {
appServer.Close()

// Everytime a app server stops we issue a request to
// guarantee that the requests are going to be resolved by
// the remaining app servers.
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)
}
})
}
}

// pack contains identity as well as initialized Teleport clusters and instances.
type pack struct {
username string
Expand All @@ -645,9 +730,9 @@ type pack struct {
webCookie string
webToken string

rootCluster *TeleInstance
rootAppServer *service.TeleportProcess
rootCertPool *x509.CertPool
rootCluster *TeleInstance
rootAppServers []*service.TeleportProcess
rootCertPool *x509.CertPool

rootAppName string
rootAppPublicAddr string
Expand Down Expand Up @@ -701,12 +786,13 @@ type pack struct {
}

type appTestOptions struct {
extraRootApps []service.App
extraLeafApps []service.App
userLogins []string
userTraits map[string][]string
rootClusterPorts *InstancePorts
leafClusterPorts *InstancePorts
extraRootApps []service.App
extraLeafApps []service.App
userLogins []string
userTraits map[string][]string
rootClusterPorts *InstancePorts
leafClusterPorts *InstancePorts
rootAppServersCount int

rootConfig func(config *service.Config)
leafConfig func(config *service.Config)
Expand Down Expand Up @@ -871,8 +957,7 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
rcConf := service.MakeDefaultConfig()
rcConf.Console = nil
rcConf.Log = log
rcConf.DataDir, err = ioutil.TempDir("", "cluster-"+p.rootCluster.Secrets.SiteName)
require.NoError(t, err)
rcConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(rcConf.DataDir) })
rcConf.Auth.Enabled = true
rcConf.Auth.Preference.SetSecondFactor("off")
Expand All @@ -888,8 +973,7 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
lcConf := service.MakeDefaultConfig()
lcConf.Console = nil
lcConf.Log = log
lcConf.DataDir, err = ioutil.TempDir("", "cluster-"+p.leafCluster.Secrets.SiteName)
require.NoError(t, err)
lcConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(lcConf.DataDir) })
lcConf.Auth.Enabled = true
lcConf.Auth.Preference.SetSecondFactor("off")
Expand Down Expand Up @@ -918,64 +1002,17 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
p.rootCluster.StopAll()
})

raConf := service.MakeDefaultConfig()
raConf.Console = nil
raConf.Log = log
raConf.DataDir, err = ioutil.TempDir("", "app-server-"+p.rootCluster.Secrets.SiteName)
require.NoError(t, err)
t.Cleanup(func() { os.RemoveAll(raConf.DataDir) })
raConf.Token = "static-token-value"
raConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.rootCluster.GetPortWeb()),
},
// At least one rootAppServer should start during the setup
rootAppServersCount := 1
if opts.rootAppServersCount > 0 {
rootAppServersCount = opts.rootAppServersCount
}
raConf.Auth.Enabled = false
raConf.Proxy.Enabled = false
raConf.SSH.Enabled = false
raConf.Apps.Enabled = true
raConf.Apps.Apps = append([]service.App{
{
Name: p.rootAppName,
URI: rootServer.URL,
PublicAddr: p.rootAppPublicAddr,
},
{
Name: p.rootWSAppName,
URI: rootWSServer.URL,
PublicAddr: p.rootWSPublicAddr,
},
{
Name: p.rootWSSAppName,
URI: rootWSSServer.URL,
PublicAddr: p.rootWSSPublicAddr,
},
{
Name: p.jwtAppName,
URI: jwtServer.URL,
PublicAddr: p.jwtAppPublicAddr,
},
{
Name: p.headerAppName,
URI: headerServer.URL,
PublicAddr: p.headerAppPublicAddr,
},
{
Name: p.flushAppName,
URI: flushServer.URL,
PublicAddr: p.flushAppPublicAddr,
},
}, opts.extraRootApps...)
p.rootAppServer, err = p.rootCluster.StartApp(raConf)
require.NoError(t, err)
t.Cleanup(func() { p.rootAppServer.Close() })
p.rootAppServers = p.startRootAppServers(t, rootAppServersCount, opts.extraRootApps)

laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir, err = ioutil.TempDir("", "app-server-"+p.leafCluster.Secrets.SiteName)
require.NoError(t, err)
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
Expand Down Expand Up @@ -1323,7 +1360,7 @@ func (p *pack) makeWebsocketRequest(sessionCookie, endpoint string) (string, err
return "", trace.Wrap(err)
}
defer conn.Close()
data, err := ioutil.ReadAll(conn)
data, err := io.ReadAll(conn)
if err != nil {
return "", trace.Wrap(err)
}
Expand Down Expand Up @@ -1364,7 +1401,7 @@ func (p *pack) sendRequest(req *http.Request, tlsConfig *tls.Config) (int, strin
defer resp.Body.Close()

// Read in response body.
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, "", trace.Wrap(err)
}
Expand Down Expand Up @@ -1396,6 +1433,71 @@ func (p *pack) waitForLogout(appCookie string) (int, error) {
}
}

func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess {
log := utils.NewLoggerForTests()

servers := make([]*service.TeleportProcess, count)

for i := 0; i < count; i++ {
raConf := service.MakeDefaultConfig()
raConf.Console = nil
raConf.Log = log
raConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(raConf.DataDir) })
raConf.Token = "static-token-value"
raConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.rootCluster.GetPortWeb()),
},
}
raConf.Auth.Enabled = false
raConf.Proxy.Enabled = false
raConf.SSH.Enabled = false
raConf.Apps.Enabled = true
raConf.Apps.Apps = append([]service.App{
{
Name: p.rootAppName,
URI: p.rootAppURI,
PublicAddr: p.rootAppPublicAddr,
},
{
Name: p.rootWSAppName,
URI: p.rootWSAppURI,
PublicAddr: p.rootWSPublicAddr,
},
{
Name: p.rootWSSAppName,
URI: p.rootWSSAppURI,
PublicAddr: p.rootWSSPublicAddr,
},
{
Name: p.jwtAppName,
URI: p.jwtAppURI,
PublicAddr: p.jwtAppPublicAddr,
},
{
Name: p.headerAppName,
URI: p.headerAppURI,
PublicAddr: p.headerAppPublicAddr,
},
{
Name: p.flushAppName,
URI: p.flushAppURI,
PublicAddr: p.flushAppPublicAddr,
},
}, extraApps...)

appServer, err := p.rootCluster.StartApp(raConf)
require.NoError(t, err)
t.Cleanup(func() { appServer.Close() })

servers[i] = appServer
}

return servers
}

var forwardedHeaderNames = []string{
teleport.AppJWTHeader,
teleport.AppCFHeader,
Expand Down
Loading

0 comments on commit ae08bc4

Please sign in to comment.