diff --git a/app/kuma-cp/cmd/run.go b/app/kuma-cp/cmd/run.go index 646a1fba1fb3..87f42be04bbe 100644 --- a/app/kuma-cp/cmd/run.go +++ b/app/kuma-cp/cmd/run.go @@ -16,14 +16,13 @@ import ( "github.com/kumahq/kuma/pkg/defaults" "github.com/kumahq/kuma/pkg/diagnostics" "github.com/kumahq/kuma/pkg/dns/components" + dp_server "github.com/kumahq/kuma/pkg/dp-server" "github.com/kumahq/kuma/pkg/gc" kds_global "github.com/kumahq/kuma/pkg/kds/global" kds_remote "github.com/kumahq/kuma/pkg/kds/remote" mads_server "github.com/kumahq/kuma/pkg/mads/server" metrics "github.com/kumahq/kuma/pkg/metrics/components" - sds_server "github.com/kumahq/kuma/pkg/sds/server" kuma_version "github.com/kumahq/kuma/pkg/version" - xds_server "github.com/kumahq/kuma/pkg/xds/server" ) var ( @@ -76,14 +75,6 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command { runLog.Info(fmt.Sprintf("Running in mode `%s`", cfg.Mode)) switch cfg.Mode { case config_core.Standalone: - if err := sds_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up SDS server") - return err - } - if err := xds_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up xDS server") - return err - } if err := mads_server.SetupServer(rt); err != nil { runLog.Error(err, "unable to set up Monitoring Assignment server") return err @@ -96,15 +87,11 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command { runLog.Error(err, "unable to set up GC") return err } - case config_core.Remote: - if err := sds_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up SDS server") - return err - } - if err := xds_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up xDS server") + if err := dp_server.SetupServer(rt); err != nil { + runLog.Error(err, "unable to set up DP Server") return err } + case config_core.Remote: if err := mads_server.SetupServer(rt); err != nil { runLog.Error(err, "unable to set up Monitoring Assignment server") return err @@ -121,6 +108,10 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command { runLog.Error(err, "unable to set up GC") return err } + if err := dp_server.SetupServer(rt); err != nil { + runLog.Error(err, "unable to set up DP Server") + return err + } case config_core.Global: if err := kds_global.Setup(rt); err != nil { runLog.Error(err, "unable to set up KDS Global") diff --git a/mk/kind.mk b/mk/kind.mk index e799434d38a1..28ce1cde34bd 100644 --- a/mk/kind.mk +++ b/mk/kind.mk @@ -119,8 +119,6 @@ kind/deploy/example-app: run/k8s: fmt vet ## Dev: Run Control Plane locally in Kubernetes mode @KUBECONFIG=$(KIND_KUBECONFIG) $(MAKE) crd/upgrade -C pkg/plugins/resources/k8s/native KUBECONFIG=$(KIND_KUBECONFIG) \ - KUMA_SDS_SERVER_GRPC_PORT=$(SDS_GRPC_PORT) \ - KUMA_GRPC_PORT=$(CP_GRPC_PORT) \ KUMA_ENVIRONMENT=kubernetes \ KUMA_STORE_TYPE=kubernetes \ KUMA_SDS_SERVER_TLS_CERT_FILE=app/kuma-cp/cmd/testdata/tls.crt \ diff --git a/mk/run.mk b/mk/run.mk index 0e576213b41d..486bd2143fb7 100644 --- a/mk/run.mk +++ b/mk/run.mk @@ -1,10 +1,5 @@ GO_RUN := CGO_ENABLED=0 go run $(GOFLAGS) $(LD_FLAGS) -CP_BIND_HOST ?= localhost -CP_GRPC_PORT ?= 5678 -SDS_GRPC_PORT ?= 5677 -CP_K8S_ADMISSION_PORT ?= 5443 - EXAMPLE_DATAPLANE_MESH ?= default EXAMPLE_DATAPLANE_NAME ?= example ENVOY_ADMIN_PORT ?= 9901 @@ -32,8 +27,6 @@ run/universal/postgres: fmt vet ## Dev: Run Control Plane locally in universal m KUMA_STORE_POSTGRES_TLS_CA_PATH=$(POSTGRES_SSL_ROOT_CERT_PATH) \ $(GO_RUN) ./app/kuma-cp/main.go migrate up --log-level=debug - KUMA_SDS_SERVER_GRPC_PORT=$(SDS_GRPC_PORT) \ - KUMA_GRPC_PORT=$(CP_GRPC_PORT) \ KUMA_ENVIRONMENT=universal \ KUMA_STORE_TYPE=postgres \ KUMA_STORE_POSTGRES_HOST=localhost \ @@ -65,8 +58,6 @@ config_dump/example/envoy: ## Dev: Dump effective configuration of example Envoy .PHONY: run/universal/memory run/universal/memory: ## Dev: Run Control Plane locally in universal mode with in-memory store - KUMA_SDS_SERVER_GRPC_PORT=$(SDS_GRPC_PORT) \ - KUMA_GRPC_PORT=$(CP_GRPC_PORT) \ KUMA_ENVIRONMENT=universal \ KUMA_STORE_TYPE=memory \ $(GO_RUN) ./app/kuma-cp/main.go run --log-level=debug diff --git a/pkg/api-server/config_ws_test.go b/pkg/api-server/config_ws_test.go index ca86e962ac8a..eff7a9c862dc 100644 --- a/pkg/api-server/config_ws_test.go +++ b/pkg/api-server/config_ws_test.go @@ -77,10 +77,7 @@ var _ = Describe("Config WS", func() { "xdsConnectTimeout": "1s", "xdsHost": "", "xdsPort": 0 - }, - "port": 5682, - "tlsCertFile": "", - "tlsKeyFile": "" + } }, "adminServer": { "local": { @@ -217,11 +214,13 @@ var _ = Describe("Config WS", func() { } }, "sdsServer": { - "grpcPort": 5677, - "tlsCertFile": "", - "tlsKeyFile": "", "dataplaneConfigurationRefreshInterval": "1s" }, + "dpServer": { + "port": 5678, + "tlsCertFile": "", + "tlsKeyFile": "" + }, "store": { "kubernetes": { "systemNamespace": "kuma-system" @@ -250,10 +249,7 @@ var _ = Describe("Config WS", func() { "xdsServer": { "dataplaneConfigurationRefreshInterval": "1s", "dataplaneStatusFlushInterval": "1s", - "diagnosticsPort": 5680, - "grpcPort": 5678, - "tlsCertFile": "", - "tlsKeyFile": "" + "diagnosticsPort": 5680 }, "diagnostics": { "debugEndpoints": false diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index c02d7ca7cce9..bb5e60e8634c 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -5,17 +5,17 @@ import ( "github.com/pkg/errors" - "github.com/kumahq/kuma/pkg/config/diagnostics" - "github.com/kumahq/kuma/pkg/config/multicluster" - "github.com/kumahq/kuma/pkg/config" admin_server "github.com/kumahq/kuma/pkg/config/admin-server" api_server "github.com/kumahq/kuma/pkg/config/api-server" "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/config/core/resources/store" + "github.com/kumahq/kuma/pkg/config/diagnostics" dns_server "github.com/kumahq/kuma/pkg/config/dns-server" + dp_server "github.com/kumahq/kuma/pkg/config/dp-server" gui_server "github.com/kumahq/kuma/pkg/config/gui-server" "github.com/kumahq/kuma/pkg/config/mads" + "github.com/kumahq/kuma/pkg/config/multicluster" "github.com/kumahq/kuma/pkg/config/plugins/runtime" "github.com/kumahq/kuma/pkg/config/sds" "github.com/kumahq/kuma/pkg/config/xds" @@ -124,6 +124,8 @@ type Config struct { DNSServer *dns_server.DNSServerConfig `yaml:"dnsServer,omitempty"` // Diagnostics configuration Diagnostics *diagnostics.DiagnosticsConfig `yaml:"diagnostics,omitempty"` + // Dataplane Server configuration + DpServer *dp_server.DpServerConfig `yaml:"dpServer"` } func (c *Config) Sanitize() { @@ -177,6 +179,7 @@ func DefaultConfig() Config { DNSServer: dns_server.DefaultDNSServerConfig(), Multicluster: multicluster.DefaultMulticlusterConfig(), Diagnostics: diagnostics.DefaultDiagnosticsConfig(), + DpServer: dp_server.DefaultDpServerConfig(), } } diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index e77b4e332d62..46a438bb2b29 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -49,12 +49,6 @@ store: # Configuration of Bootstrap Server, which provides bootstrap config to Dataplanes bootstrapServer: - # Port of Server that provides bootstrap configuration for dataplanes - port: 5682 # ENV: KUMA_BOOTSTRAP_SERVER_PORT - # TlsCertFile defines a path to a file with PEM-encoded TLS cert. - tlsCertFile: # ENV: KUMA_BOOTSTRAP_SERVER_TLS_CERT_FILE - # TlsKeyFile defines a path to a file with PEM-encoded TLS key. - tlsKeyFile: # ENV: KUMA_BOOTSTRAP_SERVER_TLS_KEY_FILE # Parameters of bootstrap configuration params: # Address of Envoy Admin @@ -72,12 +66,6 @@ bootstrapServer: # Envoy SDS server configuration sdsServer: - # Port of GRPC server that Envoy connects to - grpcPort: 5677 # ENV: KUMA_SDS_SERVER_GRPC_PORT - # TlsCertFile defines a path to a file with PEM-encoded TLS cert. - tlsCertFile: # ENV: KUMA_SDS_SERVER_TLS_CERT_FILE - # TlsKeyFile defines a path to a file with PEM-encoded TLS key. - tlsKeyFile: # ENV: KUMA_SDS_SERVER_TLS_KEY_FILE # Interval for re-genarting configuration for Dataplanes connected to the Control Plane dataplaneConfigurationRefreshInterval: 1s # ENV: KUMA_SDS_SERVER_DATAPLANE_CONFIGURATION_REFRESH_INTERVAL @@ -117,18 +105,12 @@ adminServer: # Envoy XDS server configuration xdsServer: - # Port of GRPC server that Envoy connects to - grpcPort: 5678 # ENV: KUMA_XDS_SERVER_GRPC_PORT # Port of Diagnostic Server for checking health and readiness of the Control Plane diagnosticsPort: 5680 # ENV: KUMA_XDS_SERVER_DIAGNOSTICS_PORT # Interval for re-genarting configuration for Dataplanes connected to the Control Plane dataplaneConfigurationRefreshInterval: 1s # ENV: KUMA_XDS_SERVER_DATAPLANE_CONFIGURATION_REFRESH_INTERVAL # Interval for flushing status of Dataplanes connected to the Control Plane dataplaneStatusFlushInterval: 1s # ENV: KUMA_XDS_SERVER_DATAPLANE_STATUS_FLUSH_INTERVAL - # TlsCertFile defines a path to a file with PEM-encoded TLS cert. - tlsCertFile: # ENV: KUMA_XDS_SERVER_TLS_CERT_FILE - # TlsKeyFile defines a path to a file with PEM-encoded TLS key. - tlsKeyFile: # ENV: KUMA_XDS_SERVER_TLS_KEY_FILE # API Server configuration apiServer: @@ -279,7 +261,11 @@ general: # Control Plane will use this value in configuration generated for dataplanes, in responses to `kumactl`, etc. advertisedHostname: localhost # ENV: KUMA_GENERAL_ADVERTISED_HOSTNAME # dnsCacheTTL represents duration for how long Kuma CP will cache result of resolving dataplane's domain name - dnsCacheTTL: 10s + dnsCacheTTL: 10s # ENV: KUMA_GENERAL_DNS_CACHE_TTL + # TlsCertFile defines a path to a file with PEM-encoded TLS cert that will be used across all the Kuma Servers. + tlsCertFile: # ENV: KUMA_GENERAL_TLS_CERT_FILE + # TlsKeyFile defines a path to a file with PEM-encoded TLS key that will be used across all the Kuma Servers. + tlsKeyFile: # ENV: KUMA_GENERAL_TLS_KEY_FILE # Web GUI Server configuration guiServer: @@ -324,3 +310,12 @@ multicluster: diagnostics: # If true, enables https://golang.org/pkg/net/http/pprof/ debug endpoints debugEndpoints: false # ENV: KUMA_DIAGNOSTICS_DEBUG_ENDPOINTS + +# Dataplane Server configuration that servers API like Bootstrap/XDS/SDS for the Dataplane. +dpServer: + # Port of the DP Server + port: 5678 # ENV: KUMA_DP_SERVER_PORT + # TlsCertFile defines a path to a file with PEM-encoded TLS cert. If empty, autoconfigured from general.tlsCertFile + tlsCertFile: # ENV: KUMA_DP_SERVER_TLS_CERT_FILE + # TlsKeyFile defines a path to a file with PEM-encoded TLS key. If empty, autoconfigured from general.tlsKeyFile + tlsKeyFile: # ENV: KUMA_DP_SERVER_TLS_KEY_FILE diff --git a/pkg/config/dp-server/config.go b/pkg/config/dp-server/config.go new file mode 100644 index 000000000000..b09809e47f73 --- /dev/null +++ b/pkg/config/dp-server/config.go @@ -0,0 +1,35 @@ +package dp_server + +import ( + "errors" + + "github.com/kumahq/kuma/pkg/config" +) + +var _ config.Config = &DpServerConfig{} + +// Dataplane Server configuration that servers API like Bootstrap/XDS/SDS. +type DpServerConfig struct { + // Port of the DP Server + Port int `yaml:"port" envconfig:"kuma_dp_server_port"` + // TlsCertFile defines a path to a file with PEM-encoded TLS cert. If empty, autoconfigured from general.tlsCertFile + TlsCertFile string `yaml:"tlsCertFile" envconfig:"kuma_dp_server_tls_cert_file"` + // TlsKeyFile defines a path to a file with PEM-encoded TLS key. If empty, autoconfigured from general.tlsKeyFile + TlsKeyFile string `yaml:"tlsKeyFile" envconfig:"kuma_dp_server_tls_key_file"` +} + +func (a *DpServerConfig) Sanitize() { +} + +func (a *DpServerConfig) Validate() error { + if a.Port < 0 { + return errors.New("Port cannot be negative") + } + return nil +} + +func DefaultDpServerConfig() *DpServerConfig { + return &DpServerConfig{ + Port: 5678, + } +} diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 4fe02df0d9a6..3ff4d3c14f7d 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -63,10 +63,8 @@ var _ = Describe("Config loader", func() { Expect(err).ToNot(HaveOccurred()) // then - Expect(cfg.XdsServer.GrpcPort).To(Equal(5000)) Expect(cfg.XdsServer.DiagnosticsPort).To(Equal(5003)) - Expect(cfg.BootstrapServer.Port).To(Equal(uint32(5004))) Expect(cfg.BootstrapServer.Params.AdminPort).To(Equal(uint32(1234))) Expect(cfg.BootstrapServer.Params.XdsHost).To(Equal("kuma-control-plane")) Expect(cfg.BootstrapServer.Params.XdsPort).To(Equal(uint32(4321))) @@ -160,10 +158,8 @@ store: enabled: false expirationTime: 3s xdsServer: - grpcPort: 5000 diagnosticsPort: 5003 bootstrapServer: - port: 5004 params: adminPort: 1234 xdsHost: kuma-control-plane @@ -238,9 +234,7 @@ diagnostics: }), Entry("from env variables", testCase{ envVars: map[string]string{ - "KUMA_XDS_SERVER_GRPC_PORT": "5000", "KUMA_XDS_SERVER_DIAGNOSTICS_PORT": "5003", - "KUMA_BOOTSTRAP_SERVER_PORT": "5004", "KUMA_BOOTSTRAP_SERVER_PARAMS_ADMIN_PORT": "1234", "KUMA_BOOTSTRAP_SERVER_PARAMS_XDS_HOST": "kuma-control-plane", "KUMA_BOOTSTRAP_SERVER_PARAMS_XDS_PORT": "4321", diff --git a/pkg/config/sds/config.go b/pkg/config/sds/config.go index 241aff1b4ad7..2ba0abcbfd9b 100644 --- a/pkg/config/sds/config.go +++ b/pkg/config/sds/config.go @@ -3,27 +3,17 @@ package sds import ( "time" - "github.com/pkg/errors" - "github.com/kumahq/kuma/pkg/config" ) func DefaultSdsServerConfig() *SdsServerConfig { return &SdsServerConfig{ - GrpcPort: 5677, DataplaneConfigurationRefreshInterval: 1 * time.Second, } } // Envoy SDS server configuration type SdsServerConfig struct { - // Port of GRPC server that Envoy connects to - GrpcPort int `yaml:"grpcPort" envconfig:"kuma_sds_server_grpc_port"` - // TlsCertFile defines a path to a file with PEM-encoded TLS cert. - TlsCertFile string `yaml:"tlsCertFile" envconfig:"kuma_sds_server_tls_cert_file"` - // TlsKeyFile defines a path to a file with PEM-encoded TLS key. - TlsKeyFile string `yaml:"tlsKeyFile" envconfig:"kuma_sds_server_tls_key_file"` - // Interval for re-genarting configuration for Dataplanes connected to the Control Plane DataplaneConfigurationRefreshInterval time.Duration `yaml:"dataplaneConfigurationRefreshInterval" envconfig:"kuma_sds_server_dataplane_configuration_refresh_interval"` } @@ -34,14 +24,5 @@ func (c *SdsServerConfig) Sanitize() { } func (c *SdsServerConfig) Validate() error { - if c.GrpcPort < 0 { - return errors.New("GrpcPort cannot be negative") - } - if c.TlsCertFile == "" && c.TlsKeyFile != "" { - return errors.New("TlsCertFile cannot be empty if TlsKeyFile has been set") - } - if c.TlsKeyFile == "" && c.TlsCertFile != "" { - return errors.New("TlsKeyFile cannot be empty if TlsCertFile has been set") - } return nil } diff --git a/pkg/config/xds/bootstrap/config.go b/pkg/config/xds/bootstrap/config.go index 1c66e5619fd8..a5bae355edc2 100644 --- a/pkg/config/xds/bootstrap/config.go +++ b/pkg/config/xds/bootstrap/config.go @@ -12,14 +12,8 @@ import ( var _ config.Config = &BootstrapServerConfig{} type BootstrapServerConfig struct { - // Port of Server that provides bootstrap configuration for dataplanes - Port uint32 `yaml:"port" envconfig:"kuma_bootstrap_server_port"` // Parameters of bootstrap configuration Params *BootstrapParamsConfig `yaml:"params"` - // TlsCertFile defines a path to a file with PEM-encoded TLS cert. - TlsCertFile string `yaml:"tlsCertFile" envconfig:"kuma_bootstrap_server_tls_cert_file"` - // TlsKeyFile defines a path to a file with PEM-encoded TLS key. - TlsKeyFile string `yaml:"tlsKeyFile" envconfig:"kuma_bootstrap_server_tls_key_file"` } func (b *BootstrapServerConfig) Sanitize() { @@ -27,9 +21,6 @@ func (b *BootstrapServerConfig) Sanitize() { } func (b *BootstrapServerConfig) Validate() error { - if b.Port > 65535 { - return errors.New("Port must be in the range [0, 65535]") - } if err := b.Params.Validate(); err != nil { return errors.Wrap(err, "Params validation failed") } @@ -38,7 +29,6 @@ func (b *BootstrapServerConfig) Validate() error { func DefaultBootstrapServerConfig() *BootstrapServerConfig { return &BootstrapServerConfig{ - Port: 5682, Params: DefaultBootstrapParamsConfig(), } } diff --git a/pkg/config/xds/bootstrap/config_test.go b/pkg/config/xds/bootstrap/config_test.go index dc31cef443f6..f564a00f9d70 100644 --- a/pkg/config/xds/bootstrap/config_test.go +++ b/pkg/config/xds/bootstrap/config_test.go @@ -26,7 +26,6 @@ var _ = Describe("BootstrappServerConfig", func() { Expect(err).ToNot(HaveOccurred()) // and - Expect(cfg.Port).To(Equal(uint32(1234))) Expect(cfg.Params.AdminAddress).To(Equal("192.168.0.1")) Expect(cfg.Params.AdminPort).To(Equal(uint32(4321))) Expect(cfg.Params.AdminAccessLogPath).To(Equal("/var/log")) @@ -54,7 +53,6 @@ var _ = Describe("BootstrappServerConfig", func() { It("should be loadable from environment variables", func() { // setup env := map[string]string{ - "KUMA_BOOTSTRAP_SERVER_PORT": "1234", "KUMA_BOOTSTRAP_SERVER_PARAMS_ADMIN_ADDRESS": "192.168.0.1", "KUMA_BOOTSTRAP_SERVER_PARAMS_ADMIN_PORT": "4321", "KUMA_BOOTSTRAP_SERVER_PARAMS_ADMIN_ACCESS_LOG_PATH": "/var/log", @@ -76,7 +74,6 @@ var _ = Describe("BootstrappServerConfig", func() { Expect(err).ToNot(HaveOccurred()) // and - Expect(cfg.Port).To(Equal(uint32(1234))) Expect(cfg.Params.AdminAddress).To(Equal("192.168.0.1")) Expect(cfg.Params.AdminPort).To(Equal(uint32(4321))) Expect(cfg.Params.AdminAccessLogPath).To(Equal("/var/log")) @@ -102,15 +99,4 @@ var _ = Describe("BootstrappServerConfig", func() { // and Expect(actual).To(MatchYAML(expected)) }) - - It("should have validators", func() { - // given - cfg := BootstrapServerConfig{} - - // when - err := config.Load(filepath.Join("testdata", "invalid-config.input.yaml"), &cfg) - - // then - Expect(err).To(MatchError(`Invalid configuration: Port must be in the range [0, 65535]`)) - }) }) diff --git a/pkg/config/xds/bootstrap/testdata/default-config.golden.yaml b/pkg/config/xds/bootstrap/testdata/default-config.golden.yaml index 332cd681d560..10fdbe7ceed1 100644 --- a/pkg/config/xds/bootstrap/testdata/default-config.golden.yaml +++ b/pkg/config/xds/bootstrap/testdata/default-config.golden.yaml @@ -1,4 +1,3 @@ -port: 5682 params: adminAccessLogPath: /dev/null adminAddress: 127.0.0.1 @@ -6,5 +5,3 @@ params: xdsConnectTimeout: 1s xdsHost: "" xdsPort: 0 -tlsCertFile: "" -tlsKeyFile: "" diff --git a/pkg/config/xds/bootstrap/testdata/invalid-config.input.yaml b/pkg/config/xds/bootstrap/testdata/invalid-config.input.yaml deleted file mode 100644 index df0c4746caa0..000000000000 --- a/pkg/config/xds/bootstrap/testdata/invalid-config.input.yaml +++ /dev/null @@ -1,10 +0,0 @@ -port: 65536 -params: - adminAccessLogPath: - adminAddress: localhost - adminPort: 65537 - xdsConnectTimeout: -1s - xdsHost: - xdsPort: 65538 -tlsCertFile: "" -tlsKeyFile: "" diff --git a/pkg/config/xds/bootstrap/testdata/valid-config.input.yaml b/pkg/config/xds/bootstrap/testdata/valid-config.input.yaml index 1c635f6f7254..c00b37d2e48a 100644 --- a/pkg/config/xds/bootstrap/testdata/valid-config.input.yaml +++ b/pkg/config/xds/bootstrap/testdata/valid-config.input.yaml @@ -1,4 +1,3 @@ -port: 1234 params: adminAddress: 192.168.0.1 adminPort: 4321 diff --git a/pkg/config/xds/config.go b/pkg/config/xds/config.go index a10f98c57654..82c71f1ce0bc 100644 --- a/pkg/config/xds/config.go +++ b/pkg/config/xds/config.go @@ -12,28 +12,18 @@ var _ config.Config = &XdsServerConfig{} // Envoy XDS server configuration type XdsServerConfig struct { - // Port of GRPC server that Envoy connects to - GrpcPort int `yaml:"grpcPort" envconfig:"kuma_xds_server_grpc_port"` // Port of Diagnostic Server for checking health and readiness of the Control Plane DiagnosticsPort int `yaml:"diagnosticsPort" envconfig:"kuma_xds_server_diagnostics_port"` - // Interval for re-genarting configuration for Dataplanes connected to the Control Plane DataplaneConfigurationRefreshInterval time.Duration `yaml:"dataplaneConfigurationRefreshInterval" envconfig:"kuma_xds_server_dataplane_configuration_refresh_interval"` // Interval for flushing status of Dataplanes connected to the Control Plane DataplaneStatusFlushInterval time.Duration `yaml:"dataplaneStatusFlushInterval" envconfig:"kuma_xds_server_dataplane_status_flush_interval"` - // TlsCertFile defines a path to a file with PEM-encoded TLS cert. - TlsCertFile string `yaml:"tlsCertFile" envconfig:"kuma_xds_server_tls_cert_file"` - // TlsKeyFile defines a path to a file with PEM-encoded TLS key. - TlsKeyFile string `yaml:"tlsKeyFile" envconfig:"kuma_xds_server_tls_key_file"` } func (x *XdsServerConfig) Sanitize() { } func (x *XdsServerConfig) Validate() error { - if x.GrpcPort < 0 { - return errors.New("GrpcPort cannot be negative") - } if x.DiagnosticsPort < 0 { return errors.New("DiagnosticPort cannot be negative") } @@ -43,22 +33,13 @@ func (x *XdsServerConfig) Validate() error { if x.DataplaneStatusFlushInterval <= 0 { return errors.New("DataplaneStatusFlushInterval must be positive") } - if x.TlsCertFile == "" && x.TlsKeyFile != "" { - return errors.New("TlsCertFile cannot be empty if TlsKeyFile has been set") - } - if x.TlsKeyFile == "" && x.TlsCertFile != "" { - return errors.New("TlsKeyFile cannot be empty if TlsCertFile has been set") - } return nil } func DefaultXdsServerConfig() *XdsServerConfig { return &XdsServerConfig{ - GrpcPort: 5678, DiagnosticsPort: 5680, DataplaneConfigurationRefreshInterval: 1 * time.Second, DataplaneStatusFlushInterval: 1 * time.Second, - TlsCertFile: "", - TlsKeyFile: "", } } diff --git a/pkg/config/xds/config_test.go b/pkg/config/xds/config_test.go index d3117a83c33e..519c4f241678 100644 --- a/pkg/config/xds/config_test.go +++ b/pkg/config/xds/config_test.go @@ -26,12 +26,9 @@ var _ = Describe("XdsServerConfig", func() { Expect(err).ToNot(HaveOccurred()) // and - Expect(cfg.GrpcPort).To(Equal(1234)) Expect(cfg.DiagnosticsPort).To(Equal(3456)) Expect(cfg.DataplaneConfigurationRefreshInterval).To(Equal(3 * time.Second)) Expect(cfg.DataplaneStatusFlushInterval).To(Equal(5 * time.Second)) - Expect(cfg.TlsCertFile).To(Equal("/tmp/cert.pem")) - Expect(cfg.TlsKeyFile).To(Equal("/tmp/key.pem")) }) Context("with modified environment variables", func() { @@ -53,12 +50,9 @@ var _ = Describe("XdsServerConfig", func() { It("should be loadable from environment variables", func() { // setup env := map[string]string{ - "KUMA_XDS_SERVER_GRPC_PORT": "1234", "KUMA_XDS_SERVER_DIAGNOSTICS_PORT": "3456", "KUMA_XDS_SERVER_DATAPLANE_CONFIGURATION_REFRESH_INTERVAL": "3s", "KUMA_XDS_SERVER_DATAPLANE_STATUS_FLUSH_INTERVAL": "5s", - "KUMA_XDS_SERVER_TLS_CERT_FILE": "/tmp/cert-env.pem", - "KUMA_XDS_SERVER_TLS_KEY_FILE": "/tmp/key-env.pem", } for key, value := range env { os.Setenv(key, value) @@ -74,12 +68,9 @@ var _ = Describe("XdsServerConfig", func() { Expect(err).ToNot(HaveOccurred()) // and - Expect(cfg.GrpcPort).To(Equal(1234)) Expect(cfg.DiagnosticsPort).To(Equal(3456)) Expect(cfg.DataplaneConfigurationRefreshInterval).To(Equal(3 * time.Second)) Expect(cfg.DataplaneStatusFlushInterval).To(Equal(5 * time.Second)) - Expect(cfg.TlsCertFile).To(Equal("/tmp/cert-env.pem")) - Expect(cfg.TlsKeyFile).To(Equal("/tmp/key-env.pem")) }) }) diff --git a/pkg/config/xds/testdata/default-config.golden.yaml b/pkg/config/xds/testdata/default-config.golden.yaml index 1c480a8f9d14..99bfb9696b28 100644 --- a/pkg/config/xds/testdata/default-config.golden.yaml +++ b/pkg/config/xds/testdata/default-config.golden.yaml @@ -1,6 +1,3 @@ -grpcPort: 5678 diagnosticsPort: 5680 dataplaneConfigurationRefreshInterval: 1s dataplaneStatusFlushInterval: 1s -tlsCertFile: "" -tlsKeyFile: "" \ No newline at end of file diff --git a/pkg/config/xds/testdata/invalid-config.input.yaml b/pkg/config/xds/testdata/invalid-config.input.yaml index 4cdca33ad602..b8a02436776e 100644 --- a/pkg/config/xds/testdata/invalid-config.input.yaml +++ b/pkg/config/xds/testdata/invalid-config.input.yaml @@ -1,6 +1,3 @@ -grpcPort: 1234 diagnosticsPort: 3456 dataplaneConfigurationRefreshInterval: 0 dataplaneStatusFlushInterval: 0 -tlsCertFile: "/tmp/cert.pem" -tlsKeyFile: "" \ No newline at end of file diff --git a/pkg/config/xds/testdata/valid-config.input.yaml b/pkg/config/xds/testdata/valid-config.input.yaml index 1087e6681600..d7f195939c1b 100644 --- a/pkg/config/xds/testdata/valid-config.input.yaml +++ b/pkg/config/xds/testdata/valid-config.input.yaml @@ -1,6 +1,3 @@ -grpcPort: 1234 diagnosticsPort: 3456 dataplaneConfigurationRefreshInterval: 3s dataplaneStatusFlushInterval: 5s -tlsCertFile: "/tmp/cert.pem" -tlsKeyFile: "/tmp/key.pem" \ No newline at end of file diff --git a/pkg/core/bootstrap/autoconfig.go b/pkg/core/bootstrap/autoconfig.go index 731097ae7eb2..3b63270ad296 100644 --- a/pkg/core/bootstrap/autoconfig.go +++ b/pkg/core/bootstrap/autoconfig.go @@ -28,22 +28,14 @@ func autoconfigure(cfg *kuma_cp.Config) error { } func autoconfigureServersTLS(cfg *kuma_cp.Config) { - if cfg.XdsServer.TlsCertFile == "" { - cfg.XdsServer.TlsCertFile = cfg.General.TlsCertFile - cfg.XdsServer.TlsKeyFile = cfg.General.TlsKeyFile - } - if cfg.BootstrapServer.TlsCertFile == "" { - cfg.BootstrapServer.TlsCertFile = cfg.General.TlsCertFile - cfg.BootstrapServer.TlsKeyFile = cfg.General.TlsKeyFile - } - if cfg.SdsServer.TlsCertFile == "" { - cfg.SdsServer.TlsCertFile = cfg.General.TlsCertFile - cfg.SdsServer.TlsKeyFile = cfg.General.TlsKeyFile - } if cfg.Multicluster.Global.KDS.TlsCertFile == "" { cfg.Multicluster.Global.KDS.TlsCertFile = cfg.General.TlsCertFile cfg.Multicluster.Global.KDS.TlsKeyFile = cfg.General.TlsKeyFile } + if cfg.DpServer.TlsCertFile == "" { + cfg.DpServer.TlsCertFile = cfg.General.TlsCertFile + cfg.DpServer.TlsKeyFile = cfg.General.TlsKeyFile + } } func autoconfigureTLS(cfg *kuma_cp.Config) error { @@ -70,7 +62,7 @@ func autoconfigureTLS(cfg *kuma_cp.Config) error { func autoconfigureCatalog(cfg *kuma_cp.Config) { bootstrapUrl := cfg.ApiServer.Catalog.Bootstrap.Url if len(bootstrapUrl) == 0 { - bootstrapUrl = fmt.Sprintf("https://%s:%d", cfg.General.AdvertisedHostname, cfg.BootstrapServer.Port) + bootstrapUrl = fmt.Sprintf("https://%s:%d", cfg.General.AdvertisedHostname, cfg.DpServer.Port) } madsUrl := cfg.ApiServer.Catalog.MonitoringAssignment.Url if len(madsUrl) == 0 { @@ -116,7 +108,7 @@ func autoconfigBootstrapXdsParams(cfg *kuma_cp.Config) { cfg.BootstrapServer.Params.XdsHost = cfg.General.AdvertisedHostname } if cfg.BootstrapServer.Params.XdsPort == 0 { - cfg.BootstrapServer.Params.XdsPort = uint32(cfg.XdsServer.GrpcPort) + cfg.BootstrapServer.Params.XdsPort = uint32(cfg.DpServer.Port) } } diff --git a/pkg/core/bootstrap/autoconfig_test.go b/pkg/core/bootstrap/autoconfig_test.go index bd4e465c9356..f9735ce2c177 100644 --- a/pkg/core/bootstrap/autoconfig_test.go +++ b/pkg/core/bootstrap/autoconfig_test.go @@ -38,7 +38,7 @@ var _ = Describe("Auto configuration", func() { cfg.AdminServer.Public.Enabled = true cfg.AdminServer.Public.Interface = "192.168.0.1" cfg.AdminServer.Public.Port = 2222 - cfg.BootstrapServer.Port = 3333 + cfg.DpServer.Port = 3333 cfg.ApiServer.Port = 1234 return cfg }, @@ -69,7 +69,7 @@ var _ = Describe("Auto configuration", func() { cfg.AdminServer.Local.Port = 1111 cfg.AdminServer.Public.Enabled = true cfg.AdminServer.Public.Interface = "192.168.0.1" - cfg.BootstrapServer.Port = 3333 + cfg.DpServer.Port = 3333 return cfg }, expectedCatalogConfig: catalog.CatalogConfig{ @@ -97,7 +97,7 @@ var _ = Describe("Auto configuration", func() { cfg := kuma_cp.DefaultConfig() cfg.General.AdvertisedHostname = "kuma.internal" cfg.AdminServer.Local.Port = 1111 - cfg.BootstrapServer.Port = 3333 + cfg.DpServer.Port = 3333 return cfg }, expectedCatalogConfig: catalog.CatalogConfig{ @@ -131,7 +131,7 @@ var _ = Describe("Auto configuration", func() { Url: "http://localhost:5681", }, Bootstrap: catalog.BootstrapApiConfig{ - Url: "https://localhost:5682", + Url: "https://localhost:5678", }, DataplaneToken: catalog.DataplaneTokenApiConfig{ LocalUrl: "", @@ -154,7 +154,7 @@ var _ = Describe("Auto configuration", func() { cfg.AdminServer.Public.Enabled = true cfg.AdminServer.Public.Interface = "192.168.0.1" cfg.AdminServer.Public.Port = 2222 - cfg.BootstrapServer.Port = 3333 + cfg.DpServer.Port = 3333 cfg.ApiServer.Catalog.Bootstrap.Url = "https://bootstrap.kuma.com:1234" cfg.ApiServer.Catalog.MonitoringAssignment.Url = "grpcs://mads.kuma.com:1234" return cfg @@ -202,7 +202,7 @@ var _ = Describe("Auto configuration", func() { // given cfg := kuma_cp.DefaultConfig() cfg.General.AdvertisedHostname = "kuma.internal" - cfg.XdsServer.GrpcPort = 1234 + cfg.DpServer.Port = 1234 // when err := autoconfigure(&cfg) diff --git a/pkg/dp-server/components.go b/pkg/dp-server/components.go new file mode 100644 index 000000000000..d9651fb5a868 --- /dev/null +++ b/pkg/dp-server/components.go @@ -0,0 +1,23 @@ +package dp_server + +import ( + "github.com/kumahq/kuma/pkg/core/runtime" + sds_server "github.com/kumahq/kuma/pkg/sds/server" + "github.com/kumahq/kuma/pkg/xds/bootstrap" + xds_server "github.com/kumahq/kuma/pkg/xds/server" +) + +func SetupServer(rt runtime.Runtime) error { + dpServer := NewDpServer(*rt.Config().DpServer, rt.Metrics()) + if err := sds_server.RegisterSDS(rt, dpServer.grpcServer); err != nil { + return err + } + if err := xds_server.RegisterXDS(rt, dpServer.grpcServer); err != nil { + return err + } + bootstrap.RegisterBootstrap(rt, dpServer.httpMux) + if err := rt.Add(dpServer); err != nil { + return err + } + return nil +} diff --git a/pkg/dp-server/server.go b/pkg/dp-server/server.go new file mode 100644 index 000000000000..4222eed4d2fe --- /dev/null +++ b/pkg/dp-server/server.go @@ -0,0 +1,101 @@ +package dp_server + +import ( + "context" + "fmt" + "net/http" + "strings" + + http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus" + "github.com/slok/go-http-metrics/middleware" + "github.com/slok/go-http-metrics/middleware/std" + "google.golang.org/grpc" + + dp_server "github.com/kumahq/kuma/pkg/config/dp-server" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/metrics" +) + +var log = core.Log.WithName("dp-server") + +const grpcMaxConcurrentStreams = 1000000 + +type DpServer struct { + config dp_server.DpServerConfig + httpMux *http.ServeMux + grpcServer *grpc.Server + promMiddleware middleware.Middleware +} + +var _ component.Component = &DpServer{} + +func NewDpServer(config dp_server.DpServerConfig, metrics metrics.Metrics) *DpServer { + grpcOptions := []grpc.ServerOption{ + grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), + } + grpcOptions = append(grpcOptions, metrics.GRPCServerInterceptors()...) + grpcServer := grpc.NewServer(grpcOptions...) + + promMiddleware := middleware.New(middleware.Config{ + Recorder: http_prometheus.NewRecorder(http_prometheus.Config{ + Registry: metrics, + Prefix: "dp_server", + }), + }) + + return &DpServer{ + config: config, + httpMux: http.NewServeMux(), + grpcServer: grpcServer, + promMiddleware: promMiddleware, + } +} + +func (d *DpServer) Start(stop <-chan struct{}) error { + server := &http.Server{ + Addr: fmt.Sprintf(":%d", d.config.Port), + Handler: http.HandlerFunc(d.handle), + } + + errChan := make(chan error) + + go func() { + defer close(errChan) + if err := server.ListenAndServeTLS(d.config.TlsCertFile, d.config.TlsKeyFile); err != nil { + if err != http.ErrServerClosed { + log.Error(err, "terminated with an error") + errChan <- err + return + } + } + log.Info("terminated normally") + }() + log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true) + + select { + case <-stop: + log.Info("stopping") + return server.Shutdown(context.Background()) + case err := <-errChan: + return err + } +} + +func (d *DpServer) NeedLeaderElection() bool { + return false +} + +func (d *DpServer) handle(writer http.ResponseWriter, request *http.Request) { + if request.ProtoMajor == 2 && strings.Contains(request.Header.Get("Content-Type"), "application/grpc") { + d.grpcServer.ServeHTTP(writer, request) + } else { + // we only want to measure HTTP not GRPC requests because they can mess up metrics + // for example ADS bi-directional stream counts as one really long request + std.Handler("", d.promMiddleware, d.httpMux).ServeHTTP(writer, request) + } +} + +func (d *DpServer) HTTPMux() *http.ServeMux { + return d.httpMux +} diff --git a/pkg/sds/server/grpc.go b/pkg/sds/server/grpc.go deleted file mode 100644 index 6a24c449821c..000000000000 --- a/pkg/sds/server/grpc.go +++ /dev/null @@ -1,85 +0,0 @@ -package server - -import ( - "fmt" - "net" - - envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - - "github.com/pkg/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v2" - - sds_config "github.com/kumahq/kuma/pkg/config/sds" - "github.com/kumahq/kuma/pkg/core" - "github.com/kumahq/kuma/pkg/core/runtime/component" - "github.com/kumahq/kuma/pkg/metrics" -) - -const grpcMaxConcurrentStreams = 1000000 - -var ( - grpcServerLog = core.Log.WithName("sds-server").WithName("grpc") -) - -type grpcServer struct { - server envoy_server.Server - config sds_config.SdsServerConfig - metrics metrics.Metrics -} - -func (s *grpcServer) NeedLeaderElection() bool { - return false -} - -var ( - _ component.Component = &grpcServer{} -) - -func (s *grpcServer) Start(stop <-chan struct{}) error { - grpcOptions := []grpc.ServerOption{ - grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), - } - grpcOptions = append(grpcOptions, s.metrics.GRPCServerInterceptors()...) - useTLS := s.config.TlsCertFile != "" - if useTLS { - creds, err := credentials.NewServerTLSFromFile(s.config.TlsCertFile, s.config.TlsKeyFile) - if err != nil { - return errors.Wrap(err, "failed to load TLS certificate") - } - grpcOptions = append(grpcOptions, grpc.Creds(creds)) - } - grpcServer := grpc.NewServer(grpcOptions...) - - // register services - envoy_discovery.RegisterSecretDiscoveryServiceServer(grpcServer, s.server) - s.metrics.RegisterGRPC(grpcServer) - - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.config.GrpcPort)) - if err != nil { - return err - } - - errChan := make(chan error) - go func() { - defer close(errChan) - if err = grpcServer.Serve(lis); err != nil { - grpcServerLog.Error(err, "terminated with an error") - errChan <- err - } else { - grpcServerLog.Info("terminated normally") - } - }() - grpcServerLog.Info("starting", "interface", "0.0.0.0", "port", s.config.GrpcPort, "tls", useTLS) - - select { - case <-stop: - grpcServerLog.Info("stopping gracefully") - grpcServer.GracefulStop() - return nil - case err := <-errChan: - return err - } -} diff --git a/pkg/sds/server/server.go b/pkg/sds/server/server.go index ae7b87550e9a..24fecb728b4f 100644 --- a/pkg/sds/server/server.go +++ b/pkg/sds/server/server.go @@ -5,10 +5,12 @@ import ( "time" envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v2" envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v2" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" "github.com/kumahq/kuma/pkg/core" core_model "github.com/kumahq/kuma/pkg/core/resources/model" @@ -26,7 +28,7 @@ var ( sdsServerLog = core.Log.WithName("sds-server") ) -func SetupServer(rt core_runtime.Runtime) error { +func RegisterSDS(rt core_runtime.Runtime, server *grpc.Server) error { hasher := hasher{sdsServerLog} logger := util_xds.NewLogger(sdsServerLog) cache := envoy_cache.NewSnapshotCache(false, hasher, logger) @@ -73,11 +75,9 @@ func SetupServer(rt core_runtime.Runtime) error { srv := envoy_server.NewServer(context.Background(), cache, callbacks) - return rt.Add(&grpcServer{ - server: srv, - config: *rt.Config().SdsServer, - metrics: rt.Metrics(), - }) + sdsServerLog.Info("registering Secret Discovery Service in Dataplane Server") + envoy_discovery.RegisterSecretDiscoveryServiceServer(server, srv) + return nil } func syncTracker(reconciler *DataplaneReconciler, refresh time.Duration, metrics core_metrics.Metrics) (envoy_server.Callbacks, error) { diff --git a/pkg/sds/server/server_test.go b/pkg/sds/server/server_test.go index e33f920c0ac8..9bc72e9b2911 100644 --- a/pkg/sds/server/server_test.go +++ b/pkg/sds/server/server_test.go @@ -3,6 +3,7 @@ package server_test import ( "context" "fmt" + "path/filepath" "sync/atomic" "time" @@ -11,6 +12,7 @@ import ( envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -19,8 +21,8 @@ import ( mesh_core "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" core_manager "github.com/kumahq/kuma/pkg/core/resources/manager" core_store "github.com/kumahq/kuma/pkg/core/resources/store" + dp_server "github.com/kumahq/kuma/pkg/dp-server" core_metrics "github.com/kumahq/kuma/pkg/metrics" - "github.com/kumahq/kuma/pkg/sds/server" "github.com/kumahq/kuma/pkg/test" test_metrics "github.com/kumahq/kuma/pkg/test/metrics" "github.com/kumahq/kuma/pkg/test/runtime" @@ -54,7 +56,9 @@ var _ = Describe("SDS Server", func() { cfg.SdsServer.DataplaneConfigurationRefreshInterval = 100 * time.Millisecond port, err := test.GetFreePort() Expect(err).ToNot(HaveOccurred()) - cfg.SdsServer.GrpcPort = port + cfg.DpServer.Port = port + cfg.DpServer.TlsCertFile = filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem") + cfg.DpServer.TlsKeyFile = filepath.Join("..", "..", "..", "test", "certs", "server-key.pem") runtime, err := runtime.BuilderFor(cfg).Build() Expect(err).ToNot(HaveOccurred()) @@ -127,7 +131,7 @@ var _ = Describe("SDS Server", func() { Expect(err).ToNot(HaveOccurred()) // start the runtime - Expect(server.SetupServer(runtime)).To(Succeed()) + Expect(dp_server.SetupServer(runtime)).To(Succeed()) stop = make(chan struct{}) go func() { defer GinkgoRecover() @@ -137,9 +141,17 @@ var _ = Describe("SDS Server", func() { // wait for SDS server Eventually(func() error { - c, err := grpc.Dial(fmt.Sprintf("localhost:%d", port), grpc.WithInsecure()) + creds, err := credentials.NewClientTLSFromFile(filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), "") + if err != nil { + return err + } + c, err := grpc.Dial(fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(creds)) + if err != nil { + return err + } conn = c client = envoy_discovery.NewSecretDiscoveryServiceClient(conn) + _, err = client.StreamSecrets(context.Background()) // dial is not enough, we need to double check if we can start to stream secrets return err }).ShouldNot(HaveOccurred()) }) diff --git a/pkg/xds/bootstrap/components.go b/pkg/xds/bootstrap/components.go new file mode 100644 index 000000000000..63897d03580e --- /dev/null +++ b/pkg/xds/bootstrap/components.go @@ -0,0 +1,15 @@ +package bootstrap + +import ( + "net/http" + + core_runtime "github.com/kumahq/kuma/pkg/core/runtime" +) + +func RegisterBootstrap(rt core_runtime.Runtime, mux *http.ServeMux) { + bootstrapHandler := BootstrapHandler{ + Generator: NewDefaultBootstrapGenerator(rt.ResourceManager(), rt.Config().BootstrapServer.Params, rt.Config().DpServer.TlsCertFile), + } + log.Info("registering Bootstrap in Dataplane Server") + mux.HandleFunc("/bootstrap", bootstrapHandler.Handle) +} diff --git a/pkg/xds/bootstrap/server.go b/pkg/xds/bootstrap/handler.go similarity index 53% rename from pkg/xds/bootstrap/server.go rename to pkg/xds/bootstrap/handler.go index 9cf2569089b6..88e0ca7ce490 100644 --- a/pkg/xds/bootstrap/server.go +++ b/pkg/xds/bootstrap/handler.go @@ -1,81 +1,24 @@ package bootstrap import ( - "context" "encoding/json" - "fmt" "io/ioutil" "net/http" - "github.com/prometheus/client_golang/prometheus" - http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus" - "github.com/slok/go-http-metrics/middleware" - "github.com/slok/go-http-metrics/middleware/std" - - "github.com/kumahq/kuma/pkg/config/xds/bootstrap" "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/store" - "github.com/kumahq/kuma/pkg/core/runtime/component" "github.com/kumahq/kuma/pkg/core/validators" "github.com/kumahq/kuma/pkg/util/proto" "github.com/kumahq/kuma/pkg/xds/bootstrap/types" ) -var log = core.Log.WithName("bootstrap-server") +var log = core.Log.WithName("bootstrap") -type BootstrapServer struct { - Config *bootstrap.BootstrapServerConfig +type BootstrapHandler struct { Generator BootstrapGenerator - Metrics prometheus.Registerer -} - -func (b *BootstrapServer) NeedLeaderElection() bool { - return false -} - -var _ component.Component = &BootstrapServer{} - -func (b *BootstrapServer) Start(stop <-chan struct{}) error { - promMiddleware := middleware.New(middleware.Config{ - Recorder: http_prometheus.NewRecorder(http_prometheus.Config{ - Registry: b.Metrics, - Prefix: "bootstrap_server", - }), - }) - - mux := http.NewServeMux() - mux.HandleFunc("/bootstrap", b.handleBootstrapRequest) - - bootstrapServer := &http.Server{ - Addr: fmt.Sprintf(":%d", b.Config.Port), - Handler: std.Handler("", promMiddleware, mux), - } - - errChan := make(chan error) - - go func() { - defer close(errChan) - if err := bootstrapServer.ListenAndServeTLS(b.Config.TlsCertFile, b.Config.TlsKeyFile); err != nil { - if err != http.ErrServerClosed { - log.Error(err, "terminated with an error") - errChan <- err - return - } - } - log.Info("terminated normally") - }() - log.Info("starting", "interface", "0.0.0.0", "port", b.Config.Port, "tls", true) - - select { - case <-stop: - log.Info("stopping") - return bootstrapServer.Shutdown(context.Background()) - case err := <-errChan: - return err - } } -func (b *BootstrapServer) handleBootstrapRequest(resp http.ResponseWriter, req *http.Request) { +func (b *BootstrapHandler) Handle(resp http.ResponseWriter, req *http.Request) { bytes, err := ioutil.ReadAll(req.Body) if err != nil { log.Error(err, "Could not read a request") diff --git a/pkg/xds/bootstrap/server_test.go b/pkg/xds/bootstrap/server_test.go index 74c29c0342ca..10e0009d9d1d 100644 --- a/pkg/xds/bootstrap/server_test.go +++ b/pkg/xds/bootstrap/server_test.go @@ -1,4 +1,4 @@ -package bootstrap +package bootstrap_test import ( "context" @@ -10,21 +10,23 @@ import ( "strings" "time" - "github.com/kumahq/kuma/pkg/core" - . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" + dp_server_cfg "github.com/kumahq/kuma/pkg/config/dp-server" bootstrap_config "github.com/kumahq/kuma/pkg/config/xds/bootstrap" + "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/store" + dp_server "github.com/kumahq/kuma/pkg/dp-server" core_metrics "github.com/kumahq/kuma/pkg/metrics" "github.com/kumahq/kuma/pkg/plugins/resources/memory" "github.com/kumahq/kuma/pkg/test" test_metrics "github.com/kumahq/kuma/pkg/test/metrics" + "github.com/kumahq/kuma/pkg/xds/bootstrap" ) var _ = Describe("Bootstrap Server", func() { @@ -51,20 +53,22 @@ var _ = Describe("Bootstrap Server", func() { metrics, err = core_metrics.NewMetrics("Standalone") Expect(err).ToNot(HaveOccurred()) - server := BootstrapServer{ - Config: &bootstrap_config.BootstrapServerConfig{ - Port: uint32(port), - TlsCertFile: filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), - TlsKeyFile: filepath.Join("..", "..", "..", "test", "certs", "server-key.pem"), - Params: config, - }, - Generator: NewDefaultBootstrapGenerator(resManager, config, ""), - Metrics: metrics, + dpServerCfg := dp_server_cfg.DpServerConfig{ + Port: port, + TlsCertFile: filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), + TlsKeyFile: filepath.Join("..", "..", "..", "test", "certs", "server-key.pem"), } + dpServer := dp_server.NewDpServer(dpServerCfg, metrics) + + bootstrapHandler := bootstrap.BootstrapHandler{ + Generator: bootstrap.NewDefaultBootstrapGenerator(resManager, config, ""), + } + dpServer.HTTPMux().HandleFunc("/bootstrap", bootstrapHandler.Handle) + stop = make(chan struct{}) go func() { defer GinkgoRecover() - err := server.Start(stop) + err := dpServer.Start(stop) Expect(err).ToNot(HaveOccurred()) }() Eventually(func() bool { @@ -192,8 +196,8 @@ var _ = Describe("Bootstrap Server", func() { // then Expect(err).ToNot(HaveOccurred()) - Expect(test_metrics.FindMetric(metrics, "bootstrap_server_http_request_duration_seconds", "handler", "/bootstrap")).ToNot(BeNil()) - Expect(test_metrics.FindMetric(metrics, "bootstrap_server_http_requests_inflight", "handler", "/bootstrap")).ToNot(BeNil()) - Expect(test_metrics.FindMetric(metrics, "bootstrap_server_http_response_size_bytes", "handler", "/bootstrap")).ToNot(BeNil()) + Expect(test_metrics.FindMetric(metrics, "dp_server_http_request_duration_seconds", "handler", "/bootstrap")).ToNot(BeNil()) + Expect(test_metrics.FindMetric(metrics, "dp_server_http_requests_inflight", "handler", "/bootstrap")).ToNot(BeNil()) + Expect(test_metrics.FindMetric(metrics, "dp_server_http_response_size_bytes", "handler", "/bootstrap")).ToNot(BeNil()) }) }) diff --git a/pkg/xds/context/context.go b/pkg/xds/context/context.go index 6783f96a92fc..70b529e08d13 100644 --- a/pkg/xds/context/context.go +++ b/pkg/xds/context/context.go @@ -26,8 +26,8 @@ type MeshContext struct { func BuildControlPlaneContext(config kuma_cp.Config) (*ControlPlaneContext, error) { var cert []byte - if config.SdsServer.TlsCertFile != "" { - c, err := ioutil.ReadFile(config.SdsServer.TlsCertFile) + if config.DpServer.TlsCertFile != "" { + c, err := ioutil.ReadFile(config.DpServer.TlsCertFile) if err != nil { return nil, err } @@ -42,7 +42,7 @@ func BuildControlPlaneContext(config kuma_cp.Config) (*ControlPlaneContext, erro sdsLocation = u.Host } if len(sdsLocation) == 0 { - sdsLocation = fmt.Sprintf("%s:%d", config.BootstrapServer.Params.XdsHost, config.SdsServer.GrpcPort) + sdsLocation = fmt.Sprintf("%s:%d", config.BootstrapServer.Params.XdsHost, config.DpServer.Port) } return &ControlPlaneContext{ diff --git a/pkg/xds/server/components.go b/pkg/xds/server/components.go index 1f1fd895b149..ce49cf0e29f1 100644 --- a/pkg/xds/server/components.go +++ b/pkg/xds/server/components.go @@ -4,11 +4,14 @@ import ( "context" "time" + "google.golang.org/grpc" + core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system" "github.com/kumahq/kuma/pkg/core/resources/registry" "github.com/kumahq/kuma/pkg/xds/cache/cla" "github.com/kumahq/kuma/pkg/xds/cache/mesh" + envoy_service_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -33,7 +36,6 @@ import ( "github.com/kumahq/kuma/pkg/xds/auth" k8s_auth "github.com/kumahq/kuma/pkg/xds/auth/k8s" universal_auth "github.com/kumahq/kuma/pkg/xds/auth/universal" - xds_bootstrap "github.com/kumahq/kuma/pkg/xds/bootstrap" xds_context "github.com/kumahq/kuma/pkg/xds/context" "github.com/kumahq/kuma/pkg/xds/ingress" xds_sync "github.com/kumahq/kuma/pkg/xds/sync" @@ -64,7 +66,7 @@ func meshResourceTypes(exclude map[core_model.ResourceType]bool) []core_model.Re return types } -func SetupServer(rt core_runtime.Runtime) error { +func RegisterXDS(rt core_runtime.Runtime, server *grpc.Server) error { reconciler := DefaultReconciler(rt) authenticator, err := DefaultAuthenticator(rt) @@ -101,22 +103,10 @@ func SetupServer(rt core_runtime.Runtime) error { } srv := NewServer(rt.XDS().Cache(), callbacks) - return rt.Add( - // xDS gRPC API - &grpcServer{ - server: srv, - port: rt.Config().XdsServer.GrpcPort, - tlsCertFile: rt.Config().XdsServer.TlsCertFile, - tlsKeyFile: rt.Config().XdsServer.TlsKeyFile, - metrics: rt.Metrics(), - }, - // bootstrap server - &xds_bootstrap.BootstrapServer{ - Config: rt.Config().BootstrapServer, - Generator: xds_bootstrap.NewDefaultBootstrapGenerator(rt.ResourceManager(), rt.Config().BootstrapServer.Params, rt.Config().XdsServer.TlsCertFile), - Metrics: rt.Metrics(), - }, - ) + + xdsServerLog.Info("registering Aggregated Discovery Service in Dataplane Server") + envoy_service_discovery_v2.RegisterAggregatedDiscoveryServiceServer(server, srv) + return nil } func NewKubeAuthenticator(rt core_runtime.Runtime) (auth.Authenticator, error) {