Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement processes route to restore compatibility with the current cloud health checks #1773

Merged
merged 5 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions internal/pkg/agent/application/monitoring/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package monitoring

import "fmt"

func errorWithStatus(status int, err error) *statusError {
return &statusError{
err: err,
status: status,
}
}

func errorfWithStatus(status int, msg string, args ...string) *statusError {
err := fmt.Errorf(msg, args)
return errorWithStatus(status, err)
}

// StatusError holds correlation between error and a status
type statusError struct {
err error
status int
}

func (s *statusError) Status() int {
return s.status
}

func (s *statusError) Error() string {
return s.err.Error()
}
62 changes: 62 additions & 0 deletions internal/pkg/agent/application/monitoring/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package monitoring

import (
"encoding/json"
"fmt"
"net/http"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/gorilla/mux"
)

const processIDKey = "processID"

func processHandler(coord *coordinator.Coordinator, statsHandler func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error {
return func(w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

vars := mux.Vars(r)
id, found := vars[processIDKey]

if !found {
return errorfWithStatus(http.StatusNotFound, "productID not found")
}

if id == "" || id == paths.BinaryName {
// proxy stats for elastic agent process
return statsHandler(w, r)
}

state := coord.State(false)

for _, c := range state.Components {
if c.Component.ID == id {
data := struct {
State string `json:"state"`
Message string `json:"message"`
}{
State: c.State.State.String(),
Message: c.State.Message,
}

bytes, err := json.Marshal(data)
var content string
if err != nil {
content = fmt.Sprintf("Not valid json: %v", err)
} else {
content = string(bytes)
}
fmt.Fprint(w, content)

return nil
}
}

return errorWithStatus(http.StatusNotFound, fmt.Errorf("matching component %v not found", id))
}
}
73 changes: 73 additions & 0 deletions internal/pkg/agent/application/monitoring/processes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package monitoring

import (
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
)

type source struct {
Kind string `json:"kind"`
Outputs []string `json:"outputs"`
}

type process struct {
ID string `json:"id"`
PID string `json:"pid,omitempty"`
Binary string `json:"binary"`
Source source `json:"source"`
}

func sourceFromComponentID(procID string) source {
var s source
var out string
if pos := strings.LastIndex(procID, "-"); pos != -1 {
out = procID[pos+1:]
}
if strings.HasSuffix(out, "monitoring") {
s.Kind = "internal"
} else {
s.Kind = "configured"
}
s.Outputs = []string{out}
return s
}

func processesHandler(coord *coordinator.Coordinator) func(http.ResponseWriter, *http.Request) error {
return func(w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

procs := make([]process, 0)

state := coord.State(false)

for _, c := range state.Components {
procs = append(procs, process{c.Component.ID, c.Component.InputSpec.BinaryName,
c.LegacyPID,
sourceFromComponentID(c.Component.ID)})
}
data := struct {
Processes []process `json:"processes"`
}{
Processes: procs,
}

bytes, err := json.Marshal(data)
var content string
if err != nil {
content = fmt.Sprintf("Not valid json: %v", err)
} else {
content = string(bytes)
}
fmt.Fprint(w, content)

return nil
}
}
14 changes: 13 additions & 1 deletion internal/pkg/agent/application/monitoring/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/elastic-agent-libs/api"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

Expand All @@ -27,6 +28,8 @@ func NewServer(
endpointConfig api.Config,
ns func(string) *monitoring.Namespace,
tracer *apm.Tracer,
coord *coordinator.Coordinator,
enableProcessStats bool,
) (*api.Server, error) {
if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
// log but ignore
Expand All @@ -38,14 +41,16 @@ func NewServer(
return nil, err
}

return exposeMetricsEndpoint(log, cfg, ns, tracer)
return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats)
}

func exposeMetricsEndpoint(
log *logger.Logger,
config *config.C,
ns func(string) *monitoring.Namespace,
tracer *apm.Tracer,
coord *coordinator.Coordinator,
enableProcessStats bool,
) (*api.Server, error) {
r := mux.NewRouter()
if tracer != nil {
Expand All @@ -54,6 +59,13 @@ func exposeMetricsEndpoint(
statsHandler := statsHandler(ns("stats"))
r.Handle("/stats", createHandler(statsHandler))

if enableProcessStats {
r.Handle("/processes", createHandler(processesHandler(coord)))
r.Handle("/processes/{processID}", createHandler(processHandler(coord, statsHandler)))
r.Handle("/processes/{processID}/", createHandler(processHandler(coord, statsHandler)))
r.Handle("/processes/{processID}/{beatsPath}", createHandler(processHandler(coord, statsHandler)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is {beatsPath} here? I don't see it used in the handler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

}

mux := http.NewServeMux()
mux.Handle("/", r)

Expand Down
10 changes: 8 additions & 2 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/elastic-agent-libs/service"
"github.com/elastic/elastic-agent-system-metrics/report"
"github.com/elastic/elastic-agent/internal/pkg/agent/application"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
Expand Down Expand Up @@ -170,7 +171,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
return err
}

serverStopFn, err := setupMetrics(logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer)
serverStopFn, err := setupMetrics(logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord)
if err != nil {
return err
}
Expand Down Expand Up @@ -452,6 +453,7 @@ func setupMetrics(
operatingSystem string,
cfg *monitoringCfg.MonitoringConfig,
tracer *apm.Tracer,
coord *coordinator.Coordinator,
) (func() error, error) {
if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil {
return nil, err
Expand All @@ -463,7 +465,7 @@ func setupMetrics(
Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg),
}

s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer)
s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg))
if err != nil {
return nil, errors.New(err, "could not start the HTTP server for the API")
}
Expand All @@ -472,3 +474,7 @@ func setupMetrics(
// return server stopper
return s.Stop, nil
}

func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool {
return cfg != nil && cfg.HTTP.Enabled
}
13 changes: 13 additions & 0 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
type ComponentComponentState struct {
Component component.Component `yaml:"component"`
State ComponentState `yaml:"state"`
LegacyPID string `yaml:"-"` // To propagate PID for the /processes, and yes, it was a string
}

// ComponentUnitDiagnosticRequest used to request diagnostics from specific unit.
Expand Down Expand Up @@ -284,9 +285,21 @@ func (m *Manager) State() []ComponentComponentState {
states := make([]ComponentComponentState, 0, len(m.current))
for _, crs := range m.current {
crs.latestMx.RLock()
var legacyPID string
if crs.runtime != nil {
if commandRuntime, ok := crs.runtime.(*CommandRuntime); ok {
if commandRuntime != nil {
procInfo := commandRuntime.proc
if procInfo != nil {
legacyPID = fmt.Sprint(commandRuntime.proc.PID)
}
}
}
}
states = append(states, ComponentComponentState{
Component: crs.currComp,
State: crs.latestState.Copy(),
LegacyPID: legacyPID,
})
crs.latestMx.RUnlock()
}
Expand Down