Skip to content

Commit

Permalink
[process-agent] Refactor status build and add it to core agent (#10886)
Browse files Browse the repository at this point in the history
* refactor status command in process-agent; add process-agent status output to core agent status command
  • Loading branch information
mbotarro authored Feb 15, 2022
1 parent 2e45d3a commit df865bf
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 257 deletions.
34 changes: 30 additions & 4 deletions cmd/process-agent/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,49 @@ package api

import (
"encoding/json"
"fmt"
"net/http"

"github.com/DataDog/datadog-agent/pkg/status"
ddconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// writeError replies on w with the given HTTP status code and a JSON object
// of the form {"error": "<message>"} built from err.
func writeError(err error, code int, w http.ResponseWriter) {
	payload := map[string]string{"error": err.Error()}
	// The marshal error is deliberately ignored: the payload is a plain
	// map of strings.
	encoded, _ := json.Marshal(payload)
	http.Error(w, string(encoded), code)
}

func statusHandler(w http.ResponseWriter, _ *http.Request) {
log.Trace("Received status request from process-agent")
log.Info("Got a request for the status. Making status.")

// Get expVar server address
ipcAddr, err := ddconfig.GetIPCAddress()
if err != nil {
writeError(err, http.StatusInternalServerError, w)
_ = log.Warn("config error:", err)
return
}

port := ddconfig.Datadog.GetInt("process_config.expvar_port")
if port <= 0 {
_ = log.Warnf("Invalid process_config.expvar_port -- %d, using default port %d\n", port, ddconfig.DefaultProcessExpVarPort)
port = ddconfig.DefaultProcessExpVarPort
}
expvarEndpoint := fmt.Sprintf("http://%s:%d/debug/vars", ipcAddr, port)

agentStatus, err := status.GetStatus()
agentStatus, err := util.GetStatus(expvarEndpoint)
if err != nil {
_ = log.Warn("failed to get status from agent:", agentStatus)
_ = log.Warn("failed to get status from agent:", err)
writeError(err, http.StatusInternalServerError, w)
return
}

b, err := json.Marshal(agentStatus)
if err != nil {
_ = log.Warn("failed to serialize status response from agent:", err)
writeError(err, http.StatusInternalServerError, w)
return
}

_, err = w.Write(b)
Expand Down
230 changes: 54 additions & 176 deletions cmd/process-agent/app/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,69 +11,21 @@ import (
"io"
"os"
"text/template"
"time"

"github.com/spf13/cobra"

"github.com/DataDog/datadog-agent/cmd/process-agent/api"
"github.com/DataDog/datadog-agent/pkg/api/util"
apiutil "github.com/DataDog/datadog-agent/pkg/api/util"
ddconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/metadata/host"
"github.com/DataDog/datadog-agent/pkg/process/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
ddstatus "github.com/DataDog/datadog-agent/pkg/status"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

var httpClient = util.GetClient(false)
var httpClient = apiutil.GetClient(false)

const (
statusTemplate = `
==============================
Process Agent ({{ .Core.AgentVersion }})
==============================
Status date: {{ formatUnixTime .Date }}
Process Agent Start: {{ formatUnixTime .Expvars.UptimeNano }}
Pid: {{ .Expvars.Pid }}
Go Version: {{ .Core.GoVersion }}
Python Version: {{ .Core.PythonVersion }}
Build arch: {{ .Core.Arch }}
Log Level: {{ .Core.Config.LogLevel }}
Enabled Checks: {{ .Expvars.EnabledChecks }}
Allocated Memory: {{ .Expvars.MemStats.Alloc }} bytes
Hostname: {{ .Core.Metadata.Meta.Hostname }}
=================
Process Endpoints
=================
{{- with .Expvars.Endpoints}}
{{- range $key, $value := .}}
{{$key}} - API Key{{ if gt (len $value) 1}}s{{end}} ending with:
{{- range $idx, $apikey := $value }}
- {{$apikey}}
{{- end}}
{{- end}}
{{- else }}
No endpoints information. The agent may be misconfigured.
{{- end }}
=========
Collector
=========
Last collection time: {{.Expvars.LastCollectTime}}
Docker socket: {{.Expvars.DockerSocket}}
Number of processes: {{.Expvars.ProcessCount}}
Number of containers: {{.Expvars.ContainerCount}}
Process Queue length: {{.Expvars.ProcessQueueSize}}
RTProcess Queue length: {{.Expvars.RTProcessQueueSize}}
Pod Queue length: {{.Expvars.PodQueueSize}}
Process Bytes enqueued: {{.Expvars.ProcessQueueBytes}}
RTProcess Bytes enqueued: {{.Expvars.RTProcessQueueBytes}}
Pod Bytes enqueued: {{.Expvars.PodQueueBytes}}
`
notRunning = `
=============
Process Agent
Expand All @@ -93,122 +45,6 @@ Error
`
)

// coreStatus is the subset of the core agent's status response that the
// process-agent status output uses. It is filled by getCoreStatus, which
// decodes the JSON body returned by the core agent's /agent/status endpoint.
type coreStatus struct {
AgentVersion string `json:"version"`
GoVersion string `json:"go_version"`
PythonVersion string `json:"python_version"`
Arch string `json:"build_arch"`
// Only the log level is decoded from the "config" object.
Config struct {
LogLevel string `json:"log_level"`
} `json:"config"`
// The status template reads the hostname from Metadata.Meta.Hostname.
Metadata host.Payload `json:"metadata"`
}

// infoVersion holds the build/version details carried in the "version"
// field of processExpvars. The fields have no JSON tags, so encoding/json
// matches them against keys equal to the field names (case-insensitively).
type infoVersion struct {
Version string
GitCommit string
GitBranch string
BuildDate string
GoVersion string
}

// processExpvars mirrors the expvars published by the process agent. It is
// filled by getExpvars, which decodes the JSON body returned by the
// process agent's /debug/vars endpoint.
type processExpvars struct {
Pid int `json:"pid"`
Uptime int `json:"uptime"`
// UptimeNano is rendered by the status template through formatUnixTime
// under the "Process Agent Start" label.
UptimeNano float64 `json:"uptime_nano"`
// Only the Alloc field of the "memstats" object is decoded.
MemStats struct{ Alloc uint64 } `json:"memstats"`
Version infoVersion `json:"version"`
DockerSocket string `json:"docker_socket"`
LastCollectTime string `json:"last_collect_time"`
ProcessCount int `json:"process_count"`
ContainerCount int `json:"container_count"`
ProcessQueueSize int `json:"process_queue_size"`
RTProcessQueueSize int `json:"rtprocess_queue_size"`
PodQueueSize int `json:"pod_queue_size"`
ProcessQueueBytes int `json:"process_queue_bytes"`
RTProcessQueueBytes int `json:"rtprocess_queue_bytes"`
PodQueueBytes int `json:"pod_queue_bytes"`
ContainerID string `json:"container_id"`
ProxyURL string `json:"proxy_url"`
LogFile string `json:"log_file"`
EnabledChecks []string `json:"enabled_checks"`
// Endpoints is keyed by endpoint; the status template lists each value
// entry under "API Key(s) ending with".
Endpoints map[string][]string `json:"endpoints"`
}

// status is the data handed to the status template: the time the status was
// built plus the information gathered from the core agent and from the
// process agent.
type status struct {
// Date is a Unix time in nanoseconds stored as a float64; getStatus sets it
// from time.Now() and overrideTime can replace it.
Date float64
Core coreStatus // Contains the status from the core agent
Expvars processExpvars // Contains the expvars retrieved from the process agent
}

// statusOption mutates a status in place; getAndWriteStatus applies each
// provided option to the fetched status before rendering it.
type statusOption func(s *status)

// connectionError wraps the error returned when an HTTP GET against the core
// agent or the process agent fails. getAndWriteStatus type-switches on it to
// print the "not running" message instead of a generic error.
type connectionError struct {
error
}

// overrideTime builds a statusOption that sets the status Date to t,
// expressed as Unix nanoseconds.
func overrideTime(t time.Time) statusOption {
	nanos := float64(t.UnixNano())
	return func(s *status) {
		s.Date = nanos
	}
}

// getCoreStatus fetches the core agent status from its /agent/status endpoint
// and decodes it into a coreStatus.
//
// Errors:
//   - a "config error: ..." error wrapping the cause when the API address
//     cannot be resolved;
//   - a connectionError when the HTTP request fails;
//   - the json.Unmarshal error when the response body cannot be decoded.
//
// On any error the returned coreStatus is the zero value.
func getCoreStatus() (s coreStatus, err error) {
	addressPort, err := api.GetAPIAddressPort()
	if err != nil {
		// %w keeps the underlying error reachable via errors.Is / errors.As
		// while producing the same message as before.
		return coreStatus{}, fmt.Errorf("config error: %w", err)
	}

	statusEndpoint := fmt.Sprintf("http://%s/agent/status", addressPort)
	body, err := util.DoGet(httpClient, statusEndpoint)
	if err != nil {
		return coreStatus{}, connectionError{err}
	}

	if err := json.Unmarshal(body, &s); err != nil {
		// Do not hand back a partially decoded value alongside an error.
		return coreStatus{}, err
	}
	return s, nil
}

// getExpvars fetches the process agent expvars from its /debug/vars endpoint
// and decodes them into a processExpvars.
//
// The port is read from process_config.expvar_port; a non-positive value is
// logged and replaced by ddconfig.DefaultProcessExpVarPort.
//
// Errors:
//   - a "config error: ..." error wrapping the cause when the IPC address
//     cannot be resolved;
//   - a connectionError when the HTTP request fails;
//   - the json.Unmarshal error when the response body cannot be decoded.
//
// On any error the returned processExpvars is the zero value.
func getExpvars() (s processExpvars, err error) {
	ipcAddr, err := ddconfig.GetIPCAddress()
	if err != nil {
		// %w keeps the underlying error reachable via errors.Is / errors.As
		// while producing the same message as before.
		return processExpvars{}, fmt.Errorf("config error: %w", err)
	}

	port := ddconfig.Datadog.GetInt("process_config.expvar_port")
	if port <= 0 {
		_ = log.Warnf("Invalid process_config.expvar_port -- %d, using default port %d\n", port, ddconfig.DefaultProcessExpVarPort)
		port = ddconfig.DefaultProcessExpVarPort
	}

	expvarEndpoint := fmt.Sprintf("http://%s:%d/debug/vars", ipcAddr, port)
	body, err := util.DoGet(httpClient, expvarEndpoint)
	if err != nil {
		return processExpvars{}, connectionError{err}
	}

	if err := json.Unmarshal(body, &s); err != nil {
		// Do not hand back a partially decoded value alongside an error.
		return processExpvars{}, err
	}
	return s, nil
}

// getStatus assembles the full status: it queries the core agent first, then
// the process agent expvars, and stamps the result with the current time in
// Unix nanoseconds. The first failing query aborts and its error is returned
// unchanged with a zero status.
func getStatus() (status, error) {
	core, err := getCoreStatus()
	if err != nil {
		return status{}, err
	}

	expvars, err := getExpvars()
	if err != nil {
		return status{}, err
	}

	return status{
		Date:    float64(time.Now().UnixNano()),
		Core:    core,
		Expvars: expvars,
	}, nil
}

func writeNotRunning(w io.Writer) {
_, err := fmt.Fprint(w, notRunning)
if err != nil {
Expand All @@ -227,34 +63,68 @@ func writeError(w io.Writer, e error) {
_ = log.Error(err)
}
}
// fetchStatus performs an HTTP GET on statusURL with the package-level client
// and returns the raw response body. A failed request is reported as the
// value produced by util.NewConnectionError.
func fetchStatus(statusURL string) ([]byte, error) {
	payload, getErr := apiutil.DoGet(httpClient, statusURL)
	if getErr == nil {
		return payload, nil
	}
	return nil, util.NewConnectionError(getErr)
}

// getAndWriteStatus calls the status server and writes it to `w`
func getAndWriteStatus(w io.Writer, options ...statusOption) {
status, err := getStatus()
func getAndWriteStatus(statusURL string, w io.Writer, options ...util.StatusOption) {
body, err := fetchStatus(statusURL)
if err != nil {
switch err.(type) {
case connectionError:
case util.ConnectionError:
writeNotRunning(w)
default:
writeError(w, err)
}
return
}
for _, option := range options {
option(&status)

// If options to override the status are provided, we need to deserialize and serialize it again
if len(options) > 0 {
var s util.Status
err = json.Unmarshal(body, &s)
if err != nil {
writeError(w, err)
return
}

for _, option := range options {
option(&s)
}

body, err = json.Marshal(s)
if err != nil {
writeError(w, err)
return
}
}

tpl, err := template.New("").Funcs(ddstatus.Textfmap()).Parse(statusTemplate)
stats, err := ddstatus.FormatProcessAgentStatus(body)
if err != nil {
_ = log.Error(err)
writeError(w, err)
return
}

err = tpl.Execute(w, status)
_, err = w.Write([]byte(stats))
if err != nil {
_ = log.Error(err)
}
}

// getStatusURL builds the URL of the /agent/status endpoint from the address
// and port returned by api.GetAPIAddressPort.
//
// It returns an empty string and a "config error: ..." error wrapping the
// cause when the address cannot be resolved.
func getStatusURL() (string, error) {
	addressPort, err := api.GetAPIAddressPort()
	if err != nil {
		// %w keeps the underlying error reachable via errors.Is / errors.As
		// while producing the same message as before.
		return "", fmt.Errorf("config error: %w", err)
	}
	return fmt.Sprintf("http://%s/agent/status", addressPort), nil
}

// StatusCmd returns a cobra command that prints the current status
func StatusCmd() *cobra.Command {
return &cobra.Command{
Expand All @@ -269,6 +139,7 @@ func runStatus(cmd *cobra.Command, _ []string) error {
err := config.LoadConfigIfExists(cmd.Flag("config").Value.String())
if err != nil {
writeError(os.Stdout, err)
return err
}

err = ddconfig.SetupLogger(
Expand All @@ -282,8 +153,15 @@ func runStatus(cmd *cobra.Command, _ []string) error {
)
if err != nil {
writeError(os.Stdout, err)
return err
}

statusURL, err := getStatusURL()
if err != nil {
writeError(os.Stdout, err)
return err
}

getAndWriteStatus(os.Stdout)
getAndWriteStatus(statusURL, os.Stdout)
return nil
}
Loading

0 comments on commit df865bf

Please sign in to comment.