Skip to content

Commit

Permalink
Metrics import/export
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Sevilla <[email protected]>
  • Loading branch information
rsevilla87 committed Jun 23, 2021
1 parent e0ed8a6 commit 597b80c
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 29 deletions.
69 changes: 59 additions & 10 deletions cmd/kube-burner/kube-burner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func initCmd() *cobra.Command {
log.Infof("🔥 Starting kube-burner with UUID %s", uuid)
err := config.Parse(configFile, true)
if err != nil {
log.Fatal(err)
log.Fatal(err.Error())
}
if url != "" {
prometheusClient, err = prometheus.NewPrometheusClient(url, token, username, password, uuid, skipTLSVerify, prometheusStep)
Expand Down Expand Up @@ -138,7 +138,7 @@ func destroyCmd() *cobra.Command {
if configFile != "" {
err := config.Parse(configFile, false)
if err != nil {
log.Fatal(err)
log.Fatal(err.Error())
}
}
selector := util.NewSelector()
Expand All @@ -162,18 +162,21 @@ func indexCmd() *cobra.Command {
var username, password, uuid, token string
var skipTLSVerify bool
var prometheusStep time.Duration
var indexer *indexers.Indexer
cmd := &cobra.Command{
Use: "index",
Short: "Index metrics from the given time range",
Short: "Index kube-burner metrics",
Args: cobra.MaximumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
err := config.Parse(configFile, false)
if err != nil {
log.Fatal(err)
log.Fatal(err.Error())
}
var indexer *indexers.Indexer
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
indexer = indexers.NewIndexer()
indexer, err = indexers.NewIndexer()
if err != nil {
log.Fatal(err.Error())
}
}
p, err := prometheus.NewPrometheusClient(url, token, username, password, uuid, skipTLSVerify, prometheusStep)
if err != nil {
Expand All @@ -188,6 +191,12 @@ func indexCmd() *cobra.Command {
if err := p.ScrapeMetrics(startTime, endTime, indexer); err != nil {
log.Error(err)
}
if config.ConfigSpec.GlobalConfig.WriteToFile && config.ConfigSpec.GlobalConfig.CreateTarball {
err = prometheus.CreateTarball(config.ConfigSpec.GlobalConfig.MetricsDirectory)
if err != nil {
log.Fatal(err.Error())
}
}
},
}
cmd.Flags().StringVar(&uuid, "uuid", "", "Benchmark UUID")
Expand All @@ -201,13 +210,39 @@ func indexCmd() *cobra.Command {
cmd.Flags().Int64VarP(&start, "start", "", time.Now().Unix()-3600, "Epoch start time")
cmd.Flags().Int64VarP(&end, "end", "", time.Now().Unix(), "Epoch end time")
cmd.Flags().StringVarP(&configFile, "config", "c", "", "Config file path or URL")
cmd.MarkFlagRequired("prometheus-url")
cmd.MarkFlagRequired("uuid")
cmd.MarkFlagRequired("prometheus-url")
cmd.MarkFlagRequired("config")
cmd.Flags().SortFlags = false
return cmd
}

// importCmd returns the "import" subcommand, which re-indexes a metrics
// tarball (previously produced by the index/init commands with
// createTarball enabled) into the configured indexer.
func importCmd() *cobra.Command {
	var configFile, tarball string
	cmd := &cobra.Command{
		Use:   "import",
		Short: "Import metrics tarball",
		Run: func(cmd *cobra.Command, args []string) {
			// Parse config first: the indexer type and default index come from it.
			err := config.Parse(configFile, false)
			if err != nil {
				log.Fatal(err.Error())
			}
			indexer, err := indexers.NewIndexer()
			if err != nil {
				log.Fatal(err.Error())
			}
			err = prometheus.ImportTarball(tarball, config.ConfigSpec.GlobalConfig.IndexerConfig.DefaultIndex, indexer)
			if err != nil {
				log.Fatal(err.Error())
			}
		},
	}
	cmd.Flags().StringVarP(&configFile, "config", "c", "", "Config file path or URL")
	cmd.Flags().StringVar(&tarball, "tarball", "", "Metrics tarball file")
	cmd.MarkFlagRequired("config")
	// ImportTarball is always called with the tarball path, so an empty
	// value would fail at open time; require the flag up front like the
	// other mandatory flags in this file (see indexCmd).
	cmd.MarkFlagRequired("tarball")
	return cmd
}

func alertCmd() *cobra.Command {
var url, alertProfile string
var start, end int64
Expand Down Expand Up @@ -252,9 +287,13 @@ func alertCmd() *cobra.Command {
func steps(uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager) {
verification := true
var rc int
var err error
var indexer *indexers.Indexer
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
indexer = indexers.NewIndexer()
indexer, err = indexers.NewIndexer()
if err != nil {
log.Fatal(err.Error())
}
}
restConfig, err := config.GetRestConfig(0, 0)
if err != nil {
Expand Down Expand Up @@ -295,7 +334,10 @@ func steps(uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager)
elapsedTime := time.Now().UTC().Sub(jobList[jobPosition].Start).Seconds()
log.Infof("Job %s took %.2f seconds", job.Config.Name, elapsedTime)
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
burner.IndexMetadataInfo(indexer, uuid, elapsedTime, job.Config, jobList[jobPosition].Start)
err := burner.IndexMetadataInfo(indexer, uuid, elapsedTime, job.Config, jobList[jobPosition].Start)
if err != nil {
log.Errorf(err.Error())
}
}
}
if p != nil {
Expand All @@ -311,7 +353,13 @@ func steps(uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager)
// If prometheus is enabled query metrics from the start of the first job to the end of the last one
if len(p.MetricsProfile.Metrics) > 0 {
if err := p.ScrapeJobsMetrics(jobList, indexer); err != nil {
log.Error(err)
log.Fatal(err.Error())
}
if config.ConfigSpec.GlobalConfig.WriteToFile && config.ConfigSpec.GlobalConfig.CreateTarball {
err = prometheus.CreateTarball(config.ConfigSpec.GlobalConfig.MetricsDirectory)
if err != nil {
log.Fatal(err.Error())
}
}
}
}
Expand All @@ -328,6 +376,7 @@ func main() {
destroyCmd(),
indexCmd(),
alertCmd(),
importCmd(),
)
logLevel := rootCmd.PersistentFlags().String("log-level", "info", "Allowed values: debug, info, warn, error, fatal")
rootCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
Expand Down
26 changes: 25 additions & 1 deletion pkg/burner/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package burner

import (
"encoding/json"
"fmt"
"os"
"path"
"time"

"github.com/cloud-bulldozer/kube-burner/log"
Expand All @@ -33,7 +37,7 @@ type metadata struct {
const jobSummary = "jobSummary"

// IndexMetadataInfo Generates and indexes a document with metadata information of the passed job
func IndexMetadataInfo(indexer *indexers.Indexer, uuid string, elapsedTime float64, jobConfig config.Job, timestamp time.Time) {
func IndexMetadataInfo(indexer *indexers.Indexer, uuid string, elapsedTime float64, jobConfig config.Job, timestamp time.Time) error {
metadataInfo := []interface{}{
metadata{
UUID: uuid,
Expand All @@ -43,6 +47,26 @@ func IndexMetadataInfo(indexer *indexers.Indexer, uuid string, elapsedTime float
Timestamp: timestamp,
},
}
if config.ConfigSpec.GlobalConfig.WriteToFile {
filename := fmt.Sprintf("%s-metadata.json", jobConfig.Name)
if config.ConfigSpec.GlobalConfig.MetricsDirectory != "" {
if err := os.MkdirAll(config.ConfigSpec.GlobalConfig.MetricsDirectory, 0744); err != nil {
return fmt.Errorf("Error creating metrics directory: %v: ", err)
}
filename = path.Join(config.ConfigSpec.GlobalConfig.MetricsDirectory, filename)
log.Infof("Writing to: %s", filename)
f, err := os.Create(filename)
if err != nil {
return fmt.Errorf("Error creating %s: %v", filename, err)
}
defer f.Close()
jsonEnc := json.NewEncoder(f)
if err := jsonEnc.Encode(metadataInfo); err != nil {
return fmt.Errorf("JSON encoding error: %s", err)
}
}
}
log.Infof("Indexing metadata information for job: %s", jobConfig.Name)
(*indexer).Index(config.ConfigSpec.GlobalConfig.IndexerConfig.DefaultIndex, metadataInfo)
return nil
}
2 changes: 2 additions & 0 deletions pkg/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type GlobalConfig struct {
IndexerConfig IndexerConfig `yaml:"indexerConfig"`
// Write prometheus metrics to a file
WriteToFile bool `yaml:"writeToFile"`
// Create tarball
CreateTarball bool `yaml:"createTarball"`
// Directory to save metrics files in
MetricsDirectory string `yaml:"metricsDirectory"`
// Measurements describes a list of measurements kube-burner
Expand Down
29 changes: 21 additions & 8 deletions pkg/indexers/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package indexers
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"runtime"
"time"
Expand All @@ -41,28 +44,31 @@ func init() {
indexerMap[elastic] = &Elastic{}
}

func (esIndexer *Elastic) new() {
func (esIndexer *Elastic) new() error {
esConfig := config.ConfigSpec.GlobalConfig.IndexerConfig
cfg := elasticsearch.Config{
Addresses: esConfig.ESServers,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: esConfig.InsecureSkipVerify}},
}
ESClient, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Errorf("Error creating the ES client: %s", err)
return fmt.Errorf("Error creating the ES client: %s", err)
}
r, err := ESClient.Cluster.Health()
if err != nil {
log.Fatalf("ES health check failed: %s", err)
return fmt.Errorf("ES health check failed: %s", err)
}
if r.StatusCode != 200 {
log.Fatalf("Unexpected ES status code: %d", r.StatusCode)
return fmt.Errorf("Unexpected ES status code: %d", r.StatusCode)
}
esIndexer.client = ESClient
return nil
}

// Index uses bulkIndexer to index the documents in the given index
func (esIndexer *Elastic) Index(index string, documents []interface{}) {
var statusCount int
hasher := sha256.New()
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: esIndexer.client,
Index: index,
Expand All @@ -87,21 +93,28 @@ func (esIndexer *Elastic) Index(index string, documents []interface{}) {
if err != nil {
log.Errorf("Cannot encode document %s: %s", document, err)
}
hasher.Write(j)
err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(j),
Action: "index",
Body: bytes.NewReader(j),
DocumentID: hex.EncodeToString(hasher.Sum(nil)),
OnSuccess: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem) {
if biri.Status == 201 {
statusCount++
}
},
},
)
if err != nil {
log.Errorf("Unexpected ES indexing error: %s", err)
}
hasher.Reset()
}
if err := bi.Close(context.Background()); err != nil {
log.Fatalf("Unexpected ES error: %s", err)
}
stats := bi.Stats()
dur := time.Since(start)
log.Infof("Successfully indexed [%d] documents in %s in %s", stats.NumFlushed, dur.Truncate(time.Millisecond), index)
log.Infof("Indexed [%d] documents in %s in %s", statusCount, dur.Truncate(time.Millisecond), index)
}
11 changes: 7 additions & 4 deletions pkg/indexers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,24 @@ import (
// Indexer indexer interface
type Indexer interface {
Index(string, []interface{})
new()
new() error
}

var indexerMap = make(map[string]Indexer)

// NewIndexer creates a new Indexer with the specified IndexerConfig
func NewIndexer() *Indexer {
func NewIndexer() (*Indexer, error) {
var indexer Indexer
var exists bool
cfg := config.ConfigSpec.GlobalConfig.IndexerConfig
if indexer, exists = indexerMap[cfg.Type]; exists {
log.Infof("📁 Creating indexer: %s", cfg.Type)
indexer.new()
err := indexer.new()
if err != nil {
return &indexer, err
}
} else {
log.Fatalf("Indexer not found: %s", cfg.Type)
}
return &indexer
return &indexer, nil
}
13 changes: 7 additions & 6 deletions pkg/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type MetricsProfile struct {
Metrics []metricDefinition `yaml:"metrics"`
}

type metric struct {
type Metric struct {
Timestamp time.Time `json:"timestamp"`
Labels map[string]string `json:"labels"`
Value float64 `json:"value"`
Expand Down Expand Up @@ -198,22 +198,23 @@ func (p *Prometheus) scrapeMetrics(jobList []burner.Executor, start, end time.Ti
if config.ConfigSpec.GlobalConfig.MetricsDirectory != "" {
err = os.MkdirAll(config.ConfigSpec.GlobalConfig.MetricsDirectory, 0744)
if err != nil {
return fmt.Errorf("Error creating metrics directory %s: ", err)
return fmt.Errorf("Error creating metrics directory: %v: ", err)
}
filename = path.Join(config.ConfigSpec.GlobalConfig.MetricsDirectory, filename)
}
log.Infof("Writing to: %s", filename)
f, err := os.Create(filename)
defer f.Close()
if err != nil {
log.Errorf("Error creating metrics file %s: %s", filename, err)
continue
}
jsonEnc := json.NewEncoder(f)
jsonEnc.SetIndent("", " ")
err = jsonEnc.Encode(metrics)
if err != nil {
log.Errorf("JSON encoding error: %s", err)

}
f.Close()
}
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
indexName := config.ConfigSpec.GlobalConfig.IndexerConfig.DefaultIndex
Expand All @@ -232,7 +233,7 @@ func (p *Prometheus) parseVector(metricName, query, jobName string, value model.
return prometheusError(fmt.Errorf("Unsupported result format: %s", value.Type().String()))
}
for _, v := range data {
m := metric{
m := Metric{
Labels: make(map[string]string),
UUID: p.uuid,
Query: query,
Expand Down Expand Up @@ -264,7 +265,7 @@ func (p *Prometheus) parseMatrix(metricName, query string, jobList []burner.Exec
for _, v := range data {
for _, val := range v.Values {
jobName := getJobName(val.Timestamp.Time(), jobList)
m := metric{
m := Metric{
Labels: make(map[string]string),
UUID: p.uuid,
Query: query,
Expand Down
Loading

0 comments on commit 597b80c

Please sign in to comment.