diff --git a/cmd/agent/api/internal/config/endpoint.go b/cmd/agent/api/internal/config/endpoint.go new file mode 100644 index 0000000000000..095d233bdc6ab --- /dev/null +++ b/cmd/agent/api/internal/config/endpoint.go @@ -0,0 +1,118 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package config defines the config endpoint of the IPC API Server. +package config + +import ( + "encoding/json" + "expvar" + "fmt" + "net/http" + + gorilla "github.com/gorilla/mux" + + "github.com/DataDog/datadog-agent/pkg/config" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +type authorizedSet map[string]struct{} + +var authorizedConfigPathsCore = authorizedSet{ + "api_key": {}, +} + +type configEndpoint struct { + cfg config.Reader + authorizedConfigPaths authorizedSet + + // runtime metrics about the config endpoint usage + expvars *expvar.Map + successExpvar expvar.Map + unauthorizedExpvar expvar.Map + unsetExpvar expvar.Map + failedExpvar expvar.Map +} + +func (c *configEndpoint) serveHTTP(w http.ResponseWriter, r *http.Request) { + body, statusCode, err := c.getConfigValueAsJSON(r) + if err != nil { + http.Error(w, err.Error(), statusCode) + return + } + + w.WriteHeader(statusCode) + _, err = w.Write(body) + if err != nil { + log.Warnf("config endpoint: could not write response body: %v", err) + } +} + +// GetConfigEndpointMuxCore builds and returns the mux for the config endpoint with default values +// for the core agent +func GetConfigEndpointMuxCore() *gorilla.Router { + return GetConfigEndpointMux(config.Datadog, authorizedConfigPathsCore, "core") +} + +// GetConfigEndpointMux builds and returns the mux for the config endpoint, with the given config, +// authorized paths, and expvar namespace +func GetConfigEndpointMux(cfg config.Reader, authorizedConfigPaths authorizedSet, expvarNamespace string) *gorilla.Router { + mux, _ := getConfigEndpoint(cfg, authorizedConfigPaths, expvarNamespace) + return mux +} + +// getConfigEndpoint builds and returns the mux and the endpoint state. +func getConfigEndpoint(cfg config.Reader, authorizedConfigPaths authorizedSet, expvarNamespace string) (*gorilla.Router, *configEndpoint) { + configEndpoint := &configEndpoint{ + cfg: cfg, + authorizedConfigPaths: authorizedConfigPaths, + expvars: expvar.NewMap(expvarNamespace + "_config_endpoint"), + } + + for name, expv := range map[string]*expvar.Map{ + "success": &configEndpoint.successExpvar, + "unauthorized": &configEndpoint.unauthorizedExpvar, + "unset": &configEndpoint.unsetExpvar, + "failed": &configEndpoint.failedExpvar, + } { + configEndpoint.expvars.Set(name, expv) + } + + configEndpointHandler := http.HandlerFunc(configEndpoint.serveHTTP) + configEndpointMux := gorilla.NewRouter() + configEndpointMux.HandleFunc("/", configEndpointHandler).Methods("GET") + configEndpointMux.HandleFunc("/{path}", configEndpointHandler).Methods("GET") + + return configEndpointMux, configEndpoint +} + +// returns the marshalled JSON value of the config path requested +// or an error and http status code in case of failure +func (c *configEndpoint) getConfigValueAsJSON(r *http.Request) ([]byte, int, error) { + vars := gorilla.Vars(r) + path := vars["path"] + + if _, ok := c.authorizedConfigPaths[path]; !ok { + c.unauthorizedExpvar.Add(path, 1) + log.Warnf("config endpoint received a request from '%s' for config '%s' which is not allowed", r.RemoteAddr, path) + return nil, http.StatusForbidden, fmt.Errorf("querying config value '%s' is not allowed", path) + } + + log.Debug("config endpoint received a request from '%s' for config '%s'", r.RemoteAddr, path) + value := c.cfg.Get(path) + if value == nil { + c.unsetExpvar.Add(path, 1) + return nil, http.StatusNotFound, fmt.Errorf("no runtime setting found for %s", path) + } + + body, err := json.Marshal(value) + if err != nil { + c.failedExpvar.Add(path, 1) + return nil, http.StatusInternalServerError, fmt.Errorf("could not marshal config value of '%s': %v", path, err) + } + + c.successExpvar.Add(path, 1) + return body, http.StatusOK, nil +} diff --git a/cmd/agent/api/internal/config/endpoint_test.go b/cmd/agent/api/internal/config/endpoint_test.go new file mode 100644 index 0000000000000..9998d0f454266 --- /dev/null +++ b/cmd/agent/api/internal/config/endpoint_test.go @@ -0,0 +1,146 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package config + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/config" +) + +type testCase struct { + name string + authorized bool + existing bool + expectedStatus int +} + +type expvals struct { + Success map[string]int `json:"success"` + Failed map[string]int `json:"failed"` + Unauthorized map[string]int `json:"unauthorized"` + Unset map[string]int `json:"unset"` +} + +func testConfigValue(t *testing.T, configEndpoint *configEndpoint, server *httptest.Server, configName string, expectedStatus int) { + t.Helper() + + beforeVars := getExpvals(t, configEndpoint) + resp, err := server.Client().Get(server.URL + "/" + configName) + require.NoError(t, err) + defer resp.Body.Close() + + afterVars := getExpvals(t, configEndpoint) + checkExpvars(t, beforeVars, afterVars, configName, expectedStatus) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Equal(t, expectedStatus, resp.StatusCode, string(body)) + + if resp.StatusCode != http.StatusOK { + return + } + + var configValue interface{} + err = json.Unmarshal(body, &configValue) + require.NoError(t, err) + + require.EqualValues(t, configEndpoint.cfg.Get(configName), configValue) +} + +func TestConfigEndpoint(t *testing.T) { + t.Run("core_config", func(t *testing.T) { + cfg, server, configEndpoint := getConfigServer(t, authorizedConfigPathsCore) + for configName := range authorizedConfigPathsCore { + var expectedStatus int + if cfg.IsSet(configName) { + expectedStatus = http.StatusOK + } else { + expectedStatus = http.StatusNotFound + } + testConfigValue(t, configEndpoint, server, configName, expectedStatus) + } + }) + + for _, testCase := range []testCase{ + {"authorized_existing_config", true, true, http.StatusOK}, + {"authorized_missing_config", true, false, http.StatusNotFound}, + {"unauthorized_existing_config", false, true, http.StatusForbidden}, + {"unauthorized_missing_config", false, false, http.StatusForbidden}, + } { + t.Run(testCase.name, func(t *testing.T) { + configName := "my.config.value" + authorizedConfigPaths := authorizedSet{} + if testCase.authorized { + authorizedConfigPaths[configName] = struct{}{} + } + cfg, server, configEndpoint := getConfigServer(t, authorizedConfigPaths) + if testCase.existing { + cfg.SetWithoutSource(configName, "some_value") + } + testConfigValue(t, configEndpoint, server, configName, testCase.expectedStatus) + }) + } + + t.Run("authorized_not_marshallable", func(t *testing.T) { + configName := "my.config.value" + cfg, server, configEndpoint := getConfigServer(t, authorizedSet{configName: {}}) + cfg.SetWithoutSource(configName, make(chan int)) + testConfigValue(t, configEndpoint, server, configName, http.StatusInternalServerError) + }) +} + +func checkExpvars(t *testing.T, beforeVars, afterVars expvals, configName string, expectedStatus int) { + t.Helper() + + switch expectedStatus { + case http.StatusOK: + beforeVars.Success[configName]++ + case http.StatusNotFound: + beforeVars.Unset[configName]++ + case http.StatusForbidden: + beforeVars.Unauthorized[configName]++ + case http.StatusInternalServerError: + beforeVars.Failed[configName]++ + default: + t.Fatalf("unexpected status: %d", expectedStatus) + } + + require.EqualValues(t, beforeVars, afterVars) +} + +func getConfigServer(t *testing.T, authorizedConfigPaths map[string]struct{}) (*config.MockConfig, *httptest.Server, *configEndpoint) { + t.Helper() + + cfg := config.Mock(t) + configEndpointMux, configEndpoint := getConfigEndpoint(cfg, authorizedConfigPaths, t.Name()) + server := httptest.NewServer(configEndpointMux) + t.Cleanup(server.Close) + + return cfg, server, configEndpoint +} + +func getExpvals(t *testing.T, configEndpoint *configEndpoint) expvals { + t.Helper() + + vars := expvals{} + // error on unknown fields + dec := json.NewDecoder(strings.NewReader(configEndpoint.expvars.String())) + dec.DisallowUnknownFields() + err := dec.Decode(&vars) + require.NoError(t, err) + require.False(t, dec.More()) + + return vars +} diff --git a/cmd/agent/api/listener.go b/cmd/agent/api/listener.go index 6f1a8ddafe73d..b8d4e2fa5e885 100644 --- a/cmd/agent/api/listener.go +++ b/cmd/agent/api/listener.go @@ -8,6 +8,7 @@ package api import ( "fmt" "net" + "strconv" "github.com/DataDog/datadog-agent/pkg/config" ) @@ -22,10 +23,19 @@ func getIPCAddressPort() (string, error) { } // getListener returns a listening connection -func getListener() (net.Listener, error) { - address, err := getIPCAddressPort() - if err != nil { - return nil, err - } +func getListener(address string) (net.Listener, error) { return net.Listen("tcp", address) } + +// returns whether the IPC server is enabled, and if so its host and host:port +func getIPCServerAddressPort() (string, string, bool) { + ipcServerPort := config.Datadog.GetInt("agent_ipc_port") + if ipcServerPort == 0 { + return "", "", false + } + + ipcServerHost := config.Datadog.GetString("agent_ipc_host") + ipcServerHostPort := net.JoinHostPort(ipcServerHost, strconv.Itoa(ipcServerPort)) + + return ipcServerHost, ipcServerHostPort, true +} diff --git a/cmd/agent/api/listener_test.go b/cmd/agent/api/listener_test.go new file mode 100644 index 0000000000000..45aa587605688 --- /dev/null +++ b/cmd/agent/api/listener_test.go @@ -0,0 +1,40 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package api + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/config" +) + +func TestGetIPCServerAddressPort(t *testing.T) { + t.Run("default", func(t *testing.T) { + config.Mock(t) + _, _, enabled := getIPCServerAddressPort() + require.False(t, enabled) + }) + + t.Run("enabled", func(t *testing.T) { + cfg := config.Mock(t) + cfg.SetWithoutSource("agent_ipc_port", 1234) + + host, hostPort, enabled := getIPCServerAddressPort() + require.Equal(t, "localhost", host) + require.Equal(t, "localhost:1234", hostPort) + require.True(t, enabled) + }) + + t.Run("disabled", func(t *testing.T) { + cfg := config.Mock(t) + cfg.SetWithoutSource("agent_ipc_port", 0) + + _, _, enabled := getIPCServerAddressPort() + require.False(t, enabled) + }) +} diff --git a/cmd/agent/api/security.go b/cmd/agent/api/security.go index 4d30af54912a4..65a920cc5d655 100644 --- a/cmd/agent/api/security.go +++ b/cmd/agent/api/security.go @@ -15,12 +15,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/api/security" "github.com/DataDog/datadog-agent/pkg/api/util" -) - -var ( - tlsKeyPair *tls.Certificate - tlsCertPool *x509.CertPool - tlsAddr string + "github.com/DataDog/datadog-agent/pkg/util/log" ) // validateToken - validates token for legacy API @@ -46,13 +41,9 @@ func parseToken(token string) (interface{}, error) { return struct{}{}, nil } -func buildSelfSignedKeyPair() ([]byte, []byte) { - +func buildSelfSignedKeyPair(additionalHostIdentities ...string) ([]byte, []byte) { hosts := []string{"127.0.0.1", "localhost", "::1"} - ipcAddr, err := getIPCAddressPort() - if err == nil { - hosts = append(hosts, ipcAddr) - } + hosts = append(hosts, additionalHostIdentities...) _, rootCertPEM, rootKey, err := security.GenerateRootCert(hosts, 2048) if err != nil { return nil, nil @@ -67,26 +58,23 @@ func buildSelfSignedKeyPair() ([]byte, []byte) { return rootCertPEM, rootKeyPEM } -func initializeTLS() error { - cert, key := buildSelfSignedKeyPair() +func initializeTLS(additionalHostIdentities ...string) (*tls.Certificate, *x509.CertPool, error) { + log.Info("Initializing TLS certificates") + + cert, key := buildSelfSignedKeyPair(additionalHostIdentities...) if cert == nil { - return errors.New("unable to generate certificate") + return nil, nil, errors.New("unable to generate certificate") } pair, err := tls.X509KeyPair(cert, key) if err != nil { - return fmt.Errorf("unable to generate TLS key pair: %v", err) + return nil, nil, fmt.Errorf("unable to generate TLS key pair: %v", err) } - tlsKeyPair = &pair - tlsCertPool = x509.NewCertPool() + + tlsCertPool := x509.NewCertPool() ok := tlsCertPool.AppendCertsFromPEM(cert) if !ok { - return fmt.Errorf("unable to add new certificate to pool") - } - - tlsAddr, err = getIPCAddressPort() - if err != nil { - return fmt.Errorf("unable to get IPC address and port: %v", err) + return nil, nil, fmt.Errorf("unable to add new certificate to pool") } - return nil + return &pair, tlsCertPool, nil } diff --git a/cmd/agent/api/server.go b/cmd/agent/api/server.go index 88b12f464255a..3552ae2515fa3 100644 --- a/cmd/agent/api/server.go +++ b/cmd/agent/api/server.go @@ -11,28 +11,18 @@ sending commands and receiving infos. package api import ( - "context" "crypto/tls" "fmt" stdLog "log" "net" "net/http" - "time" "github.com/cihub/seelog" - gorilla "github.com/gorilla/mux" - grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" - "github.com/grpc-ecosystem/grpc-gateway/runtime" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - "github.com/DataDog/datadog-agent/cmd/agent/api/internal/agent" - "github.com/DataDog/datadog-agent/cmd/agent/api/internal/check" + "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/core/flare" "github.com/DataDog/datadog-agent/comp/core/secrets" "github.com/DataDog/datadog-agent/comp/core/workloadmeta" - workloadmetaServer "github.com/DataDog/datadog-agent/comp/core/workloadmeta/server" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server" dogstatsddebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" @@ -44,17 +34,35 @@ import ( "github.com/DataDog/datadog-agent/pkg/api/util" "github.com/DataDog/datadog-agent/pkg/config" remoteconfig "github.com/DataDog/datadog-agent/pkg/config/remote/service" - pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" - "github.com/DataDog/datadog-agent/pkg/tagger" - taggerserver "github.com/DataDog/datadog-agent/pkg/tagger/server" - grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc" + "github.com/DataDog/datadog-agent/pkg/util/log" "github.com/DataDog/datadog-agent/pkg/util/optional" ) -var listener net.Listener +func startServer(listener net.Listener, srv *http.Server, name string) { + // Use a stack depth of 4 on top of the default one to get a relevant filename in the stdlib + logWriter, _ := config.NewLogWriter(5, seelog.ErrorLvl) + + srv.ErrorLog = stdLog.New(logWriter, fmt.Sprintf("Error from the Agent HTTP server '%s': ", name), 0) // log errors to seelog + + tlsListener := tls.NewListener(listener, srv.TLSConfig) + + go srv.Serve(tlsListener) //nolint:errcheck + + log.Infof("Started HTTP server '%s' on %s", name, listener.Addr().String()) +} + +func stopServer(listener net.Listener, name string) { + if listener != nil { + if err := listener.Close(); err != nil { + log.Errorf("Error stopping HTTP server '%s': %s", name, err) + } else { + log.Infof("Stopped HTTP server '%s'", name) + } + } +} -// StartServer creates the router and starts the HTTP server -func StartServer( +// StartServers creates certificates and starts API servers +func StartServers( configService *remoteconfig.Service, flare flare.Component, dogstatsdServer dogstatsdServer.Component, @@ -69,124 +77,58 @@ func StartServer( invHost inventoryhost.Component, secretResolver secrets.Component, ) error { - err := initializeTLS() + apiAddr, err := getIPCAddressPort() if err != nil { - return fmt.Errorf("unable to initialize TLS: %v", err) + return fmt.Errorf("unable to get IPC address and port: %v", err) } - // get the transport we're going to use under HTTP - listener, err = getListener() - if err != nil { - // we use the listener to handle commands for the Agent, there's - // no way we can recover from this error - return fmt.Errorf("Unable to create the api server: %v", err) + additionalHostIdentities := []string{apiAddr} + + ipcServerHost, ipcServerHostPort, ipcServerEnabled := getIPCServerAddressPort() + if ipcServerEnabled { + additionalHostIdentities = append(additionalHostIdentities, ipcServerHost) } - err = util.CreateAndSetAuthToken() + tlsKeyPair, tlsCertPool, err := initializeTLS(additionalHostIdentities...) if err != nil { - return err + return fmt.Errorf("unable to initialize TLS: %v", err) } - // gRPC server - authInterceptor := grpcutil.AuthInterceptor(parseToken) - opts := []grpc.ServerOption{ - grpc.Creds(credentials.NewClientTLSFromCert(tlsCertPool, tlsAddr)), - grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(authInterceptor)), - grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authInterceptor)), + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{*tlsKeyPair}, + NextProtos: []string{"h2"}, + MinVersion: tls.VersionTLS12, } - s := grpc.NewServer(opts...) - pb.RegisterAgentServer(s, &server{}) - pb.RegisterAgentSecureServer(s, &serverSecure{ - configService: configService, - taggerServer: taggerserver.NewServer(tagger.GetDefaultTagger()), - // TODO(components): decide if workloadmetaServer should be componentized itself - workloadmetaServer: workloadmetaServer.NewServer(wmeta), - dogstatsdServer: dogstatsdServer, - capture: capture, - }) - - dcreds := credentials.NewTLS(&tls.Config{ - ServerName: tlsAddr, - RootCAs: tlsCertPool, - }) - dopts := []grpc.DialOption{grpc.WithTransportCredentials(dcreds)} - - // starting grpc gateway - ctx := context.Background() - gwmux := runtime.NewServeMux() - err = pb.RegisterAgentHandlerFromEndpoint( - ctx, gwmux, tlsAddr, dopts) - if err != nil { - return fmt.Errorf("error registering agent handler from endpoint %s: %v", tlsAddr, err) + if err := util.CreateAndSetAuthToken(); err != nil { + return err } - err = pb.RegisterAgentSecureHandlerFromEndpoint( - ctx, gwmux, tlsAddr, dopts) - if err != nil { - return fmt.Errorf("error registering agent secure handler from endpoint %s: %v", tlsAddr, err) + // start the CMD server + if err := startCMDServer( + apiAddr, tlsConfig, tlsCertPool, + configService, flare, dogstatsdServer, + capture, serverDebug, wmeta, logsAgent, + senderManager, hostMetadata, invAgent, + demux, invHost, secretResolver, + ); err != nil { + return fmt.Errorf("unable to start CMD API server: %v", err) } - // Setup multiplexer - // create the REST HTTP router - agentMux := gorilla.NewRouter() - checkMux := gorilla.NewRouter() - // Validate token for every request - agentMux.Use(validateToken) - checkMux.Use(validateToken) - - mux := http.NewServeMux() - mux.Handle( - "/agent/", - http.StripPrefix("/agent", - agent.SetupHandlers( - agentMux, - flare, - dogstatsdServer, - serverDebug, - wmeta, - logsAgent, - senderManager, - hostMetadata, - invAgent, - demux, - invHost, - secretResolver, - ))) - mux.Handle("/check/", http.StripPrefix("/check", check.SetupHandlers(checkMux))) - mux.Handle("/", gwmux) - - // Use a stack depth of 4 on top of the default one to get a relevant filename in the stdlib - logWriter, _ := config.NewLogWriter(5, seelog.ErrorLvl) - - srv := grpcutil.NewMuxedGRPCServer( - tlsAddr, - &tls.Config{ - Certificates: []tls.Certificate{*tlsKeyPair}, - NextProtos: []string{"h2"}, - MinVersion: tls.VersionTLS12, - }, - s, - grpcutil.TimeoutHandlerFunc(mux, time.Duration(config.Datadog.GetInt64("server_timeout"))*time.Second), - ) - - srv.ErrorLog = stdLog.New(logWriter, "Error from the agent http API server: ", 0) // log errors to seelog - - tlsListener := tls.NewListener(listener, srv.TLSConfig) + // start the IPC server + if ipcServerEnabled { + if err := startIPCServer(ipcServerHostPort, tlsConfig); err != nil { + // if we fail to start the IPC server, we should stop the CMD server + StopServers() + return fmt.Errorf("unable to start IPC API server: %v", err) + } + } - go srv.Serve(tlsListener) //nolint:errcheck return nil } -// StopServer closes the connection and the server -// stops listening to new commands. -func StopServer() { - if listener != nil { - listener.Close() - } -} - -// ServerAddress retruns the server address. -func ServerAddress() *net.TCPAddr { - return listener.Addr().(*net.TCPAddr) +// StopServers closes the connections and the servers +func StopServers() { + stopCMDServer() + stopIPCServer() } diff --git a/cmd/agent/api/server_cmd.go b/cmd/agent/api/server_cmd.go new file mode 100644 index 0000000000000..5421d618e722b --- /dev/null +++ b/cmd/agent/api/server_cmd.go @@ -0,0 +1,165 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package api + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "net/http" + "time" + + gorilla "github.com/gorilla/mux" + grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/DataDog/datadog-agent/cmd/agent/api/internal/agent" + "github.com/DataDog/datadog-agent/cmd/agent/api/internal/check" + "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" + "github.com/DataDog/datadog-agent/comp/core/flare" + "github.com/DataDog/datadog-agent/comp/core/secrets" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + workloadmetaServer "github.com/DataDog/datadog-agent/comp/core/workloadmeta/server" + "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" + dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server" + dogstatsddebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent" + "github.com/DataDog/datadog-agent/comp/metadata/host" + "github.com/DataDog/datadog-agent/comp/metadata/inventoryagent" + "github.com/DataDog/datadog-agent/comp/metadata/inventoryhost" + "github.com/DataDog/datadog-agent/pkg/aggregator/sender" + "github.com/DataDog/datadog-agent/pkg/config" + remoteconfig "github.com/DataDog/datadog-agent/pkg/config/remote/service" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" + "github.com/DataDog/datadog-agent/pkg/tagger" + taggerserver "github.com/DataDog/datadog-agent/pkg/tagger/server" + grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc" + "github.com/DataDog/datadog-agent/pkg/util/optional" +) + +const cmdServerName string = "CMD API Server" + +var cmdListener net.Listener + +func startCMDServer( + cmdAddr string, + tlsConfig *tls.Config, + tlsCertPool *x509.CertPool, + configService *remoteconfig.Service, + flare flare.Component, + dogstatsdServer dogstatsdServer.Component, + capture replay.Component, + serverDebug dogstatsddebug.Component, + wmeta workloadmeta.Component, + logsAgent optional.Option[logsAgent.Component], + senderManager sender.DiagnoseSenderManager, + hostMetadata host.Component, + invAgent inventoryagent.Component, + demux demultiplexer.Component, + invHost inventoryhost.Component, + secretResolver secrets.Component, +) (err error) { + // get the transport we're going to use under HTTP + cmdListener, err = getListener(cmdAddr) + if err != nil { + // we use the listener to handle commands for the Agent, there's + // no way we can recover from this error + return fmt.Errorf("unable to listen to the given address: %v", err) + } + + // gRPC server + authInterceptor := grpcutil.AuthInterceptor(parseToken) + opts := []grpc.ServerOption{ + grpc.Creds(credentials.NewClientTLSFromCert(tlsCertPool, cmdAddr)), + grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(authInterceptor)), + grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authInterceptor)), + } + + s := grpc.NewServer(opts...) + pb.RegisterAgentServer(s, &server{}) + pb.RegisterAgentSecureServer(s, &serverSecure{ + configService: configService, + taggerServer: taggerserver.NewServer(tagger.GetDefaultTagger()), + // TODO(components): decide if workloadmetaServer should be componentized itself + workloadmetaServer: workloadmetaServer.NewServer(wmeta), + dogstatsdServer: dogstatsdServer, + capture: capture, + }) + + dcreds := credentials.NewTLS(&tls.Config{ + ServerName: cmdAddr, + RootCAs: tlsCertPool, + }) + dopts := []grpc.DialOption{grpc.WithTransportCredentials(dcreds)} + + // starting grpc gateway + ctx := context.Background() + gwmux := runtime.NewServeMux() + err = pb.RegisterAgentHandlerFromEndpoint( + ctx, gwmux, cmdAddr, dopts) + if err != nil { + return fmt.Errorf("error registering agent handler from endpoint %s: %v", cmdAddr, err) + } + + err = pb.RegisterAgentSecureHandlerFromEndpoint( + ctx, gwmux, cmdAddr, dopts) + if err != nil { + return fmt.Errorf("error registering agent secure handler from endpoint %s: %v", cmdAddr, err) + } + + // Setup multiplexer + // create the REST HTTP router + agentMux := gorilla.NewRouter() + checkMux := gorilla.NewRouter() + // Validate token for every request + agentMux.Use(validateToken) + checkMux.Use(validateToken) + + cmdMux := http.NewServeMux() + cmdMux.Handle( + "/agent/", + http.StripPrefix("/agent", + agent.SetupHandlers( + agentMux, + flare, + dogstatsdServer, + serverDebug, + wmeta, + logsAgent, + senderManager, + hostMetadata, + invAgent, + demux, + invHost, + secretResolver, + ))) + cmdMux.Handle("/check/", http.StripPrefix("/check", check.SetupHandlers(checkMux))) + cmdMux.Handle("/", gwmux) + + srv := grpcutil.NewMuxedGRPCServer( + cmdAddr, + tlsConfig, + s, + grpcutil.TimeoutHandlerFunc(cmdMux, time.Duration(config.Datadog.GetInt64("server_timeout"))*time.Second), + ) + + startServer(cmdListener, srv, cmdServerName) + + return nil +} + +// ServerAddress returns the server address. +func ServerAddress() *net.TCPAddr { + return cmdListener.Addr().(*net.TCPAddr) +} + +func stopCMDServer() { + stopServer(cmdListener, cmdServerName) +} diff --git a/cmd/agent/api/server_ipc.go b/cmd/agent/api/server_ipc.go new file mode 100644 index 0000000000000..f799c3a58467a --- /dev/null +++ b/cmd/agent/api/server_ipc.go @@ -0,0 +1,48 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package api + +import ( + "crypto/tls" + "net" + "net/http" + "time" + + configendpoint "github.com/DataDog/datadog-agent/cmd/agent/api/internal/config" + "github.com/DataDog/datadog-agent/pkg/config" +) + +const ipcServerName string = "IPC API Server" + +var ipcListener net.Listener + +func startIPCServer(ipcServerAddr string, tlsConfig *tls.Config) (err error) { + ipcListener, err = getListener(ipcServerAddr) + if err != nil { + return err + } + + configEndpointMux := configendpoint.GetConfigEndpointMuxCore() + configEndpointMux.Use(validateToken) + ipcMux := http.NewServeMux() + ipcMux.Handle( + "/config/v1/", + http.StripPrefix("/config/v1", configEndpointMux)) + + ipcServer := &http.Server{ + Addr: ipcServerAddr, + Handler: http.TimeoutHandler(ipcMux, time.Duration(config.Datadog.GetInt64("server_timeout"))*time.Second, "timeout"), + TLSConfig: tlsConfig, + } + + startServer(ipcListener, ipcServer, ipcServerName) + + return nil +} + +func stopIPCServer() { + stopServer(ipcListener, ipcServerName) +} diff --git a/comp/api/api/apiimpl/api.go b/comp/api/api/apiimpl/api.go index 08131c0436e30..d7ea69524beff 100644 --- a/comp/api/api/apiimpl/api.go +++ b/comp/api/api/apiimpl/api.go @@ -60,7 +60,7 @@ func (server *apiServer) StartServer( invHost inventoryhost.Component, secretResolver secrets.Component, ) error { - return apiPackage.StartServer(configService, + return apiPackage.StartServers(configService, flare, dogstatsdServer, capture, @@ -79,7 +79,7 @@ func (server *apiServer) StartServer( // StopServer closes the connection and the server // stops listening to new commands. func (server *apiServer) StopServer() { - apiPackage.StopServer() + apiPackage.StopServers() } // ServerAddress returns the server address. diff --git a/pkg/api/security/security.go b/pkg/api/security/security.go index af601ccca824d..c7a98f72f3ed8 100644 --- a/pkg/api/security/security.go +++ b/pkg/api/security/security.go @@ -68,6 +68,8 @@ func CertTemplate() (*x509.Certificate, error) { // GenerateRootCert generates a root certificate func GenerateRootCert(hosts []string, bits int) (cert *x509.Certificate, certPEM []byte, rootKey *rsa.PrivateKey, err error) { + log.Info("Generating root certificate for hosts: ", strings.Join(hosts, ", ")) + rootCertTmpl, err := CertTemplate() if err != nil { return diff --git a/pkg/config/config.go b/pkg/config/config.go index 7783bd402b05f..8461136fd842d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -248,6 +248,8 @@ func InitConfig(config Config) { config.BindEnv("ipc_address") // deprecated: use `cmd_host` instead config.BindEnvAndSetDefault("cmd_host", "localhost") config.BindEnvAndSetDefault("cmd_port", 5001) + config.BindEnvAndSetDefault("agent_ipc_host", "localhost") + config.BindEnvAndSetDefault("agent_ipc_port", 0) config.BindEnvAndSetDefault("default_integration_http_timeout", 9) config.BindEnvAndSetDefault("integration_tracing", false) config.BindEnvAndSetDefault("integration_tracing_exhaustive", false)