Skip to content

Commit

Permalink
Refactor BackendManager / BackendStorage.
Browse files Browse the repository at this point in the history
This also fixes kubernetes-sigs#294
  • Loading branch information
jkh52 committed Apr 17, 2024
1 parent a650c3b commit c50b7e0
Show file tree
Hide file tree
Showing 11 changed files with 866 additions and 983 deletions.
363 changes: 74 additions & 289 deletions pkg/server/backend_manager.go

Large diffs are not rendered by default.

435 changes: 181 additions & 254 deletions pkg/server/backend_manager_test.go

Large diffs are not rendered by default.

59 changes: 0 additions & 59 deletions pkg/server/default_route_backend_manager.go

This file was deleted.

86 changes: 0 additions & 86 deletions pkg/server/desthost_backend_manager.go

This file was deleted.

9 changes: 1 addition & 8 deletions pkg/server/readiness_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,4 @@ type ReadinessManager interface {
Ready() (bool, string)
}

var _ ReadinessManager = &DefaultBackendStorage{}

func (s *DefaultBackendStorage) Ready() (bool, string) {
if s.NumBackends() == 0 {
return false, "no connection to any proxy agent"
}
return true, ""
}
var _ ReadinessManager = &DefaultBackendManager{}
66 changes: 8 additions & 58 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ type ProxyClientConnection struct {
dialAddress string // cached for logging
}

const (
destHostKey key = iota
)

func (c *ProxyClientConnection) send(pkt *client.Packet) error {
defer func(start time.Time) { metrics.Metrics.ObserveFrontendWriteLatency(time.Since(start)) }(time.Now())
if c.Mode == ModeGRPC {
Expand Down Expand Up @@ -193,8 +189,7 @@ func (pm *PendingDialManager) removeForStream(streamUID string) []*ProxyClientCo

// ProxyServer
type ProxyServer struct {
// BackendManagers contains a list of BackendManagers
BackendManagers []BackendManager
BackendManager BackendManager

// Readiness reports if the proxy server is ready, i.e., if the proxy
// server has connections to proxy agents (backends). Note that the
Expand All @@ -215,9 +210,6 @@ type ProxyServer struct {

// agent authentication
AgentAuthenticationOptions *AgentTokenAuthenticationOptions

// TODO: move strategies into BackendStorage
proxyStrategies []ProxyStrategy
}

// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
Expand All @@ -233,45 +225,17 @@ var _ agent.AgentServiceServer = &ProxyServer{}

var _ client.ProxyServiceServer = &ProxyServer{}

func genContext(proxyStrategies []ProxyStrategy, reqHost string) context.Context {
ctx := context.Background()
for _, ps := range proxyStrategies {
switch ps {
case ProxyStrategyDestHost:
addr := util.RemovePortFromHost(reqHost)
ctx = context.WithValue(ctx, destHostKey, addr)
}
}
return ctx
}

func (s *ProxyServer) getBackend(reqHost string) (Backend, error) {
ctx := genContext(s.proxyStrategies, reqHost)
for _, bm := range s.BackendManagers {
be, err := bm.Backend(ctx)
if err == nil {
return be, nil
}
if ignoreNotFound(err) != nil {
// if can't find a backend through current BackendManager, move on
// to the next one
return nil, err
}
}
return nil, &ErrNotFound{}
addr := util.RemovePortFromHost(reqHost)
return s.BackendManager.Backend(addr)
}

func (s *ProxyServer) addBackend(backend Backend) {
// TODO: refactor BackendStorage to acquire lock once, not up to 3 times.
for _, bm := range s.BackendManagers {
bm.AddBackend(backend)
}
s.BackendManager.AddBackend(backend)
}

func (s *ProxyServer) removeBackend(backend Backend) {
for _, bm := range s.BackendManagers {
bm.RemoveBackend(backend)
}
s.BackendManager.RemoveBackend(backend)
}

func (s *ProxyServer) addEstablished(agentID string, connID int64, p *ProxyClientConnection) {
Expand Down Expand Up @@ -377,30 +341,16 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien

// NewProxyServer creates a new ProxyServer instance
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
var bms []BackendManager
for _, ps := range proxyStrategies {
switch ps {
case ProxyStrategyDestHost:
bms = append(bms, NewDestHostBackendManager())
case ProxyStrategyDefault:
bms = append(bms, NewDefaultBackendManager())
case ProxyStrategyDefaultRoute:
bms = append(bms, NewDefaultRouteBackendManager())
default:
klog.ErrorS(nil, "Unknown proxy strategy", "strategy", ps)
}
}
bm := NewDefaultBackendManager(proxyStrategies)

return &ProxyServer{
established: make(map[string](map[int64]*ProxyClientConnection)),
PendingDial: NewPendingDialManager(),
serverID: serverID,
serverCount: serverCount,
BackendManagers: bms,
BackendManager: bm,
AgentAuthenticationOptions: agentAuthenticationOptions,
// use the first backend-manager as the Readiness Manager
Readiness: bms[0],
proxyStrategies: proxyStrategies,
Readiness: bm,
}
}

Expand Down
Loading

0 comments on commit c50b7e0

Please sign in to comment.