Skip to content

Commit

Permalink
Re-implement processes route to restore compatibility with the curren…
Browse files Browse the repository at this point in the history
…t cloud health checks (#1773)

* Re-implement processes route to restore compatibility with the current cloud health checks
* Removed unused route
* Make linter happy
* Add changelog fragment
* Add nil check for InputSpec
  • Loading branch information
aleksmaus authored Nov 23, 2022
1 parent c3b7460 commit e3e953b
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 4 deletions.
31 changes: 31 additions & 0 deletions changelog/fragments/1669159455-reimplement-processes-route.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; an 80ish-character description of the change.
summary: reimplement processes route

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
description: Re-implement processes route to restore compatibility with the current cloud health checks

# Affected component; a word indicating the component this changeset affects.
component:

# PR number; optional; the PR number that added the changeset.
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 1773

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number.
issue: 1731
33 changes: 33 additions & 0 deletions internal/pkg/agent/application/monitoring/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package monitoring

import "fmt"

// errorWithStatus wraps err in a statusError that carries the given status
// code (callers in this package pass HTTP status codes).
func errorWithStatus(status int, err error) *statusError {
	return &statusError{
		err:    err,
		status: status,
	}
}

// errorfWithStatus formats msg with args using fmt.Errorf semantics and wraps
// the resulting error in a statusError that carries the given status code.
func errorfWithStatus(status int, msg string, args ...string) *statusError {
	// fmt.Errorf takes ...interface{}; handing it the []string directly would
	// pass the whole slice as a single operand (rendering "[a b]" for the first
	// verb, or a "%!(EXTRA ...)" suffix when msg has no verbs). Convert each
	// argument and spread them instead.
	fmtArgs := make([]interface{}, len(args))
	for i, arg := range args {
		fmtArgs[i] = arg
	}
	return errorWithStatus(status, fmt.Errorf(msg, fmtArgs...))
}

// statusError holds the correlation between an error and a status code.
type statusError struct {
	err    error
	status int
}

// Status returns the status code associated with the error.
func (s *statusError) Status() int {
	return s.status
}

// Error returns the message of the wrapped error.
func (s *statusError) Error() string {
	return s.err.Error()
}
63 changes: 63 additions & 0 deletions internal/pkg/agent/application/monitoring/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package monitoring

import (
"encoding/json"
"fmt"
"net/http"

"github.com/gorilla/mux"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
)

// processIDKey is the key used to look up the process ID among the mux route
// variables of a request.
const processIDKey = "processID"

// processHandler builds the handler that serves a single process entry.
//
// The process ID is read from the mux route variables. An empty ID, or one
// equal to paths.BinaryName, is delegated to statsHandler. Any other ID is
// matched against the component IDs in the coordinator state, and the matching
// component's state and message are written as a JSON object. A missing route
// variable or an unknown ID produces a status error with http.StatusNotFound.
func processHandler(coord *coordinator.Coordinator, statsHandler func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error {
	return func(w http.ResponseWriter, r *http.Request) error {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")

		id, found := mux.Vars(r)[processIDKey]
		if !found {
			return errorWithStatus(http.StatusNotFound, fmt.Errorf("processID not found"))
		}

		if id == "" || id == paths.BinaryName {
			// proxy stats for elastic agent process
			return statsHandler(w, r)
		}

		state := coord.State(false)

		for _, c := range state.Components {
			if c.Component.ID != id {
				continue
			}

			payload, err := json.Marshal(struct {
				State   string `json:"state"`
				Message string `json:"message"`
			}{
				State:   c.State.State.String(),
				Message: c.State.Message,
			})
			if err != nil {
				// Report the failure through the error path rather than
				// emitting non-JSON text under a JSON content type.
				return errorWithStatus(http.StatusInternalServerError, fmt.Errorf("encoding state of component %v: %w", id, err))
			}

			// A failed write means the client went away; there is nobody
			// left to report the error to.
			_, _ = w.Write(payload)
			return nil
		}

		return errorWithStatus(http.StatusNotFound, fmt.Errorf("matching component %v not found", id))
	}
}
75 changes: 75 additions & 0 deletions internal/pkg/agent/application/monitoring/processes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package monitoring

import (
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
)

// source is the JSON representation of where a reported process comes from.
type source struct {
	// Kind is encoded under the "kind" key.
	Kind string `json:"kind"`
	// Outputs is encoded under the "outputs" key.
	Outputs []string `json:"outputs"`
}

// process is the JSON representation of a single entry in the processes
// listing.
type process struct {
	// ID is encoded under the "id" key.
	ID string `json:"id"`
	// PID is kept as a string and is left out of the JSON when empty.
	PID string `json:"pid,omitempty"`
	// Binary is encoded under the "binary" key.
	Binary string `json:"binary"`
	// Source is encoded under the "source" key.
	Source source `json:"source"`
}

// sourceFromComponentID derives a source from a component ID.
//
// The text following the last "-" in procID becomes the single reported
// output; an ID without any "-" yields an empty output name. The kind is
// "internal" when that output name ends in "monitoring" and "configured"
// otherwise.
func sourceFromComponentID(procID string) source {
	output := ""
	if idx := strings.LastIndex(procID, "-"); idx >= 0 {
		output = procID[idx+1:]
	}

	kind := "configured"
	if strings.HasSuffix(output, "monitoring") {
		kind = "internal"
	}

	return source{
		Kind:    kind,
		Outputs: []string{output},
	}
}

// processesHandler builds the handler that lists the processes known to the
// coordinator.
//
// Every component in the coordinator state that has an input spec is reported
// with its component ID, PID, binary name and a source derived from the
// component ID. The response body is a JSON object with a single "processes"
// array.
func processesHandler(coord *coordinator.Coordinator) func(http.ResponseWriter, *http.Request) error {
	return func(w http.ResponseWriter, r *http.Request) error {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")

		state := coord.State(false)

		// Allocated non-nil so an empty listing is encoded as [] rather than null.
		procs := make([]process, 0, len(state.Components))
		for _, c := range state.Components {
			// Without an input spec there is no binary name to report.
			if c.Component.InputSpec == nil {
				continue
			}
			// Keyed fields: a positional literal here silently depends on the
			// struct's field order and is easy to get wrong for the two
			// adjacent string fields PID and Binary.
			procs = append(procs, process{
				ID:     c.Component.ID,
				PID:    c.LegacyPID,
				Binary: c.Component.InputSpec.BinaryName,
				Source: sourceFromComponentID(c.Component.ID),
			})
		}

		payload, err := json.Marshal(struct {
			Processes []process `json:"processes"`
		}{
			Processes: procs,
		})
		if err != nil {
			// Report the failure through the error path rather than emitting
			// non-JSON text under a JSON content type.
			return errorWithStatus(http.StatusInternalServerError, fmt.Errorf("encoding processes: %w", err))
		}

		// A failed write means the client went away; there is nobody left to
		// report the error to.
		_, _ = w.Write(payload)
		return nil
	}
}
13 changes: 12 additions & 1 deletion internal/pkg/agent/application/monitoring/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/elastic-agent-libs/api"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

Expand All @@ -27,6 +28,8 @@ func NewServer(
endpointConfig api.Config,
ns func(string) *monitoring.Namespace,
tracer *apm.Tracer,
coord *coordinator.Coordinator,
enableProcessStats bool,
) (*api.Server, error) {
if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
// log but ignore
Expand All @@ -38,14 +41,16 @@ func NewServer(
return nil, err
}

return exposeMetricsEndpoint(log, cfg, ns, tracer)
return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats)
}

func exposeMetricsEndpoint(
log *logger.Logger,
config *config.C,
ns func(string) *monitoring.Namespace,
tracer *apm.Tracer,
coord *coordinator.Coordinator,
enableProcessStats bool,
) (*api.Server, error) {
r := mux.NewRouter()
if tracer != nil {
Expand All @@ -54,6 +59,12 @@ func exposeMetricsEndpoint(
statsHandler := statsHandler(ns("stats"))
r.Handle("/stats", createHandler(statsHandler))

if enableProcessStats {
r.Handle("/processes", createHandler(processesHandler(coord)))
r.Handle("/processes/{processID}", createHandler(processHandler(coord, statsHandler)))
r.Handle("/processes/{processID}/", createHandler(processHandler(coord, statsHandler)))
}

mux := http.NewServeMux()
mux.Handle("/", r)

Expand Down
12 changes: 9 additions & 3 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/elastic-agent-libs/service"
"github.com/elastic/elastic-agent-system-metrics/report"
"github.com/elastic/elastic-agent/internal/pkg/agent/application"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
Expand Down Expand Up @@ -170,7 +171,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
return err
}

serverStopFn, err := setupMetrics(logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer)
serverStopFn, err := setupMetrics(logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord)
if err != nil {
return err
}
Expand Down Expand Up @@ -395,7 +396,7 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig)

cfg := mcfg.APM

// nolint:godox // the TODO is intentional
//nolint:godox // the TODO is intentional
// TODO(stn): Ideally, we'd use apmtransport.NewHTTPTransportOptions()
// but it doesn't exist today. Update this code once we have something
// available via the APM Go agent.
Expand Down Expand Up @@ -452,6 +453,7 @@ func setupMetrics(
operatingSystem string,
cfg *monitoringCfg.MonitoringConfig,
tracer *apm.Tracer,
coord *coordinator.Coordinator,
) (func() error, error) {
if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil {
return nil, err
Expand All @@ -463,7 +465,7 @@ func setupMetrics(
Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg),
}

s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer)
s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg))
if err != nil {
return nil, errors.New(err, "could not start the HTTP server for the API")
}
Expand All @@ -472,3 +474,7 @@ func setupMetrics(
// return server stopper
return s.Stop, nil
}

