Skip to content

Commit

Permalink
Add prometheus to tools (knative#280)
Browse files Browse the repository at this point in the history
* Add prometheus to tools

* Add blank lines

* Return errors if any when querying prometheus
  • Loading branch information
srinivashegde86 authored and knative-prow-robot committed Nov 30, 2018
1 parent 7ed3240 commit 2377fbc
Show file tree
Hide file tree
Showing 71 changed files with 10,134 additions and 1 deletion.
60 changes: 59 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

151 changes: 151 additions & 0 deletions tools/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prometheus

import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"strings"
"time"

"github.com/knative/pkg/test/logging"
"github.com/prometheus/client_golang/api"
"github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)

const (
prometheusPort = "9090"
appLabel = "app=prometheus"
)

// PromProxy defines a proxy to the prometheus server
type PromProxy struct {
Namespace string
portFwdProcess *os.Process
}

// Setup performs a port forwarding for app prometheus-test in given namespace
func (p *PromProxy) Setup(ctx context.Context, logger *logging.BaseLogger) error {
return p.portForward(ctx, logger, appLabel, prometheusPort, prometheusPort)
}

// Teardown will kill the port forwarding process if running
func (p *PromProxy) Teardown(logger *logging.BaseLogger) error {
logger.Info("Cleaning up prom proxy")
if p.portFwdProcess != nil {
return p.portFwdProcess.Kill()
}
return nil
}

// PortForward sets up local port forward to the pod specified by the "app" label in the given namespace
func (p *PromProxy) portForward(ctx context.Context, logger *logging.BaseLogger, labelSelector, localPort, remotePort string) error {
var pod string
var err error

getName := fmt.Sprintf("kubectl -n %s get pod -l %s -o jsonpath='{.items[0].metadata.name}'", p.Namespace, labelSelector)
pod, err = p.execShellCmd(ctx, logger, getName)
if err != nil {
return err
}
logger.Infof("%s pod name: %s", labelSelector, pod)

logger.Infof("Setting up %s proxy", labelSelector)
portFwdCmd := fmt.Sprintf("kubectl port-forward %s %s:%s -n %s", strings.Trim(pod, "'"), localPort, remotePort, p.Namespace)
if p.portFwdProcess, err = p.executeCmdBackground(logger, portFwdCmd); err != nil {
logger.Errorf("Failed to port forward: %s", err)
return err
}
logger.Infof("running %s port-forward in background, pid = %d", labelSelector, p.portFwdProcess.Pid)
return nil
}

// RunBackground starts a background process and returns the Process if succeed
func (p *PromProxy) executeCmdBackground(logger *logging.BaseLogger, format string, args ...interface{}) (*os.Process, error) {
cmd := fmt.Sprintf(format, args...)
logger.Infof("Executing command: %s", cmd)
parts := strings.Split(cmd, " ")
c := exec.Command(parts[0], parts[1:]...) // #nosec
err := c.Start()
if err != nil {
logger.Errorf("%s, command failed!", cmd)
return nil, err
}
return c.Process, nil
}

// ExecuteShellCmd executes a shell command
func (p *PromProxy) execShellCmd(ctx context.Context, logger *logging.BaseLogger, format string, args ...interface{}) (string, error) {
cmd := fmt.Sprintf(format, args...)
logger.Infof("Executing command: %s", cmd)
c := exec.CommandContext(ctx, "sh", "-c", cmd) // #nosec
bytes, err := c.CombinedOutput()
if err != nil {
logger.Infof("Command error: %v", err)
return string(bytes), fmt.Errorf("command failed: %q %v", string(bytes), err)
}

if output := strings.TrimSuffix(string(bytes), "\n"); len(output) > 0 {
logger.Infof("Command output: \n%s", output)
}

return string(bytes), nil
}

// PromAPI gets a handle to the prometheus API
func PromAPI() (v1.API, error) {
client, err := api.NewClient(api.Config{Address: fmt.Sprintf("http://localhost:%s", prometheusPort)})
if err != nil {
return nil, err
}
return v1.NewAPI(client), nil
}

// AllowPrometheusSync sleeps for sometime to allow prometheus time to scrape the metrics.
func AllowPrometheusSync(logger *logging.BaseLogger) {
logger.Info("Sleeping to allow prometheus to record metrics...")
time.Sleep(30 * time.Second)
}

// RunQuery runs a prometheus query and returns the metric value
func RunQuery(ctx context.Context, logger *logging.BaseLogger, promAPI v1.API, query string) (float64, error) {
logger.Infof("Prometheus query: %s", query)

value, err := promAPI.Query(ctx, query, time.Now())
if err != nil {
return 0, nil
}

return VectorValue(value)
}

// VectorValue gets the vector value from the value type
func VectorValue(val model.Value) (float64, error) {
if val.Type() != model.ValVector {
return 0, fmt.Errorf("Value type is %s. Expected: Valvector", val.String())
}
value := val.(model.Vector)
if len(value) == 0 {
return 0, errors.New("Query returned no results")
}

return float64(value[0].Value), nil
}
114 changes: 114 additions & 0 deletions tools/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prometheus_test

import (
"context"
"testing"
"time"

"github.com/knative/pkg/test/logging"
"github.com/knative/test-infra/tools/prometheus"
"github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)

const (
expected = 1.0
query = "test"
)

type TestPromAPI struct {
}

// AlertManagers returns an overview of the current state of the Prometheus alert manager discovery.
func (tpa *TestPromAPI) AlertManagers(ctx context.Context) (v1.AlertManagersResult, error) {
return v1.AlertManagersResult{}, nil
}

// CleanTombstones removes the deleted data from disk and cleans up the existing tombstones.
func (tpa *TestPromAPI) CleanTombstones(ctx context.Context) error {
return nil
}

// Config returns the current Prometheus configuration.
func (tpa *TestPromAPI) Config(ctx context.Context) (v1.ConfigResult, error) {
return v1.ConfigResult{}, nil
}

// DeleteSeries deletes data for a selection of series in a time range.
func (tpa *TestPromAPI) DeleteSeries(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) error {
return nil
}

// Flags returns the flag values that Prometheus was launched with.
func (tpa *TestPromAPI) Flags(ctx context.Context) (v1.FlagsResult, error) {
return v1.FlagsResult{}, nil
}

// LabelValues performs a query for the values of the given label.
func (tpa *TestPromAPI) LabelValues(ctx context.Context, label string) (model.LabelValues, error) {
return nil, nil
}

// Query performas a query on the prom api
func (tpa *TestPromAPI) Query(c context.Context, query string, ts time.Time) (model.Value, error) {
s := model.Sample{Value: expected}
var v []*model.Sample
v = append(v, &s)

return model.Vector(v), nil
}

// QueryRange performs a query for the given range.
func (tpa *TestPromAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, error) {
return nil, nil
}

// Series finds series by label matchers.
func (tpa *TestPromAPI) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, error) {
return nil, nil
}

// Snapshot creates a snapshot of all current data into snapshots/<datetime>-<rand>
// under the TSDB's data directory and returns the directory as response.
func (tpa *TestPromAPI) Snapshot(ctx context.Context, skipHead bool) (v1.SnapshotResult, error) {
return v1.SnapshotResult{}, nil
}

// Targets returns an overview of the current state of the Prometheus target discovery.
func (t *TestPromAPI) Targets(ctx context.Context) (v1.TargetsResult, error) {
return v1.TargetsResult{}, nil
}

// getTestAPI gets the test api implementation for prometheus api
func getTestAPI() *TestPromAPI {
return &TestPromAPI{}
}

func TestRunQuery(t *testing.T) {
logging.InitializeLogger(true)
logger := logging.GetContextLogger("TestRunQuery")

r, err := prometheus.RunQuery(context.Background(), logger, getTestAPI(), query)
if err != nil {
t.Fatalf("Error running query: %v", err)
}
if r != expected {
t.Fatalf("Expected: %f Actual: %f", expected, r)
}
}
Loading

0 comments on commit 2377fbc

Please sign in to comment.