From e3e953b62d922ad62d737c7b638a96d1d25ed126 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 23 Nov 2022 08:29:33 -0500 Subject: [PATCH] Re-implement processes route to restore compatibility with the current cloud health checks (#1773) * Re-implement processes route to restore compatibility with the current cloud health checks * Removed unused route * Make linter happy * Add changelog fragment * Add nil check for InputSpec --- ...669159455-reimplement-processes-route.yaml | 31 ++++++++ .../pkg/agent/application/monitoring/error.go | 33 ++++++++ .../agent/application/monitoring/process.go | 63 ++++++++++++++++ .../agent/application/monitoring/processes.go | 75 +++++++++++++++++++ .../agent/application/monitoring/server.go | 13 +++- internal/pkg/agent/cmd/run.go | 12 ++- pkg/component/runtime/manager.go | 13 ++++ 7 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 changelog/fragments/1669159455-reimplement-processes-route.yaml create mode 100644 internal/pkg/agent/application/monitoring/error.go create mode 100644 internal/pkg/agent/application/monitoring/process.go create mode 100644 internal/pkg/agent/application/monitoring/processes.go diff --git a/changelog/fragments/1669159455-reimplement-processes-route.yaml b/changelog/fragments/1669159455-reimplement-processes-route.yaml new file mode 100644 index 00000000000..574fdeedf83 --- /dev/null +++ b/changelog/fragments/1669159455-reimplement-processes-route.yaml @@ -0,0 +1,31 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. 
+# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; an 80ish-character description of the change. +summary: reimplement processes route + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +description: Re-implement processes route to restore compatibility with the current cloud health checks + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR number; optional; the PR number that added the changeset. +# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 1773 + +# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present, it is automatically filled by the tooling with the issue linked to the PR number. +issue: 1731 diff --git a/internal/pkg/agent/application/monitoring/error.go b/internal/pkg/agent/application/monitoring/error.go new file mode 100644 index 00000000000..ebf33d9b7fd --- /dev/null +++ b/internal/pkg/agent/application/monitoring/error.go @@ -0,0 +1,33 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
// Definitions from internal/pkg/agent/application/monitoring/error.go
// (package monitoring; needs the standard "fmt" import).

// errorWithStatus pairs err with a status code.
//
// The returned *statusError reports err's message from Error and status from
// Status. err must be non-nil, because Error dereferences it.
func errorWithStatus(status int, err error) *statusError {
	return &statusError{
		err:    err,
		status: status,
	}
}

// errorfWithStatus builds an error from the fmt-style format msg and its
// string operands args, and pairs it with status.
func errorfWithStatus(status int, msg string, args ...string) *statusError {
	// fmt.Errorf takes ...interface{}, and a []string cannot be spread into
	// that directly, so copy the operands over one by one. Handing the slice
	// to fmt.Errorf as a single operand would append
	// "%!(EXTRA []string=[])" to verb-less messages and render the entire
	// slice under the first verb of any other message.
	operands := make([]interface{}, len(args))
	for i, a := range args {
		operands[i] = a
	}
	return errorWithStatus(status, fmt.Errorf(msg, operands...))
}

// statusError holds the correlation between an error and a status code.
type statusError struct {
	err    error
	status int
}

// Status returns the status code associated with the error.
func (s *statusError) Status() int {
	return s.status
}

// Error returns the message of the wrapped error, satisfying the error
// interface.
func (s *statusError) Error() string {
	return s.err.Error()
}
+ +package monitoring + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/gorilla/mux" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" +) + +const processIDKey = "processID" + +func processHandler(coord *coordinator.Coordinator, statsHandler func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error { + return func(w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + vars := mux.Vars(r) + id, found := vars[processIDKey] + + if !found { + return errorfWithStatus(http.StatusNotFound, "productID not found") + } + + if id == "" || id == paths.BinaryName { + // proxy stats for elastic agent process + return statsHandler(w, r) + } + + state := coord.State(false) + + for _, c := range state.Components { + if c.Component.ID == id { + data := struct { + State string `json:"state"` + Message string `json:"message"` + }{ + State: c.State.State.String(), + Message: c.State.Message, + } + + bytes, err := json.Marshal(data) + var content string + if err != nil { + content = fmt.Sprintf("Not valid json: %v", err) + } else { + content = string(bytes) + } + fmt.Fprint(w, content) + + return nil + } + } + + return errorWithStatus(http.StatusNotFound, fmt.Errorf("matching component %v not found", id)) + } +} diff --git a/internal/pkg/agent/application/monitoring/processes.go b/internal/pkg/agent/application/monitoring/processes.go new file mode 100644 index 00000000000..9649778fa51 --- /dev/null +++ b/internal/pkg/agent/application/monitoring/processes.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package monitoring + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" +) + +type source struct { + Kind string `json:"kind"` + Outputs []string `json:"outputs"` +} + +type process struct { + ID string `json:"id"` + PID string `json:"pid,omitempty"` + Binary string `json:"binary"` + Source source `json:"source"` +} + +func sourceFromComponentID(procID string) source { + var s source + var out string + if pos := strings.LastIndex(procID, "-"); pos != -1 { + out = procID[pos+1:] + } + if strings.HasSuffix(out, "monitoring") { + s.Kind = "internal" + } else { + s.Kind = "configured" + } + s.Outputs = []string{out} + return s +} + +func processesHandler(coord *coordinator.Coordinator) func(http.ResponseWriter, *http.Request) error { + return func(w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + procs := make([]process, 0) + + state := coord.State(false) + + for _, c := range state.Components { + if c.Component.InputSpec != nil { + procs = append(procs, process{c.Component.ID, c.Component.InputSpec.BinaryName, + c.LegacyPID, + sourceFromComponentID(c.Component.ID)}) + } + } + data := struct { + Processes []process `json:"processes"` + }{ + Processes: procs, + } + + bytes, err := json.Marshal(data) + var content string + if err != nil { + content = fmt.Sprintf("Not valid json: %v", err) + } else { + content = string(bytes) + } + fmt.Fprint(w, content) + + return nil + } +} diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go index ef5a26df9d2..5973ea751b8 100644 --- a/internal/pkg/agent/application/monitoring/server.go +++ b/internal/pkg/agent/application/monitoring/server.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/elastic-agent-libs/api" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/monitoring" 
+ "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -27,6 +28,8 @@ func NewServer( endpointConfig api.Config, ns func(string) *monitoring.Namespace, tracer *apm.Tracer, + coord *coordinator.Coordinator, + enableProcessStats bool, ) (*api.Server, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore @@ -38,7 +41,7 @@ func NewServer( return nil, err } - return exposeMetricsEndpoint(log, cfg, ns, tracer) + return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats) } func exposeMetricsEndpoint( @@ -46,6 +49,8 @@ func exposeMetricsEndpoint( config *config.C, ns func(string) *monitoring.Namespace, tracer *apm.Tracer, + coord *coordinator.Coordinator, + enableProcessStats bool, ) (*api.Server, error) { r := mux.NewRouter() if tracer != nil { @@ -54,6 +59,12 @@ func exposeMetricsEndpoint( statsHandler := statsHandler(ns("stats")) r.Handle("/stats", createHandler(statsHandler)) + if enableProcessStats { + r.Handle("/processes", createHandler(processesHandler(coord))) + r.Handle("/processes/{processID}", createHandler(processHandler(coord, statsHandler))) + r.Handle("/processes/{processID}/", createHandler(processHandler(coord, statsHandler))) + } + mux := http.NewServeMux() mux.Handle("/", r) diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index e6f9ec8d0f7..5a6a6ce0ba2 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/elastic-agent-libs/service" "github.com/elastic/elastic-agent-system-metrics/report" "github.com/elastic/elastic-agent/internal/pkg/agent/application" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" 
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" @@ -170,7 +171,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error { return err } - serverStopFn, err := setupMetrics(logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer) + serverStopFn, err := setupMetrics(logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) if err != nil { return err } @@ -395,7 +396,7 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) cfg := mcfg.APM - // nolint:godox // the TODO is intentional + //nolint:godox // the TODO is intentional // TODO(stn): Ideally, we'd use apmtransport.NewHTTPTransportOptions() // but it doesn't exist today. Update this code once we have something // available via the APM Go agent. @@ -452,6 +453,7 @@ func setupMetrics( operatingSystem string, cfg *monitoringCfg.MonitoringConfig, tracer *apm.Tracer, + coord *coordinator.Coordinator, ) (func() error, error) { if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil { return nil, err @@ -463,7 +465,7 @@ func setupMetrics( Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg), } - s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer) + s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg)) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") } @@ -472,3 +474,7 @@ func setupMetrics( // return server stopper return s.Stop, nil } + +func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool { + return cfg != nil && cfg.HTTP.Enabled +} diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 0c4befc5e2f..d19eda7c9a4 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -54,6 +54,7 @@ var ( type 
ComponentComponentState struct { Component component.Component `yaml:"component"` State ComponentState `yaml:"state"` + LegacyPID string `yaml:"-"` // To propagate PID for the /processes, and yes, it was a string } // ComponentUnitDiagnosticRequest used to request diagnostics from specific unit. @@ -284,9 +285,21 @@ func (m *Manager) State() []ComponentComponentState { states := make([]ComponentComponentState, 0, len(m.current)) for _, crs := range m.current { crs.latestMx.RLock() + var legacyPID string + if crs.runtime != nil { + if commandRuntime, ok := crs.runtime.(*CommandRuntime); ok { + if commandRuntime != nil { + procInfo := commandRuntime.proc + if procInfo != nil { + legacyPID = fmt.Sprint(commandRuntime.proc.PID) + } + } + } + } states = append(states, ComponentComponentState{ Component: crs.currComp, State: crs.latestState.Copy(), + LegacyPID: legacyPID, }) crs.latestMx.RUnlock() }