Skip to content

Commit

Permalink
Allow the ports for gRPC and REST to be configured for the allocator …
Browse files Browse the repository at this point in the history
…service (#2272)

* Allow the ports for gRPC and REST to be configured for the allocator
service.

* Updated documentation for the allocator service.

* Make the port name used by the allocator service a variable as well.
  • Loading branch information
roberthbailey authored Sep 24, 2021
1 parent 8f813e0 commit fbe5380
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 47 deletions.
4 changes: 2 additions & 2 deletions build/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,11 @@ install: $(ensure-build-image) install-custom-pull-secret
--set agones.image.controller.pullPolicy=$(IMAGE_PULL_POLICY),agones.image.sdk.alwaysPull=$(ALWAYS_PULL_SIDECAR) \
--set agones.image.controller.pullSecret=$(IMAGE_PULL_SECRET) \
--set agones.ping.http.serviceType=$(PING_SERVICE_TYPE),agones.ping.udp.serviceType=$(PING_SERVICE_TYPE) \
--set agones.allocator.http.serviceType=$(ALLOCATOR_SERVICE_TYPE) \
--set agones.allocator.service.serviceType=$(ALLOCATOR_SERVICE_TYPE) \
--set agones.controller.logLevel=$(LOG_LEVEL) \
--set agones.crds.cleanupOnDelete=$(CRD_CLEANUP) \
--set agones.featureGates=$(FEATURE_GATES) \
--set agones.allocator.http.loadBalancerIP=$(EXTERNAL_IP) \
--set agones.allocator.service.loadBalancerIP=$(EXTERNAL_IP) \
$(HELM_ARGS) \
agones $(mount_path)/install/helm/agones/

Expand Down
126 changes: 107 additions & 19 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
Expand All @@ -43,6 +44,7 @@ import (
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"gopkg.in/fsnotify.v1"
Expand All @@ -53,14 +55,11 @@ import (
"k8s.io/client-go/rest"
)

var (
logger = runtime.NewLoggerWithSource("main")
)
var logger = runtime.NewLoggerWithSource("main")

const (
certDir = "/home/allocator/client-ca/"
tlsDir = "/home/allocator/tls/"
sslPort = "8443"
)

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
Expand All @@ -80,7 +79,7 @@ func main() {
conf := parseEnvFlags()

logger.WithField("version", pkg.Version).WithField("ctlConf", conf).
WithField("featureGates", runtime.EncodeFeatures()).WithField("sslPort", sslPort).
WithField("featureGates", runtime.EncodeFeatures()).
Info("Starting agones-allocator")

logger.WithField("logLevel", conf.LogLevel).Info("Setting LogLevel configuration")
Expand All @@ -92,6 +91,10 @@ func main() {
runtime.SetLevel(logrus.InfoLevel)
}

if !validPort(conf.GRPCPort) && !validPort(conf.HTTPPort) {
logger.WithField("grpc-port", conf.GRPCPort).WithField("http-port", conf.HTTPPort).Fatal("Must specify a valid gRPC port or an HTTP port for the allocator service")
}

health, closer := setupMetricsRecorder(conf)
defer closer()

Expand Down Expand Up @@ -181,17 +184,53 @@ func main() {
}
}

opts := h.getServerOptions()
// If grpc and http use the same port then use a mux.
if conf.GRPCPort == conf.HTTPPort {
runMux(h, conf.HTTPPort)
} else {
// Otherwise, run each on a dedicated port.
if validPort(conf.HTTPPort) {
runREST(h, conf.HTTPPort)
}
if validPort(conf.GRPCPort) {
runGRPC(h, conf.GRPCPort)
}
}

// Finally listen on 8080 (http) and block the main goroutine
// this is used to serve /live and /ready handlers for Kubernetes probes.
err = http.ListenAndServe(":8080", http.DefaultServeMux)
logger.WithError(err).Fatal("allocation service crashed")
}

func validPort(port int) bool {
const maxPort = 65535
return port >= 0 && port < maxPort
}

grpcServer := grpc.NewServer(opts...)
func runMux(h *serviceHandler, httpPort int) {
logger.Infof("Running the mux handler on port %d", httpPort)
grpcServer := grpc.NewServer(h.getMuxServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)

mux := gw_runtime.NewServeMux()
err = pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h)
if err != nil {
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}

runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux))
}

