Skip to content

Commit

Permalink
CP/DP Split: track agent connections (#2970)
Browse files Browse the repository at this point in the history
Added the following:
- middleware to extract IP address of agent and store it in the grpc context
- link the agent's hostname to its IP address when connecting and track it
- use this linkage to pause the Subscription until the agent registers itself, and then proceed

This logic is subject to change as we enhance this (like tracking auth token instead of IP address).
  • Loading branch information
sjberman authored Jan 6, 2025
1 parent 2e529b4 commit f8bbcbe
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 115 deletions.
11 changes: 6 additions & 5 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent"
agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings"
Expand Down Expand Up @@ -180,14 +181,14 @@ func StartManager(cfg config.Config) error {

nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), cfg.Plus)

grpcServer := &agent.GRPCServer{
Logger: cfg.Logger.WithName("agentGRPCServer"),
RegisterServices: []func(*grpc.Server){
grpcServer := agentgrpc.NewServer(
cfg.Logger.WithName("agentGRPCServer"),
grpcServerPort,
[]func(*grpc.Server){
nginxUpdater.CommandService.Register,
nginxUpdater.FileService.Register,
},
Port: grpcServerPort,
}
)

if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: grpcServer}); err != nil {
return fmt.Errorf("cannot register grpc server: %w", err)
Expand Down
19 changes: 10 additions & 9 deletions internal/mode/static/nginx/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@ type NginxUpdater interface {
type NginxUpdaterImpl struct {
CommandService *commandService
FileService *fileService
Logger logr.Logger
Plus bool
logger logr.Logger
plus bool
}

// NewNginxUpdater returns a new NginxUpdaterImpl instance.
func NewNginxUpdater(logger logr.Logger, plus bool) *NginxUpdaterImpl {
return &NginxUpdaterImpl{
Logger: logger,
Plus: plus,
CommandService: newCommandService(),
FileService: newFileService(),
logger: logger,
plus: plus,
CommandService: newCommandService(logger.WithName("commandService")),
FileService: newFileService(logger.WithName("fileService")),
}
}

// UpdateConfig sends the nginx configuration to the agent.
func (n *NginxUpdaterImpl) UpdateConfig(files int) {
n.Logger.Info("Sending nginx configuration to agent", "numFiles", files)
n.logger.Info("Sending nginx configuration to agent", "numFiles", files)
}

// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
// Only applicable when using NGINX Plus.
func (n *NginxUpdaterImpl) UpdateUpstreamServers() {
if !n.Plus {
if !n.plus {
return
}

n.Logger.Info("Updating upstream servers using NGINX Plus API")
n.logger.Info("Updating upstream servers using NGINX Plus API")
}
120 changes: 94 additions & 26 deletions internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,51 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/grpc"

agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
)

// commandService handles the connection and subscription to the agent.
// commandService handles the connection and subscription to the data plane agent.
type commandService struct {
// Embedded generated server type from the agent's mpi/v1 gRPC API.
pb.CommandServiceServer
// connTracker links an agent's IP address to the pod name (hostname) it reported in CreateConnection.
connTracker *agentgrpc.ConnectionsTracker
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
logger logr.Logger
}

func newCommandService() *commandService {
return &commandService{}
func newCommandService(logger logr.Logger) *commandService {
return &commandService{
logger: logger,
connTracker: agentgrpc.NewConnectionsTracker(),
}
}

// Register attaches this command service to the given gRPC server.
func (cs *commandService) Register(server *grpc.Server) {
pb.RegisterCommandServiceServer(server, cs)
}

// CreateConnection registers a data plane agent with the control plane.
func (cs *commandService) CreateConnection(
_ context.Context,
ctx context.Context,
req *pb.CreateConnectionRequest,
) (*pb.CreateConnectionResponse, error) {
if req == nil {
return nil, errors.New("empty connection request")
}

fmt.Printf("Creating connection for nginx pod: %s\n", req.GetResource().GetContainerInfo().GetHostname())
gi, ok := grpcContext.GrpcInfoFromContext(ctx)
if !ok {
return nil, agentgrpc.ErrStatusInvalidConnection
}

podName := req.GetResource().GetContainerInfo().GetHostname()

cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName))
cs.connTracker.Track(gi.IPAddress, podName)

return &pb.CreateConnectionResponse{
Response: &pb.CommandResponse{
Expand All @@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection(
}, nil
}

// Subscribe is a decoupled communication mechanism between the data plane agent and control plane.
func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error {
fmt.Println("Received subscribe request")

ctx := in.Context()

gi, ok := grpcContext.GrpcInfoFromContext(ctx)
if !ok {
return agentgrpc.ErrStatusInvalidConnection
}

cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress))

go cs.listenForDataPlaneResponse(ctx, in)

// wait for the agent to report itself
podName, err := cs.waitForConnection(ctx, gi)
if err != nil {
cs.logger.Error(err, "error waiting for connection")
return err
}

cs.logger.Info(fmt.Sprintf("Handling subscription for %s/%s", podName, gi.IPAddress))
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Minute):
dummyRequest := &pb.ManagementPlaneRequest{
Request: &pb.ManagementPlaneRequest_StatusRequest{
StatusRequest: &pb.StatusRequest{},
Request: &pb.ManagementPlaneRequest_HealthRequest{
HealthRequest: &pb.HealthRequest{},
},
}
if err := in.Send(dummyRequest); err != nil { // will likely need retry logic
fmt.Printf("ERROR: %v\n", err)
if err := in.Send(dummyRequest); err != nil { // TODO(sberman): will likely need retry logic
cs.logger.Error(err, "error sending request to agent")
}
}
}
}

