Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up tests for multi-tenant work #279

Merged
merged 21 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 119 additions & 9 deletions interceptor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/interceptor/config"
"github.com/kedacore/http-add-on/pkg/k8s"
kedanet "github.com/kedacore/http-add-on/pkg/net"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/kedacore/http-add-on/pkg/test"
Expand All @@ -23,17 +25,126 @@ import (
)

func TestRunProxyServerCountMiddleware(t *testing.T) {
// r := require.New(t)
// ctx, done := context.WithCancel(
// context.Background(),
// )
// defer done()
// r.NoError(runProxyServer(ctx, logr.Discard(), q, waitFunc, routingTable, timeouts, port))

// see https://github.com/kedacore/http-add-on/issues/245
const (
port = 8080
host = "samplehost"
)
r := require.New(t)
ctx, done := context.WithCancel(
context.Background(),
)
defer done()

originHdl := kedanet.NewTestHTTPHandlerWrapper(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}),
)
originSrv, originURL, err := kedanet.StartTestServer(originHdl)
r.NoError(err)
defer originSrv.Close()
originPort, err := strconv.Atoi(originURL.Port())
r.NoError(err)
g, ctx := errgroup.WithContext(ctx)
q := queue.NewFakeCounter()
routingTable := routing.NewTable()
// set up a fake host that we can spoof
// when we later send request to the proxy,
// so that the proxy calculates a URL for that
// host that points to the (above) fake origin
// server.
routingTable.AddTarget(
host,
targetFromURL(
originURL,
originPort,
"testdepl",
123,
),
)
timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
waitFunc := func(ctx context.Context, name string) error {
<-waiterCh
return nil
}
g.Go(func() error {
return runProxyServer(
ctx,
logr.Discard(),
q,
waitFunc,
routingTable,
timeouts,
port,
)
})
// wait for server to start
time.Sleep(500 * time.Millisecond)

// make an HTTP request in the background
g.Go(func() error {
req, err := http.NewRequest(
"GET",
fmt.Sprintf(
"http://0.0.0.0:%d", port,
), nil,
)
if err != nil {
return err
}
req.Host = host
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf(
"unexpected status code: %d",
resp.StatusCode,
)
}
return nil
})
time.Sleep(100 * time.Millisecond)
select {
case hostAndCount := <-q.ResizedCh:
r.Equal(host, hostAndCount.Host)
r.Equal(+1, hostAndCount.Count)
case <-time.After(500 * time.Millisecond):
r.Fail("timeout waiting for +1 queue resize")
}

// tell the wait func to proceed
waiterCh <- struct{}{}

select {
case hostAndCount := <-q.ResizedCh:
r.Equal(host, hostAndCount.Host)
r.Equal(-1, hostAndCount.Count)
case <-time.After(500 * time.Millisecond):
r.Fail("timeout waiting for -1 queue resize")
}

// check the queue to make sure all counts are at 0
countsPtr, err := q.Current()
r.NoError(err)
counts := countsPtr.Counts
r.Equal(1, len(counts))
_, foundHost := counts[host]
r.True(
foundHost,
"couldn't find host %s in the queue",
host,
)
r.Equal(0, counts[host])

done()
r.Error(g.Wait())
}

func TestRunAdminServerDeploymentsEndpoint(t *testing.T) {

ctx := context.Background()
ctx, done := context.WithCancel(ctx)
defer done()
Expand Down Expand Up @@ -148,5 +259,4 @@ func TestRunAdminServerConfig(t *testing.T) {

done()
r.Error(errgrp.Wait())

}
42 changes: 24 additions & 18 deletions interceptor/proxy_handlers_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ func TestIntegrationHappyPath(t *testing.T) {
},
})

originHost, originPort, err := splitHostPort(h.originURL.Host)
originPort, err := strconv.Atoi(h.originURL.Port())
r.NoError(err)
h.routingTable.AddTarget(hostForTest(t), routing.Target{
Service: originHost,
Port: originPort,
Deployment: deplName,
})
h.routingTable.AddTarget(hostForTest(t), targetFromURL(
h.originURL,
originPort,
deplName,
123,
))