func runREST(h *serviceHandler, httpPort int) {
logger.WithField("port", httpPort).Info("Running the rest handler")
mux := gw_runtime.NewServeMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}
runHTTP(h, httpPort, mux)
}

func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
cfg := &tls.Config{}
if !h.tlsDisabled {
cfg.GetCertificate = h.getTLSCert
Expand All @@ -201,30 +240,44 @@ func main() {
cfg.VerifyPeerCertificate = h.verifyClientCertificate
}

// Create a Server instance to listen on port 8443 with the TLS config
// Create a Server instance to listen on the http port with the TLS config.
server := &http.Server{
Addr: ":8443",
Addr: fmt.Sprintf(":%d", httpPort),
TLSConfig: cfg,
Handler: grpcHandlerFunc(grpcServer, mux),
Handler: handler,
}

go func() {
var err error
if !h.tlsDisabled {
err = server.ListenAndServeTLS("", "")
} else {
err = server.ListenAndServe()
}

if err != nil {
logger.WithError(err).Fatal("unable to start HTTP/HTTPS listener")
logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener")
os.Exit(1)
}
}()
}

// Finally listen on 8080 (http) and block the main goroutine
// this is used to serve /live and /ready handlers for Kubernetes probes.
err = http.ListenAndServe(":8080", http.DefaultServeMux)
logger.WithError(err).Fatal("allocation service crashed")
func runGRPC(h *serviceHandler, grpcPort int) {
logger.WithField("port", grpcPort).Info("Running the grpc handler on port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
logger.WithError(err).Fatalf("failed to listen on TCP port %d", grpcPort)
os.Exit(1)
}

grpcServer := grpc.NewServer(h.getGRPCServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)

go func() {
err := grpcServer.Serve(listener)
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
}()
}

func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *serviceHandler {
Expand Down Expand Up @@ -288,9 +341,10 @@ func readTLSCert() (*tls.Certificate, error) {
return &tlsCert, nil
}

// getServerOptions returns a list of GRPC server options.
// getMuxServerOptions returns a list of GRPC server option to use when
// serving gRPC and REST over an HTTP multiplexer.
// Current options are opencensus stats handler.
func (h *serviceHandler) getServerOptions() []grpc.ServerOption {
func (h *serviceHandler) getMuxServerOptions() []grpc.ServerOption {
// Add options for OpenCensus stats handler to enable stats and tracing.
// The keepalive options are useful for efficiency purposes (keeping a single connection alive
// instead of constantly recreating connections), when placing the Agones allocator behind load balancers.
Expand All @@ -307,6 +361,40 @@ func (h *serviceHandler) getServerOptions() []grpc.ServerOption {
}
}

// getGRPCServerOptions returns a list of GRPC server options to use when
// only serving gRPC requests.
// Current options are TLS certs and opencensus stats handler.
func (h *serviceHandler) getGRPCServerOptions() []grpc.ServerOption {
// Add options for OpenCensus stats handler to enable stats and tracing.
// The keepalive options are useful for efficiency purposes (keeping a single connection alive
// instead of constantly recreating connections), when placing the Agones allocator behind load balancers.
opts := []grpc.ServerOption{
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 1 * time.Minute,
PermitWithoutStream: true,
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
Timeout: 10 * time.Minute,
}),
}
if h.tlsDisabled {
return opts
}

cfg := &tls.Config{
GetCertificate: h.getTLSCert,
}

if !h.mTLSDisabled {
cfg.ClientAuth = tls.RequireAnyClientCert
cfg.VerifyPeerCertificate = h.verifyClientCertificate
}

return append([]grpc.ServerOption{grpc.Creds(credentials.NewTLS(cfg))}, opts...)
}

func (h *serviceHandler) getTLSCert(ch *tls.ClientHelloInfo) (*tls.Certificate, error) {
h.tlsMutex.RLock()
defer h.tlsMutex.RUnlock()
Expand Down
12 changes: 12 additions & 0 deletions cmd/allocator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
)