func (cs *commandService) UpdateDataPlaneStatus(
_ context.Context,
req *pb.UpdateDataPlaneStatusRequest,
) (*pb.UpdateDataPlaneStatusResponse, error) {
fmt.Println("Updating data plane status")
// TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call,
// so this fails.
func (cs *commandService) waitForConnection(ctx context.Context, gi grpcContext.GrpcInfo) (string, error) {
var podName string
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

if req == nil {
return nil, errors.New("empty update data plane status request")
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-timer.C:
return "", errors.New("timed out waiting for agent connection")
case <-ticker.C:
if podName = cs.connTracker.GetConnection(gi.IPAddress); podName != "" {
return podName, nil
}
}
}
}

return &pb.UpdateDataPlaneStatusResponse{}, nil
// listenForDataPlaneResponse reads the messages the agent sends on the Subscribe stream
// and logs each one. It returns when ctx is done or when receiving from the stream fails.
func (cs *commandService) listenForDataPlaneResponse(ctx context.Context, in pb.CommandService_SubscribeServer) {
	for {
		// Non-blocking check so a cancelled context stops the loop before the next receive.
		select {
		case <-ctx.Done():
			return
		default:
		}

		dataPlaneResponse, err := in.Recv()
		if err != nil {
			// Check the error first: on failure there is no response to report, and logging
			// it as "received" would be misleading.
			cs.logger.Error(err, "failed to receive data plane response")
			return
		}

		cs.logger.Info(fmt.Sprintf("Received data plane response: %v", dataPlaneResponse))
	}
}

// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent.
// TODO(sberman): Is health monitoring the data planes something useful for us to do?
func (cs *commandService) UpdateDataPlaneHealth(
_ context.Context,
req *pb.UpdateDataPlaneHealthRequest,
_ *pb.UpdateDataPlaneHealthRequest,
) (*pb.UpdateDataPlaneHealthResponse, error) {
fmt.Println("Updating data plane health")

if req == nil {
return nil, errors.New("empty update dataplane health request")
}

return &pb.UpdateDataPlaneHealthResponse{}, nil
}

// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata,
// instance metadata, or configurations. Since directly changing nginx configuration on the instance
// is not supported, this is a no-op for NGF.
func (cs *commandService) UpdateDataPlaneStatus(
_ context.Context,
_ *pb.UpdateDataPlaneStatusRequest,
) (*pb.UpdateDataPlaneStatusResponse, error) {
// Both arguments are ignored; always reply with an empty response and no error.
return &pb.UpdateDataPlaneStatusResponse{}, nil
}
38 changes: 22 additions & 16 deletions internal/mode/static/nginx/agent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,65 @@ import (
"context"
"fmt"

"github.com/go-logr/logr"
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/grpc"
)

// fileService handles file management between the control plane and the agent.
type fileService struct {
// Embedded generated server type from the agent's mpi/v1 gRPC API.
pb.FileServiceServer
// logger is used by the handlers to log incoming requests.
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
logger logr.Logger
}

func newFileService() *fileService {
return &fileService{}
func newFileService(logger logr.Logger) *fileService {
return &fileService{logger: logger}
}

// Register attaches this file service to the given gRPC server.
func (fs *fileService) Register(server *grpc.Server) {
pb.RegisterFileServiceServer(server, fs)
}

// GetOverview gets the overview of files for a particular configuration version of an instance.
// Agent calls this if it's missing an overview when a ConfigApplyRequest is called by the control plane.
func (fs *fileService) GetOverview(
_ context.Context,
_ *pb.GetOverviewRequest,
) (*pb.GetOverviewResponse, error) {
fmt.Println("Get overview request")
fs.logger.Info("Get overview request")

return &pb.GetOverviewResponse{
Overview: &pb.FileOverview{},
}, nil
}

func (fs *fileService) UpdateOverview(
_ context.Context,
_ *pb.UpdateOverviewRequest,
) (*pb.UpdateOverviewResponse, error) {
fmt.Println("Update overview request")

return &pb.UpdateOverviewResponse{}, nil
}

// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest.
func (fs *fileService) GetFile(
_ context.Context,
req *pb.GetFileRequest,
) (*pb.GetFileResponse, error) {
filename := req.GetFileMeta().GetName()
hash := req.GetFileMeta().GetHash()
fmt.Printf("Getting file: %s, %s\n", filename, hash)
fs.logger.Info(fmt.Sprintf("Getting file: %s, %s", filename, hash))

return &pb.GetFileResponse{}, nil
}

// UpdateOverview is called by agent on startup and whenever any files change on the instance.
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
func (fs *fileService) UpdateOverview(
_ context.Context,
_ *pb.UpdateOverviewRequest,
) (*pb.UpdateOverviewResponse, error) {
// Both arguments are ignored; always reply with an empty response and no error.
return &pb.UpdateOverviewResponse{}, nil
}

// UpdateFile is called by agent whenever any files change on the instance.
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
func (fs *fileService) UpdateFile(
_ context.Context,
req *pb.UpdateFileRequest,
_ *pb.UpdateFileRequest,
) (*pb.UpdateFileResponse, error) {
fmt.Println("Update file request for: ", req.GetFile().GetFileMeta().GetName())

return &pb.UpdateFileResponse{}, nil
}
59 changes: 0 additions & 59 deletions internal/mode/static/nginx/agent/grpc.go

This file was deleted.

Loading

0 comments on commit f8bbcbe

Please sign in to comment.