Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-4943 - Implement ProxyTracker #18535

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions agent/proxy-tracker/mock_Logger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions agent/proxy-tracker/mock_SessionLimiter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

278 changes: 278 additions & 0 deletions agent/proxy-tracker/proxy_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package proxytracker

import (
"errors"
"fmt"
"sync"

"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh"
"github.com/hashicorp/consul/internal/resource"

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)

// Proxy implements the queue.ItemType interface so that it can be used in a controller.Event.
// It is sent on the newProxyConnectionCh channel.
// TODO(ProxyState): needs to support tenancy in the future.
// Key() is current resourceID.Name.
type ProxyConnection struct {
ProxyID *pbresource.ID
}

func (e *ProxyConnection) Key() string {
return e.ProxyID.GetName()
}

// proxyWatchData is a handle on all of the relevant bits that is created by calling Watch().
// It is meant to be stored in the proxies cache by proxyID so that watches can be notified
// when the ProxyState for that proxyID has changed.
type proxyWatchData struct {
// notifyCh is the channel that the watcher receives updates from ProxyTracker.
notifyCh chan *pbmesh.ProxyState
// state is the current/last updated ProxyState for a given proxy.
state *pbmesh.ProxyState
// token is the ACL token provided by the watcher.
token string
// nodeName is the node where the given proxy resides.
nodeName string
}

type ProxyTrackerConfig struct {
// logger will be used to write log messages.
Logger Logger

// sessionLimiter is used to enforce xDS concurrency limits.
SessionLimiter SessionLimiter
}

// ProxyTracker implements the Watcher and Updater interfaces. The Watcher is used by the xds server to add a new proxy
// to this server, and get back a channel for updates. The Updater is used by the ProxyState controller running on the
// server to push ProxyState updates to the notify channel.
type ProxyTracker struct {
config ProxyTrackerConfig
// proxies is a cache of the proxies connected to this server and configuration information for each one.
proxies map[resource.ReferenceKey]*proxyWatchData
// newProxyConnectionCh is the channel that the "updater" retains to receive messages from ProxyTracker that a new
// proxy has connected to ProxyTracker and a signal the "updater" should call PushChanges with a new state.
newProxyConnectionCh chan controller.Event
// shutdownCh is a channel that closes when ProxyTracker is shutdown. ShutdownChannel is never written to, only closed to
// indicate a shutdown has been initiated.
shutdownCh chan struct{}
// mu is a mutex that is used internally for locking when reading and modifying ProxyTracker state, namely the proxies map.
mu sync.Mutex
}

// NewProxyTracker returns a ProxyTracker instance given a configuration.
func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker {
return &ProxyTracker{
config: cfg,
proxies: make(map[resource.ReferenceKey]*proxyWatchData),
// buffering this channel since ProxyTracker will be registering watches for all proxies.
// using the buffer will limit errors related to controller and the proxy are both running
// but the controllers listening function is not blocking on the particular receive line.
// This channel is meant to error when the controller is "not ready" which means up and alive.
// This buffer will try to reduce false negatives and limit unnecessary erroring.
newProxyConnectionCh: make(chan controller.Event, 1000),
shutdownCh: make(chan struct{}),
}
}

// Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates,
// a channel to notify of xDS terminated session, and a cancel function to cancel the watch.
func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
nodeName string, token string) (<-chan *pbmesh.ProxyState,
limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {

if err := validateArgs(proxyID, nodeName, token); err != nil {
pt.config.Logger.Error("args failed validation", err)
return nil, nil, nil, err
}
// Begin a session with the xDS session concurrency limiter.
//
// See: https://github.com/hashicorp/consul/issues/15753
session, err := pt.config.SessionLimiter.BeginSession()
if err != nil {
pt.config.Logger.Error("failed to begin session with xDS session concurrency limiter", err)
return nil, nil, nil, err
}

// This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one.
proxyStateChan := make(chan *pbmesh.ProxyState, 1)
watchData := &proxyWatchData{
notifyCh: proxyStateChan,
state: nil,
token: token,
nodeName: nodeName,
}

proxyReferenceKey := resource.NewReferenceKey(proxyID)
cancel := func() {
pt.mu.Lock()
defer pt.mu.Unlock()
pt.cancelWatchLocked(proxyReferenceKey, proxyStateChan, session)
}

pt.mu.Lock()
defer pt.mu.Unlock()

pt.proxies[proxyReferenceKey] = watchData

//Send an event to the controller
err = pt.notifyNewProxyChannel(proxyID)
if err != nil {
pt.cancelWatchLocked(proxyReferenceKey, watchData.notifyCh, session)
return nil, nil, nil, err
}

return proxyStateChan, session.Terminated(), cancel, nil
}

// notifyNewProxyChannel attempts to send a message to newProxyConnectionCh and will return an error if there's no receiver.
// This will handle conditions where a proxy is connected but there's no controller for some reason to receive the event.
// This will error back to the proxy's Watch call and will cause the proxy call Watch again to retry connection until the controller
// is available.
func (pt *ProxyTracker) notifyNewProxyChannel(proxyID *pbresource.ID) error {
controllerEvent := controller.Event{
Obj: &ProxyConnection{
ProxyID: proxyID,
},
}
select {
case pt.newProxyConnectionCh <- controllerEvent:
return nil
// using default here to return errors is only safe when we have a large buffer.
// the receiver is on a loop to read from the channel. If the sequence of
// sender blocks on the channel and then the receiver blocks on the channel is not
// aligned, then extraneous errors could be returned to the proxy that are just
// false negatives and the controller could be up and healthy.
default:
return fmt.Errorf("failed to notify the controller of the proxy connecting")
}
}

// cancelWatchLocked does the following:
// - deletes the key from the proxies array.
// - ends the session with xDS session limiter.
// - closes the proxy state channel assigned to the proxy.
// This function assumes the state lock is already held.
func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan *pbmesh.ProxyState, session limiter.Session) {
delete(pt.proxies, proxyReferenceKey)
session.End()
close(proxyStateChan)
}

func validateArgs(proxyID *pbresource.ID,
nodeName string, token string) error {
if proxyID == nil {
return errors.New("proxyID is required")
} else if proxyID.Type.Kind != mesh.ProxyStateTemplateConfigurationType.Kind {
return fmt.Errorf("proxyID must be a %s", mesh.ProxyStateTemplateConfigurationType.GetKind())
} else if nodeName == "" {
return errors.New("nodeName is required")
} else if token == "" {
return errors.New("token is required")
}

return nil
}

// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState) error {
proxyReferenceKey := resource.NewReferenceKey(proxyID)
pt.mu.Lock()
defer pt.mu.Unlock()
if data, ok := pt.proxies[proxyReferenceKey]; ok {
data.state = proxyState
pt.deliverLatest(proxyID, proxyState, data.notifyCh)
} else {
return errors.New("proxyState change could not be sent because proxy is not connected")
}

return nil
}

func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState, ch chan *pbmesh.ProxyState) {
// Send if chan is empty
select {
case ch <- proxyState:
return
default:
}

// Not empty, drain the chan of older snapshots and redeliver. For now we only
// use 1-buffered chans but this will still work if we change that later.
OUTER:
for {
select {
case <-ch:
continue
default:
break OUTER
}
}

// Now send again
select {
case ch <- proxyState:
return
default:
// This should not be possible since we should be the only sender, enforced
// by m.mu but error and drop the update rather than panic.
pt.config.Logger.Error("failed to deliver proxyState to proxy",
"proxy", proxyID.String(),
)
}
}

// EventChannel returns an event channel that sends controller events when a proxy connects to a server.
func (pt *ProxyTracker) EventChannel() chan controller.Event {
return pt.newProxyConnectionCh
}

// ShutdownChannel returns a channel that closes when ProxyTracker is shutdown. ShutdownChannel is never written to, only closed to
// indicate a shutdown has been initiated.
func (pt *ProxyTracker) ShutdownChannel() chan struct{} {
return pt.shutdownCh
}

// ProxyConnectedToServer returns whether this id is connected to this server.
func (pt *ProxyTracker) ProxyConnectedToServer(proxyID *pbresource.ID) bool {
pt.mu.Lock()
defer pt.mu.Unlock()
proxyReferenceKey := resource.NewReferenceKey(proxyID)
_, ok := pt.proxies[proxyReferenceKey]
return ok
}

// Shutdown removes all state and close all channels.
func (pt *ProxyTracker) Shutdown() {
pt.mu.Lock()
defer pt.mu.Unlock()

// Close all current watchers first
for proxyID, watchData := range pt.proxies {
close(watchData.notifyCh)
delete(pt.proxies, proxyID)
}

close(pt.newProxyConnectionCh)
close(pt.shutdownCh)
}

//go:generate mockery --name SessionLimiter --inpackage
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
}

//go:generate mockery --name Logger --inpackage
type Logger interface {
Error(args ...any)
}
Loading