Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[process-agent] Refactor status build and add it to core agent #10886

Merged
merged 13 commits into from
Feb 15, 2022
Merged
34 changes: 30 additions & 4 deletions cmd/process-agent/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,49 @@ package api

import (
"encoding/json"
"fmt"
"net/http"

"github.com/DataDog/datadog-agent/pkg/status"
ddconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

func writeError(err error, code int, w http.ResponseWriter) {
body, _ := json.Marshal(map[string]string{"error": err.Error()})
http.Error(w, string(body), code)
}

func statusHandler(w http.ResponseWriter, _ *http.Request) {
log.Trace("Received status request from process-agent")
log.Info("Got a request for the status. Making status.")

// Get expVar server address
ipcAddr, err := ddconfig.GetIPCAddress()
if err != nil {
writeError(err, http.StatusInternalServerError, w)
_ = log.Warn("config error:", err)
return
}

port := ddconfig.Datadog.GetInt("process_config.expvar_port")
if port <= 0 {
_ = log.Warnf("Invalid process_config.expvar_port -- %d, using default port %d\n", port, ddconfig.DefaultProcessExpVarPort)
port = ddconfig.DefaultProcessExpVarPort
}
expvarEndpoint := fmt.Sprintf("http://%s:%d/debug/vars", ipcAddr, port)

agentStatus, err := status.GetStatus()
agentStatus, err := util.GetStatus(expvarEndpoint)
if err != nil {
_ = log.Warn("failed to get status from agent:", agentStatus)
_ = log.Warn("failed to get status from agent:", err)
writeError(err, http.StatusInternalServerError, w)
return
}

b, err := json.Marshal(agentStatus)
if err != nil {
_ = log.Warn("failed to serialize status response from agent:", err)
writeError(err, http.StatusInternalServerError, w)
return
}

_, err = w.Write(b)
Expand Down
230 changes: 54 additions & 176 deletions cmd/process-agent/app/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,69 +11,21 @@ import (
"io"
"os"
"text/template"
"time"

"github.com/spf13/cobra"

"github.com/DataDog/datadog-agent/cmd/process-agent/api"
"github.com/DataDog/datadog-agent/pkg/api/util"
apiutil "github.com/DataDog/datadog-agent/pkg/api/util"
ddconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/metadata/host"
"github.com/DataDog/datadog-agent/pkg/process/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
ddstatus "github.com/DataDog/datadog-agent/pkg/status"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

var httpClient = util.GetClient(false)
var httpClient = apiutil.GetClient(false)

const (
statusTemplate = `
==============================
Process Agent ({{ .Core.AgentVersion }})
==============================

Status date: {{ formatUnixTime .Date }}
Process Agent Start: {{ formatUnixTime .Expvars.UptimeNano }}
Pid: {{ .Expvars.Pid }}
Go Version: {{ .Core.GoVersion }}
Python Version: {{ .Core.PythonVersion }}
Build arch: {{ .Core.Arch }}
Log Level: {{ .Core.Config.LogLevel }}
Enabled Checks: {{ .Expvars.EnabledChecks }}
Allocated Memory: {{ .Expvars.MemStats.Alloc }} bytes
Hostname: {{ .Core.Metadata.Meta.Hostname }}

=================
Process Endpoints
=================
{{- with .Expvars.Endpoints}}
{{- range $key, $value := .}}
{{$key}} - API Key{{ if gt (len $value) 1}}s{{end}} ending with:
{{- range $idx, $apikey := $value }}
- {{$apikey}}
{{- end}}
{{- end}}
{{- else }}

No endpoints information. The agent may be misconfigured.
{{- end }}

=========
Collector
=========

Last collection time: {{.Expvars.LastCollectTime}}
Docker socket: {{.Expvars.DockerSocket}}
Number of processes: {{.Expvars.ProcessCount}}
Number of containers: {{.Expvars.ContainerCount}}
Process Queue length: {{.Expvars.ProcessQueueSize}}
RTProcess Queue length: {{.Expvars.RTProcessQueueSize}}
Pod Queue length: {{.Expvars.PodQueueSize}}
Process Bytes enqueued: {{.Expvars.ProcessQueueBytes}}
RTProcess Bytes enqueued: {{.Expvars.RTProcessQueueBytes}}
Pod Bytes enqueued: {{.Expvars.PodQueueBytes}}

`
notRunning = `
=============
Process Agent
Expand All @@ -93,122 +45,6 @@ Error
`
)

type coreStatus struct {
AgentVersion string `json:"version"`
GoVersion string `json:"go_version"`
PythonVersion string `json:"python_version"`
Arch string `json:"build_arch"`
Config struct {
LogLevel string `json:"log_level"`
} `json:"config"`
Metadata host.Payload `json:"metadata"`
}

type infoVersion struct {
Version string
GitCommit string
GitBranch string
BuildDate string
GoVersion string
}

type processExpvars struct {
Pid int `json:"pid"`
Uptime int `json:"uptime"`
UptimeNano float64 `json:"uptime_nano"`
MemStats struct{ Alloc uint64 } `json:"memstats"`
Version infoVersion `json:"version"`
DockerSocket string `json:"docker_socket"`
LastCollectTime string `json:"last_collect_time"`
ProcessCount int `json:"process_count"`
ContainerCount int `json:"container_count"`
ProcessQueueSize int `json:"process_queue_size"`
RTProcessQueueSize int `json:"rtprocess_queue_size"`
PodQueueSize int `json:"pod_queue_size"`
ProcessQueueBytes int `json:"process_queue_bytes"`
RTProcessQueueBytes int `json:"rtprocess_queue_bytes"`
PodQueueBytes int `json:"pod_queue_bytes"`
ContainerID string `json:"container_id"`
ProxyURL string `json:"proxy_url"`
LogFile string `json:"log_file"`
EnabledChecks []string `json:"enabled_checks"`
Endpoints map[string][]string `json:"endpoints"`
}

type status struct {
Date float64
Core coreStatus // Contains the status from the core agent
Expvars processExpvars // Contains the expvars retrieved from the process agent
}

type statusOption func(s *status)

type connectionError struct {
error
}

func overrideTime(t time.Time) statusOption {
return func(s *status) {
s.Date = float64(t.UnixNano())
}
}

func getCoreStatus() (s coreStatus, err error) {
addressPort, err := api.GetAPIAddressPort()
if err != nil {
return coreStatus{}, fmt.Errorf("config error: %s", err.Error())
}

statusEndpoint := fmt.Sprintf("http://%s/agent/status", addressPort)
b, err := util.DoGet(httpClient, statusEndpoint)
if err != nil {
return s, connectionError{err}
}

err = json.Unmarshal(b, &s)
return
}

func getExpvars() (s processExpvars, err error) {
ipcAddr, err := ddconfig.GetIPCAddress()
if err != nil {
return processExpvars{}, fmt.Errorf("config error: %s", err.Error())
}

port := ddconfig.Datadog.GetInt("process_config.expvar_port")
if port <= 0 {
_ = log.Warnf("Invalid process_config.expvar_port -- %d, using default port %d\n", port, ddconfig.DefaultProcessExpVarPort)
port = ddconfig.DefaultProcessExpVarPort
}
expvarEndpoint := fmt.Sprintf("http://%s:%d/debug/vars", ipcAddr, port)
b, err := util.DoGet(httpClient, expvarEndpoint)
if err != nil {
return s, connectionError{err}
}

err = json.Unmarshal(b, &s)
return
}

func getStatus() (status, error) {
coreStatus, err := getCoreStatus()
if err != nil {
return status{}, err
}

processStatus, err := getExpvars()
if err != nil {
return status{}, err
}

s := status{
Date: float64(time.Now().UnixNano()),
Core: coreStatus,
Expvars: processStatus,
}
return s, nil
}

func writeNotRunning(w io.Writer) {
_, err := fmt.Fprint(w, notRunning)
if err != nil {
Expand All @@ -227,34 +63,68 @@ func writeError(w io.Writer, e error) {
_ = log.Error(err)
}
}
func fetchStatus(statusURL string) ([]byte, error) {
body, err := apiutil.DoGet(httpClient, statusURL)
if err != nil {
return nil, util.NewConnectionError(err)
}

return body, nil
}

// getAndWriteStatus calls the status server and writes it to `w`
func getAndWriteStatus(w io.Writer, options ...statusOption) {
status, err := getStatus()
func getAndWriteStatus(statusURL string, w io.Writer, options ...util.StatusOption) {
body, err := fetchStatus(statusURL)
if err != nil {
switch err.(type) {
case connectionError:
case util.ConnectionError:
writeNotRunning(w)
default:
writeError(w, err)
}
return
}
for _, option := range options {
option(&status)

// If options to override the status are provided, we need to deserialize and serialize it again
if len(options) > 0 {
var s util.Status
err = json.Unmarshal(body, &s)
if err != nil {
writeError(w, err)
return
}

for _, option := range options {
option(&s)
}

body, err = json.Marshal(s)
if err != nil {
writeError(w, err)
return
}
}

tpl, err := template.New("").Funcs(ddstatus.Textfmap()).Parse(statusTemplate)
stats, err := ddstatus.FormatProcessAgentStatus(body)
if err != nil {
_ = log.Error(err)
writeError(w, err)
return
}

err = tpl.Execute(w, status)
_, err = w.Write([]byte(stats))
if err != nil {
_ = log.Error(err)
}
}

func getStatusURL() (string, error) {
addressPort, err := api.GetAPIAddressPort()
if err != nil {
return "", fmt.Errorf("config error: %s", err.Error())
}
return fmt.Sprintf("http://%s/agent/status", addressPort), nil
}

// StatusCmd returns a cobra command that prints the current status
func StatusCmd() *cobra.Command {
return &cobra.Command{
Expand All @@ -269,6 +139,7 @@ func runStatus(cmd *cobra.Command, _ []string) error {
err := config.LoadConfigIfExists(cmd.Flag("config").Value.String())
if err != nil {
writeError(os.Stdout, err)
return err
}

err = ddconfig.SetupLogger(
Expand All @@ -282,8 +153,15 @@ func runStatus(cmd *cobra.Command, _ []string) error {
)
if err != nil {
writeError(os.Stdout, err)
return err
}

statusURL, err := getStatusURL()
if err != nil {
writeError(os.Stdout, err)
return err
}

getAndWriteStatus(os.Stdout)
getAndWriteStatus(statusURL, os.Stdout)
return nil
}
Loading