Skip to content

Commit

Permalink
Introduce endpointregistry component (#2535)
Browse files Browse the repository at this point in the history
This component is needed to create an easy way for loadbalancer,
fadein, etc. to find out required information about endpoints to
increase readability of those parts and enable us to provide
new features there, for example, passive health checks for endpoints.

Signed-off-by: Roman Zavodskikh <[email protected]>
Co-authored-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
RomanZavodskikh and Roman Zavodskikh authored Sep 8, 2023
1 parent c7faea9 commit 239a5b3
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 36 deletions.
9 changes: 1 addition & 8 deletions filters/fadein/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func NewPostProcessor() routing.PostProcessor {
}

func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
const configErrFmt = "Error while processing endpoint fade-in settings: %s, %s, %v."
now := time.Now()

for _, ri := range r {
Expand Down Expand Up @@ -229,13 +228,7 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
for i := range ri.LBEndpoints {
ep := &ri.LBEndpoints[i]

s, h, err := normalizeSchemeHost(ep.Scheme, ep.Host)
if err != nil {
log.Errorf(configErrFmt, ep.Scheme, ep.Host, err)
continue
}

key := endpointKey(s, h)
key := endpointKey(ep.Scheme, ep.Host)
detected := p.detected[key].when
if detected.IsZero() || endpointsCreated[key].After(detected) {
detected = now
Expand Down
4 changes: 2 additions & 2 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ func TestPostProcessor(t *testing.T) {

rt, _ := createRouting(t, routes)
r := route(rt, "/")
if r == nil || len(r.LBEndpoints) == 0 || !r.LBEndpoints[0].Detected.IsZero() {
t.Fatal("failed to ignore invalid LB endpoint")
if r != nil {
t.Fatal("created invalid LB endpoint")
}
})

Expand Down
42 changes: 38 additions & 4 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"fmt"
"math"
"math/rand"
"net"
"net/url"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cespare/xxhash/v2"
log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/net"
snet "github.com/zalando/skipper/net"
"github.com/zalando/skipper/routing"
)

Expand Down Expand Up @@ -308,7 +310,7 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int {
key, ok := ctx.Params[ConsistentHashKey].(string)
if !ok {
key = net.RemoteHost(ctx.Request).String()
key = snet.RemoteHost(ctx.Request).String()
}
balanceFactor, ok := ctx.Params[ConsistentHashBalanceFactor].(float64)
var choice int
Expand Down Expand Up @@ -431,9 +433,14 @@ func parseEndpoints(r *routing.Route) error {
return err
}

scheme, host, err := normalizeSchemeHost(eu.Scheme, eu.Host)
if err != nil {
return err
}

r.LBEndpoints[i] = routing.LBEndpoint{
Scheme: eu.Scheme,
Host: eu.Host,
Scheme: scheme,
Host: host,
Metrics: &routing.LBMetrics{},
}
}
Expand All @@ -456,6 +463,33 @@ func setAlgorithm(r *routing.Route) error {
return nil
}

func normalizeSchemeHost(s, h string) (string, string, error) {
// endpoint address cannot contain path, the rest is not case sensitive
s, h = strings.ToLower(s), strings.ToLower(h)

hh, p, err := net.SplitHostPort(h)
if err != nil {
// what is the actual right way of doing this, considering IPv6 addresses, too?
if !strings.Contains(err.Error(), "missing port") {
return "", "", err
}

p = ""
} else {
h = hh
}

switch {
case p == "" && s == "http":
p = "80"
case p == "" && s == "https":
p = "443"
}

h = net.JoinHostPort(h, p)
return s, h, nil
}

// Do implements routing.PostProcessor
func (p *algorithmProvider) Do(r []*routing.Route) []*routing.Route {
rr := make([]*routing.Route, 0, len(r))
Expand Down
39 changes: 26 additions & 13 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSelectAlgorithm(t *testing.T) {

if len(rr[0].LBEndpoints) != 1 ||
rr[0].LBEndpoints[0].Scheme != "https" ||
rr[0].LBEndpoints[0].Host != "www.example.org" ||
rr[0].LBEndpoints[0].Host != "www.example.org:443" ||
rr[0].LBEndpoints[0].Metrics == nil {
t.Fatal("failed to set the endpoints")
}
Expand All @@ -70,7 +70,7 @@ func TestSelectAlgorithm(t *testing.T) {

if len(rr[0].LBEndpoints) != 1 ||
rr[0].LBEndpoints[0].Scheme != "https" ||
rr[0].LBEndpoints[0].Host != "www.example.org" ||
rr[0].LBEndpoints[0].Host != "www.example.org:443" ||
rr[0].LBEndpoints[0].Metrics == nil {
t.Fatal("failed to set the endpoints")
}
Expand All @@ -97,7 +97,7 @@ func TestSelectAlgorithm(t *testing.T) {

if len(rr[0].LBEndpoints) != 1 ||
rr[0].LBEndpoints[0].Scheme != "https" ||
rr[0].LBEndpoints[0].Host != "www.example.org" ||
rr[0].LBEndpoints[0].Host != "www.example.org:443" ||
rr[0].LBEndpoints[0].Metrics == nil {
t.Fatal("failed to set the endpoints")
}
Expand All @@ -124,7 +124,7 @@ func TestSelectAlgorithm(t *testing.T) {

if len(rr[0].LBEndpoints) != 1 ||
rr[0].LBEndpoints[0].Scheme != "https" ||
rr[0].LBEndpoints[0].Host != "www.example.org" ||
rr[0].LBEndpoints[0].Host != "www.example.org:443" ||
rr[0].LBEndpoints[0].Metrics == nil {
t.Fatal("failed to set the endpoints")
}
Expand All @@ -151,7 +151,7 @@ func TestSelectAlgorithm(t *testing.T) {

if len(rr[0].LBEndpoints) != 1 ||
rr[0].LBEndpoints[0].Scheme != "https" ||
rr[0].LBEndpoints[0].Host != "www.example.org" ||
rr[0].LBEndpoints[0].Host != "www.example.org:443" ||
rr[0].LBEndpoints[0].Metrics == nil {
t.Fatal("failed to set the endpoints")
}
Expand Down Expand Up @@ -258,8 +258,9 @@ func TestApply(t *testing.T) {
rt := p.Do([]*routing.Route{r})

lbctx := &routing.LBContext{
Request: req,
Route: rt[0],
Request: req,
Route: rt[0],
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}

h := make(map[string]int)
Expand Down Expand Up @@ -314,29 +315,34 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
},
}})[0]
ch := route.LBAlgorithm.(*consistentHash)
ctx := &routing.LBContext{Request: r, Route: route, Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25}}
ctx := &routing.LBContext{
Request: r,
Route: route,
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, Params: map[string]interface{}{}})

if noLoad != nonBounded {
t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash")
}
// now we know that noLoad is the endpoint which should be requested for somekey if load is not an issue.
addInflightRequests(noLoad, 20)
addInflightRequests(ctx.Registry, noLoad, 20)
failover1 := ch.Apply(ctx)
if failover1 == nonBounded {
t.Error("When the selected endpoint is overloaded, the chosen endpoint should be different to standard consistentHash")
}

// now if 2 endpoints are overloaded, the request should go to the final endpoint
addInflightRequests(failover1, 20)
addInflightRequests(ctx.Registry, failover1, 20)
failover2 := ch.Apply(ctx)
if failover2 == nonBounded || failover2 == failover1 {
t.Error("Only the final endpoint had load below the average * balanceFactor, so it should have been selected.")
}

// now all will have same load, should select the original endpoint again
addInflightRequests(failover2, 20)
addInflightRequests(ctx.Registry, failover2, 20)
allLoaded := ch.Apply(ctx)
if allLoaded != nonBounded {
t.Error("When all endpoints have the same load, the consistentHash endpoint should be chosen again.")
Expand Down Expand Up @@ -386,7 +392,12 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
}})[0]
ch := route.LBAlgorithm.(*consistentHash)
balanceFactor := 1.25
ctx := &routing.LBContext{Request: r, Route: route, Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor}}
ctx := &routing.LBContext{
Request: r,
Route: route,
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}

for i := 0; i < 100; i++ {
ep := ch.Apply(ctx)
Expand All @@ -399,6 +410,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ep.Metrics.IncInflightRequest()
ctx.Registry.IncInflightRequest(ep.Host)
}
}