// happy path
res, err := doRequest(
Expand Down Expand Up @@ -115,13 +116,14 @@ func TestIntegrationNoReplicas(t *testing.T) {
h, err := newHarness(deployTimeout, time.Second)
r.NoError(err)

originHost, originPort, err := splitHostPort(h.originURL.Host)
originPort, err := strconv.Atoi(h.originURL.Port())
r.NoError(err)
h.routingTable.AddTarget(hostForTest(t), routing.Target{
Service: originHost,
Port: originPort,
Deployment: deployName,
})
h.routingTable.AddTarget(hostForTest(t), targetFromURL(
h.originURL,
originPort,
deployName,
123,
))

// 0 replicas
h.deplCache.Set(deployName, appsv1.Deployment{
Expand Down Expand Up @@ -162,13 +164,17 @@ func TestIntegrationWaitReplicas(t *testing.T) {
r.NoError(err)

// add host to routing table
originHost, originPort, err := splitHostPort(h.originURL.Host)
originPort, err := strconv.Atoi(h.originURL.Port())
r.NoError(err)
h.routingTable.AddTarget(hostForTest(t), routing.Target{
Service: originHost,
Port: originPort,
Deployment: deployName,
})
h.routingTable.AddTarget(
hostForTest(t),
targetFromURL(
h.originURL,
originPort,
deployName,
123,
),
)

// set up a deployment with zero replicas and create
// a watcher we can use later to fake-send a deployment
Expand Down
28 changes: 22 additions & 6 deletions interceptor/proxy_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"testing"
Expand All @@ -30,13 +31,14 @@ func TestImmediatelySuccessfulProxy(t *testing.T) {
r.NoError(err)
defer srv.Close()
routingTable := routing.NewTable()
portInt, err := strconv.Atoi(originURL.Port())
originPort, err := strconv.Atoi(originURL.Port())
r.NoError(err)
target := routing.Target{
Service: strings.Split(originURL.Host, ":")[0],
Port: portInt,
Deployment: "testdepl",
}
target := targetFromURL(
originURL,
originPort,
"testdepl",
123,
)
routingTable.AddTarget(host, target)

timeouts := defaultTimeouts()
Expand Down Expand Up @@ -332,3 +334,17 @@ func notifyingFunc() (func(context.Context, string) error, <-chan struct{}, func
}
}, calledCh, finishFunc
}

func targetFromURL(
u *url.URL,
port int,
deployment string,
targetPendingReqs int32,
) routing.Target {
return routing.Target{
Service: strings.Split(u.Host, ":")[0],
Port: port,
Deployment: deployment,
TargetPendingRequests: targetPendingReqs,
}
}
53 changes: 52 additions & 1 deletion pkg/k8s/deployment_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,58 @@ func callMergeAndBcast(
}

func TestK8sDeploymentCacheAddEvt(t *testing.T) {
// see https://github.com/kedacore/http-add-on/issues/245
r := require.New(t)
ctx, done := context.WithCancel(
context.Background(),
)
defer done()
cache, err := NewK8sDeploymentCache(ctx, logr.Discard(), newFakeDeploymentListerWatcher())
r.NoError(err)
checkDeploymentsInCache := func(depls ...appsv1.Deployment) {
t.Helper()
r := require.New(t)
for _, depl := range depls {
ret, ok := cache.latest[depl.GetName()]
r.True(ok)
r.Equal(depl, ret)
}
}

// add a first deployment and make sure it exists in
// the latest cache
depl1 := newDeployment("testns", "testdepl1", "testing", nil, nil, nil, core.PullAlways)
evt1 := watch.Event{
Type: watch.Added, // doesn't matter, addEvt doesn't look at this
Object: depl1,
}
r.NoError(cache.addEvt(evt1))
r.Equal(1, len(cache.latest))
checkDeploymentsInCache(*depl1)

// add a second (different name) and make sure both exist
// in the cache
depl2 := *depl1
depl2.Name = "testdepl2"
evt2 := watch.Event{
Type: watch.Modified,
Object: &depl2,
}
r.NoError(cache.addEvt(evt2))
r.Equal(2, len(cache.latest))
checkDeploymentsInCache(*depl1, depl2)

// try to add a pod, make sure addEvt fails, and afterward, make sure
// the original 2 deployments are still in the cache
otherObj := &core.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "somepod"},
}
evt3 := watch.Event{
Type: watch.Added,
Object: otherObj,
}
r.Error(cache.addEvt(evt3))
r.Equal(2, len(cache.latest))
checkDeploymentsInCache(*depl1, depl2)
}

// test to make sure that, even when no events come through, the
Expand Down
10 changes: 8 additions & 2 deletions pkg/net/mock_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ func NewTestHTTPHandlerWrapper(hdl http.Handler) *TestHTTPHandlerWrapper {
}
}

// StartTestServer starts an *httptest.Server, parses its URL, and returns both values.
// The caller is responsible for closing the returned server
// StartTestServer creates and starts an *httptest.Server
// in the background, then parses its URL and returns both
// values.
//
// If this function returns a nil error, the caller is
// responsible for closing the returned server. If it
// returns a non-nil error, the server and URL
// will be nil.
func StartTestServer(hdl http.Handler) (*httptest.Server, *url.URL, error) {
srv := httptest.NewServer(hdl)
u, err := url.Parse(srv.URL)
Expand Down
Loading