const (
httpPortFlag = "http-port"
grpcPortFlag = "grpc-port"
enableStackdriverMetricsFlag = "stackdriver-exporter"
enablePrometheusMetricsFlag = "prometheus-exporter"
projectIDFlag = "gcp-project-id"
Expand All @@ -47,6 +49,8 @@ func init() {
}

type config struct {
GRPCPort int
HTTPPort int
APIServerSustainedQPS int
APIServerBurstQPS int
TLSDisabled bool
Expand All @@ -62,6 +66,8 @@ type config struct {

func parseEnvFlags() config {

viper.SetDefault(httpPortFlag, -1)
viper.SetDefault(grpcPortFlag, -1)
viper.SetDefault(apiServerSustainedQPSFlag, 400)
viper.SetDefault(apiServerBurstQPSFlag, 500)
viper.SetDefault(enablePrometheusMetricsFlag, true)
Expand All @@ -74,6 +80,8 @@ func parseEnvFlags() config {
viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second)
viper.SetDefault(logLevelFlag, "Info")

pflag.Int32(httpPortFlag, viper.GetInt32(httpPortFlag), "Port to listen on for REST requests")
pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests")
pflag.Int32(apiServerSustainedQPSFlag, viper.GetInt32(apiServerSustainedQPSFlag), "Maximum sustained queries per second to send to the API server")
pflag.Int32(apiServerBurstQPSFlag, viper.GetInt32(apiServerBurstQPSFlag), "Maximum burst queries per second to send to the API server")
pflag.Bool(enablePrometheusMetricsFlag, viper.GetBool(enablePrometheusMetricsFlag), "Flag to activate metrics of Agones. Can also use PROMETHEUS_EXPORTER env variable.")
Expand All @@ -89,6 +97,8 @@ func parseEnvFlags() config {
pflag.Parse()

viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
runtime.Must(viper.BindEnv(httpPortFlag))
runtime.Must(viper.BindEnv(grpcPortFlag))
runtime.Must(viper.BindEnv(apiServerSustainedQPSFlag))
runtime.Must(viper.BindEnv(apiServerBurstQPSFlag))
runtime.Must(viper.BindEnv(enablePrometheusMetricsFlag))
Expand All @@ -104,6 +114,8 @@ func parseEnvFlags() config {
runtime.Must(runtime.ParseFeaturesFromEnv())

return config{
HTTPPort: int(viper.GetInt32(httpPortFlag)),
GRPCPort: int(viper.GetInt32(grpcPortFlag)),
APIServerSustainedQPS: int(viper.GetInt32(apiServerSustainedQPSFlag)),
APIServerBurstQPS: int(viper.GetInt32(apiServerBurstQPSFlag)),
PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
Expand Down
64 changes: 49 additions & 15 deletions install/helm/agones/templates/service/allocation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

{{- $useLoadBalancerIP := and (ne .Values.agones.allocator.http.loadBalancerIP "") (eq .Values.agones.allocator.http.serviceType "LoadBalancer") }}
{{- $useLoadBalancerIP := and (ne .Values.agones.allocator.service.loadBalancerIP "") (eq .Values.agones.allocator.service.serviceType "LoadBalancer") }}
{{- if .Values.agones.allocator.install }}
# Define a Service for the agones-allocator
apiVersion: v1
Expand All @@ -26,31 +26,46 @@ metadata:
chart: {{ template "agones.chart" . }}
release: {{ .Release.Name }}
heritage: {{ .Release.Service }}
{{- if .Values.agones.allocator.http.annotations }}
{{- if .Values.agones.allocator.service.annotations }}
annotations:
{{ toYaml .Values.agones.allocator.http.annotations | indent 4 }}
{{ toYaml .Values.agones.allocator.service.annotations | indent 4 }}
{{- end }}
spec:
selector:
multicluster.agones.dev/role: allocator
ports:
- port: {{ .Values.agones.allocator.http.port }}
name: https
targetPort: 8443
{{- if .Values.agones.allocator.service.http.enabled }}
- port: {{ .Values.agones.allocator.service.http.port }}
name: {{ .Values.agones.allocator.service.http.portName }}
targetPort: {{ .Values.agones.allocator.service.http.targetPort }}
protocol: TCP
type: {{ .Values.agones.allocator.http.serviceType }}
{{- if .Values.agones.allocator.service.grpc.enabled }}
{{- if ne .Values.agones.allocator.service.grpc.port .Values.agones.allocator.service.http.port }}
- port: {{ .Values.agones.allocator.service.grpc.port }}
name: {{ .Values.agones.allocator.service.grpc.portName }}
targetPort: {{ .Values.agones.allocator.service.grpc.targetPort }}
protocol: TCP
{{- end }}
{{- end }}
{{- else if .Values.agones.allocator.service.grpc.enabled }}
- port: {{ .Values.agones.allocator.service.grpc.port }}
name: {{ .Values.agones.allocator.service.grpc.portName }}
targetPort: {{ .Values.agones.allocator.service.grpc.targetPort }}
protocol: TCP
{{- end }}
type: {{ .Values.agones.allocator.service.serviceType }}
{{- if $useLoadBalancerIP }}
loadBalancerIP: {{ .Values.agones.allocator.http.loadBalancerIP }}
loadBalancerIP: {{ .Values.agones.allocator.service.loadBalancerIP }}
{{- end }}
{{- if eq .Values.agones.allocator.http.serviceType "LoadBalancer" }}
{{- if .Values.agones.allocator.http.loadBalancerSourceRanges }}
{{- if eq .Values.agones.allocator.service.serviceType "LoadBalancer" }}
{{- if .Values.agones.allocator.service.loadBalancerSourceRanges }}
loadBalancerSourceRanges:
{{ toYaml .Values.agones.allocator.http.loadBalancerSourceRanges | indent 4 }}
{{ toYaml .Values.agones.allocator.service.loadBalancerSourceRanges | indent 4 }}
{{- end }}
{{- end }}

---
# Deploy a pod to run the agones-allocator code
# Deploy pods to run the agones-allocator code
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -126,6 +141,14 @@ spec:
path: /ready
port: 8080
env:
{{- if .Values.agones.allocator.service.http.enabled }}
- name: HTTP_PORT
value: {{ .Values.agones.allocator.service.http.targetPort | quote }}
{{- end }}
{{- if .Values.agones.allocator.service.grpc.enabled }}
- name: GRPC_PORT
value: {{ .Values.agones.allocator.service.grpc.targetPort | quote }}
{{- end }}
- name: API_SERVER_QPS
value: {{ .Values.agones.allocator.apiServerQPS | quote }}
- name: API_SERVER_QPS_BURST
Expand Down Expand Up @@ -161,8 +184,19 @@ spec:
- name: FEATURE_GATES
value: {{ .Values.agones.featureGates | quote }}
ports:
- name: https
containerPort: 8443
{{- if .Values.agones.allocator.service.http.enabled }}
- name: {{ .Values.agones.allocator.service.http.portName }}
containerPort: {{ .Values.agones.allocator.service.http.targetPort }}
{{- if .Values.agones.allocator.service.grpc.enabled }}
{{- if ne .Values.agones.allocator.service.grpc.port .Values.agones.allocator.service.http.port }}
- name: {{ .Values.agones.allocator.service.grpc.portName }}
containerPort: {{ .Values.agones.allocator.service.grpc.targetPort }}
{{- end }}
{{- end }}
{{- else if .Values.agones.allocator.service.grpc.enabled }}
- name: {{ .Values.agones.allocator.service.grpc.portName }}
containerPort: {{ .Values.agones.allocator.service.grpc.targetPort }}
{{- end }}
volumeMounts:
- mountPath: /home/allocator/tls
name: tls
Expand Down Expand Up @@ -277,7 +311,7 @@ data:

---
# Allocation TLS certs
{{- $cert := genSignedCert "" ($useLoadBalancerIP | ternary (list .Values.agones.allocator.http.loadBalancerIP) nil) nil 3650 $ca }}
{{- $cert := genSignedCert "" ($useLoadBalancerIP | ternary (list .Values.agones.allocator.service.loadBalancerIP) nil) nil 3650 $ca }}
apiVersion: v1
kind: Secret
type: kubernetes.io/tls
Expand Down
Loading

0 comments on commit fbe5380

Please sign in to comment.