Expand All @@ -417,9 +429,10 @@ func TestConsistentHashKeyDistribution(t *testing.T) {
}
}

func addInflightRequests(endpoint routing.LBEndpoint, count int) {
func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) {
for i := 0; i < count; i++ {
endpoint.Metrics.IncInflightRequest()
registry.IncInflightRequest(endpoint.Host)
}
}

Expand Down
9 changes: 7 additions & 2 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat
LBFadeInDuration: fadeInDuration,
LBFadeInExponent: 1,
},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}

for i := range eps {
ctx.Route.LBEndpoints = append(ctx.Route.LBEndpoints, routing.LBEndpoint{
Host: eps[i],
Detected: detectionTimes[i],
})
ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i])
}

return ctx, eps
Expand Down Expand Up @@ -323,11 +325,13 @@ func benchmarkFadeIn(
LBFadeInDuration: fadeInDuration,
LBFadeInExponent: 1,
}
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
for i := range eps {
route.LBEndpoints = append(route.LBEndpoints, routing.LBEndpoint{
Host: eps[i],
Detected: detectionTimes[i],
})
registry.SetDetectedTime(eps[i], detectionTimes[i])
}

var wg sync.WaitGroup
Expand All @@ -342,8 +346,9 @@ func benchmarkFadeIn(

rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
ctx := &routing.LBContext{
Params: map[string]interface{}{},
Route: route,
Params: map[string]interface{}{},
Route: route,
Registry: registry,
}

for j := 0; j < b.N/clients; j++ {
Expand Down
22 changes: 18 additions & 4 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ type Params struct {
// It allows to add additional logic (for example tracing) by providing a wrapper function
// which accepts original skipper http.RoundTripper as an argument and returns a wrapped roundtripper
CustomHttpRoundTripperWrap func(http.RoundTripper) http.RoundTripper

// Registry provides key-value API which uses "host:port" string as a key
// and returns some metadata about endpoint. Information about the metadata
// returned from the registry could be found in routing.Metrics interface.
EndpointRegistry *routing.EndpointRegistry
}

type (
Expand Down Expand Up @@ -323,6 +328,7 @@ type Proxy struct {
maxLoops int
defaultHTTPStatus int
routing *routing.Routing
registry *routing.EndpointRegistry
roundTripper http.RoundTripper
priorityRoutes []PriorityRoute
flags Flags
Expand Down Expand Up @@ -471,7 +477,7 @@ func setRequestURLForLoadBalancedBackend(u *url.URL, rt *routing.Route, lbctx *r

// creates an outgoing http request to be forwarded to the route endpoint
// based on the augmented incoming request
func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool) (*http.Request, *routing.LBEndpoint, error) {
func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool, registry *routing.EndpointRegistry) (*http.Request, *routing.LBEndpoint, error) {
var endpoint *routing.LBEndpoint
r := ctx.request
rt := ctx.route
Expand All @@ -484,7 +490,7 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea
setRequestURLFromRequest(u, r)
setRequestURLForDynamicBackend(u, stateBag)
case eskip.LBBackend:
endpoint = setRequestURLForLoadBalancedBackend(u, rt, &routing.LBContext{Request: r, Route: rt, Params: stateBag})
endpoint = setRequestURLForLoadBalancedBackend(u, rt, &routing.LBContext{Request: r, Route: rt, Params: stateBag, Registry: registry})
default:
u.Scheme = rt.Scheme
u.Host = rt.Host
Expand Down Expand Up @@ -696,10 +702,15 @@ func WithParams(p Params) *Proxy {
defaultHTTPStatus = p.DefaultHTTPStatus
}

if p.EndpointRegistry == nil {
p.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{})
}

hostname := os.Getenv("HOSTNAME")

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
roundTripper: p.CustomHttpRoundTripperWrap(tr),
priorityRoutes: p.PriorityRoutes,
flags: p.Flags,
Expand Down Expand Up @@ -817,7 +828,7 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) error {
}

func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Response, *proxyError) {
req, endpoint, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval())
req, endpoint, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval(), p.registry)
if err != nil {
return nil, &proxyError{err: fmt.Errorf("could not map backend request: %w", err)}
}
Expand All @@ -829,6 +840,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
if endpoint != nil {
endpoint.Metrics.IncInflightRequest()
defer endpoint.Metrics.DecInflightRequest()

p.registry.IncInflightRequest(endpoint.Host)
defer p.registry.DecInflightRequest(endpoint.Host)
}

if p.experimentalUpgrade && isUpgradeRequest(req) {
Expand Down Expand Up @@ -1104,7 +1118,7 @@ func (p *Proxy) do(ctx *context) (err error) {
ctx.setResponse(loopCTX.response, p.flags.PreserveOriginal())
ctx.proxySpan = loopCTX.proxySpan
} else if p.flags.Debug() {
debugReq, _, err := mapRequest(ctx, ctx.request.Context(), p.flags.HopHeadersRemoval())
debugReq, _, err := mapRequest(ctx, ctx.request.Context(), p.flags.HopHeadersRemoval(), p.registry)
if err != nil {
perr := &proxyError{err: err}
p.makeErrorResponse(ctx, perr)
Expand Down
Loading

0 comments on commit 239a5b3

Please sign in to comment.