// isProcessStatsEnabled reports whether cfg is non-nil and has its HTTP
// section enabled.
func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool {
	if cfg == nil {
		return false
	}
	return cfg.HTTP.Enabled
}
13 changes: 13 additions & 0 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
type ComponentComponentState struct {
	Component component.Component `yaml:"component"`
	State     ComponentState      `yaml:"state"`
	// LegacyPID propagates the PID for the /processes route. It is a string
	// because that route historically reported it as one; it is excluded
	// from YAML output.
	LegacyPID string `yaml:"-"`
}

// ComponentUnitDiagnosticRequest used to request diagnostics from specific unit.
Expand Down Expand Up @@ -284,9 +285,21 @@ func (m *Manager) State() []ComponentComponentState {
states := make([]ComponentComponentState, 0, len(m.current))
for _, crs := range m.current {
crs.latestMx.RLock()
var legacyPID string
if crs.runtime != nil {
if commandRuntime, ok := crs.runtime.(*CommandRuntime); ok {
if commandRuntime != nil {
procInfo := commandRuntime.proc
if procInfo != nil {
legacyPID = fmt.Sprint(commandRuntime.proc.PID)
}
}
}
}
states = append(states, ComponentComponentState{
Component: crs.currComp,
State: crs.latestState.Copy(),
LegacyPID: legacyPID,
})
crs.latestMx.RUnlock()
}
Expand Down

0 comments on commit e3e953b

Please sign in to comment.