From a90c4fc1f4877cb1048481d9f7bbb6cfa76aa72e Mon Sep 17 00:00:00 2001 From: Marcus Ramberg Date: Thu, 26 Sep 2024 12:23:41 +0200 Subject: [PATCH 1/2] feat: Add workflow run metrics --- .gitignore | 1 + pkg/action/server.go | 34 +++++ pkg/exporter/workflow_job_test.go | 100 +++++++++++++ pkg/store/chai.go | 71 ++++++--- pkg/store/generic_workflow_job.go | 231 ++++++++++++++++++++++++++++++ pkg/store/mysql.go | 15 ++ pkg/store/postgres.go | 15 ++ pkg/store/sqlite.go | 15 ++ pkg/store/store.go | 21 +-- pkg/store/types.go | 63 +++++++- 10 files changed, 538 insertions(+), 28 deletions(-) create mode 100644 pkg/exporter/workflow_job_test.go create mode 100644 pkg/store/generic_workflow_job.go diff --git a/.gitignore b/.gitignore index f09a7957..3117e5a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .direnv +.envrc .devenv coverage.out diff --git a/pkg/action/server.go b/pkg/action/server.go index 1b8de7d7..33e54e3f 100644 --- a/pkg/action/server.go +++ b/pkg/action/server.go @@ -280,6 +280,40 @@ func handler(cfg *config.Config, db store.Store, logger *slog.Logger, client *gi w.Header().Set("Content-Type", "text/plain") w.WriteHeader(http.StatusInternalServerError) + io.WriteString(w, http.StatusText(http.StatusInternalServerError)) + } + case *github.WorkflowJobEvent: + wf_job := event.WorkflowJob + level.Debug(logger).Log( + "msg", "received webhook request", + "type", "workflow_job", + "owner", event.Repo.Owner.Login, + "repo", event.Repo.Name, + "id", wf_job.ID, + "name", wf_job.Name, + "attempt", wf_job.RunAttempt, + "status", wf_job.Status, + "conclusion", wf_job.Conclusion, + "created_at", wf_job.CreatedAt.Time.Unix(), + "started_at", wf_job.StartedAt.Time.Unix(), + "completed_at", wf_job.GetCompletedAt().Time.Unix(), + "labels", strings.Join(event.WorkflowJob.Labels, ", "), + ) + + if err := db.StoreWorkflowJobEvent(event); err != nil { + level.Error(logger).Log( + "msg", "failed to store github event", + "type", "workflow_job", + "owner", event.Repo.Owner.Login, + "repo", event.Repo.Name, + "name", wf_job.Name, + "id", wf_job.ID, + "error", err, + ) + + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusInternalServerError) + io.WriteString(w, http.StatusText(http.StatusInternalServerError)) } } diff --git a/pkg/exporter/workflow_job_test.go b/pkg/exporter/workflow_job_test.go new file mode 100644 index 00000000..8bfdfbcb --- /dev/null +++ b/pkg/exporter/workflow_job_test.go @@ -0,0 +1,100 @@ +package exporter + +import ( + "reflect" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/google/go-github/v63/github" + "github.com/prometheus/client_golang/prometheus" + "github.com/promhippie/github_exporter/pkg/config" + "github.com/promhippie/github_exporter/pkg/store" +) + +type StaticStore struct{} + +func (s StaticStore) GetWorkflowJobRuns(owner, repo, workflow string) ([]*store.WorkflowRun, error) { + return nil, nil +} + +func (s StaticStore) StoreWorkflowRunEvent(*github.WorkflowRunEvent) error { return nil } + +func (s StaticStore) GetWorkflowRuns() ([]*store.WorkflowRun, error) { return nil, nil } +func (s StaticStore) PruneWorkflowRuns(time.Duration) error { return nil } + +// WorkflowJobEvent +func (s StaticStore) StoreWorkflowJobEvent(*github.WorkflowJobEvent) error { return nil } +func (s StaticStore) GetWorkflowJobs() ([]*store.WorkflowJob, error) { return nil, nil } +func (s StaticStore) PruneWorkflowJobs(time.Duration) error { return nil } + +func (s StaticStore) Open() error { return nil } +func (s StaticStore) Close() error { return nil } +func (s StaticStore) Ping() error { return nil } +func (s StaticStore) Migrate() error { return nil } + +func TestWorkflowJobCollector(t *testing.T) { + // Mock dependencies + mockClient := &github.Client{} + mockLogger := log.NewNopLogger() + mockStore := StaticStore{} + mockFailures := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "test_failures_total", + Help: "Total number of test failures", + }, []string{"type"}) + mockDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "test_duration_seconds", + Help: "Duration of test", + }, []string{"type"}) + mockConfig := config.Target{} + + // Initialize WorkflowJobCollector + collector := &WorkflowJobCollector{ + client: mockClient, + logger: mockLogger, + db: mockStore, + failures: mockFailures, + duration: mockDuration, + config: mockConfig, + Status: prometheus.NewDesc( + "workflow_job_status", + "Status of the workflow job", + nil, nil, + ), + Duration: prometheus.NewDesc( + "workflow_job_duration_seconds", + "Duration of the workflow job", + nil, nil, + ), + Creation: prometheus.NewDesc( + "workflow_job_creation_timestamp_seconds", + "Creation time of the workflow job", + nil, nil, + ), + Created: prometheus.NewDesc( + "workflow_job_created_timestamp_seconds", + "Created time of the workflow job", + nil, nil, + ), + } + + // Verify initialization + if collector.client != mockClient { + t.Errorf("Expected client to be %v, got %v", mockClient, collector.client) + } + if collector.logger != mockLogger { + t.Errorf("Expected logger to be %v, got %v", mockLogger, collector.logger) + } + if collector.db != mockStore { + t.Errorf("Expected store to be %v, got %v", mockStore, collector.db) + } + if collector.failures != mockFailures { + t.Errorf("Expected failures to be %v, got %v", mockFailures, collector.failures) + } + if collector.duration != mockDuration { + t.Errorf("Expected duration to be %v, got %v", mockDuration, collector.duration) + } + if !reflect.DeepEqual(collector.config, mockConfig) { + t.Errorf("Expected config to be %v, got %v", mockConfig, collector.config) + } +} diff --git a/pkg/store/chai.go b/pkg/store/chai.go index 9bed9830..60ea29ef 100644 --- a/pkg/store/chai.go +++ b/pkg/store/chai.go @@ -18,12 +18,11 @@ import ( _ "github.com/chaisql/chai/driver" ) -var ( - chaiMigrations = []darwin.Migration{ - { - Version: 1, - Description: "Creating table workflow_runs", - Script: `CREATE TABLE workflow_runs ( +var chaiMigrations = []darwin.Migration{ + { + Version: 1, + Description: "Creating table workflow_runs", + Script: `CREATE TABLE workflow_runs ( owner TEXT NOT NULL, repo TEXT NOT NULL, workflow_id INTEGER NOT NULL, @@ -41,14 +40,41 @@ var ( started_at INTEGER, PRIMARY KEY(owner, repo, workflow_id, number) );`, - }, - { - Version: 2, - Description: "Adding actor column to workflow_runs table", - Script: `ALTER TABLE workflow_runs ADD COLUMN actor TEXT;`, - }, - } -) + }, + { + Version: 2, + Description: "Adding actor column to workflow_runs table", + Script: `ALTER TABLE workflow_runs ADD COLUMN actor TEXT;`, + }, + { + Version: 3, + Description: "Creating table workflow_jobs", + Script: `CREATE TABLE workflow_jobs ( + owner TEXT NOT NULL, + repo TEXT NOT NULL, + name TEXT, + status TEXT, + branch TEXT, + sha TEXT, + conclusion TEXT, + labels TEXT, + identifier INTEGER, + + run_id INTEGER NOT NULL, + run_attempt INTEGER NOT NULL, + + created_at INTEGER, + started_at INTEGER, + completed_at INTEGER, + runner_id INTEGER, + runner_name TEXT, + runner_group_id INTEGER, + runner_group_name TEXT, + workflow_name TEXT, + PRIMARY KEY(owner, repo, identifier) + );`, + }, +} func init() { register("chai", NewChaiStore) @@ -70,7 +96,6 @@ func (s *chaiStore) Open() (err error) { s.driver, s.dsn(), ) - if err != nil { return err } @@ -121,6 +146,21 @@ func (s *chaiStore) PruneWorkflowRuns(timeframe time.Duration) error { return pruneWorkflowRuns(s.handle, timeframe) } +// StoreWorkflowJobEvent implements the Store interface. +func (s *chaiStore) StoreWorkflowJobEvent(event *github.WorkflowJobEvent) error { + return storeWorkflowJobEvent(s.handle, event) +} + +// GetWorkflowJobs implements the Store interface. +func (s *chaiStore) GetWorkflowJobs() ([]*WorkflowJob, error) { + return getWorkflowJobs(s.handle) +} + +// PruneWorkflowJobs implements the Store interface. +func (s *chaiStore) PruneWorkflowJobs(timeframe time.Duration) error { + return pruneWorkflowJobs(s.handle, timeframe) +} + func (s *chaiStore) dsn() string { if len(s.meta) > 0 { return fmt.Sprintf( @@ -136,7 +176,6 @@ func (s *chaiStore) dsn() string { // NewChaiStore initializes a new MySQL store. func NewChaiStore(dsn string, logger *slog.Logger) (Store, error) { parsed, err := url.Parse(dsn) - if err != nil { return nil, fmt.Errorf("failed to parse dsn: %w", err) } diff --git a/pkg/store/generic_workflow_job.go b/pkg/store/generic_workflow_job.go new file mode 100644 index 00000000..55a93780 --- /dev/null +++ b/pkg/store/generic_workflow_job.go @@ -0,0 +1,231 @@ +package store + +import ( + "database/sql" + "errors" + "fmt" + "strings" + "time" + + "github.com/google/go-github/v63/github" + "github.com/jmoiron/sqlx" +) + +// storeWorkflowJobEvent handles workflow_run events from GitHub. +func storeWorkflowJobEvent(handle *sqlx.DB, event *github.WorkflowJobEvent) error { + job := event.WorkflowJob + + record := &WorkflowJob{ + Owner: *event.Repo.Owner.Login, + Repo: *event.Repo.Name, + Name: *job.Name, + Status: *job.Status, + Conclusion: job.GetConclusion(), + Branch: *job.HeadBranch, + SHA: *job.HeadSHA, + Identifier: *event.WorkflowJob.ID, + RunID: *job.RunID, + RunAttempt: int(*job.RunAttempt), + CreatedAt: job.CreatedAt.Time.Unix(), + StartedAt: job.StartedAt.Time.Unix(), + CompletedAt: job.GetCompletedAt().Time.Unix(), + Labels: strings.Join(job.Labels, ","), + RunnerID: *job.RunID, + RunnerName: job.GetRunnerName(), + RunnerGroupID: job.GetRunnerGroupID(), + RunnerGroupName: job.GetRunnerGroupName(), + WorkflowName: *job.WorkflowName, + } + + return createOrUpdateWorkflowJob(handle, record) +} + +// createOrUpdateWorkflowJob creates or updates the record. +func createOrUpdateWorkflowJob(handle *sqlx.DB, record *WorkflowJob) error { + existing := &WorkflowJob{} + stmt, err := handle.PrepareNamed(findWorkflowJobQuery) + + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to prepare find: %w", err) + } + + if err := stmt.Get(existing, record); err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to find record: %w", err) + } + + if existing.Identifier == 0 { + if _, err := handle.NamedExec( + createWorkflowJobQuery, + record, + ); err != nil { + return fmt.Errorf("failed to create record: %w", err) + } + } else { + // FIXME: UpdatedAt does not exist, does it make sense to use CreatedAt? + if existing.CreatedAt > record.CreatedAt { + return nil + } else if existing.CreatedAt == record.CreatedAt && existing.Status == "completed" { + // The updatedAt timestamp is in seconds, so if the existing record has + // the same timestamp as the new record, and the status is "completed", + // we can safely ignore the update. + return nil + } + + if _, err := handle.NamedExec( + updateWorkflowJobQuery, + record, + ); err != nil { + return fmt.Errorf("failed to update record: %w", err) + } + } + + return nil +} + +// getWorkflowJobs retrieves the workflow jobs from the database. +func getWorkflowJobs(handle *sqlx.DB) ([]*WorkflowJob, error) { + records := make([]*WorkflowJob, 0) + + rows, err := handle.Queryx( + selectWorkflowJobsQuery, + ) + + if err != nil { + return records, err + } + + defer rows.Close() + + for rows.Next() { + record := &WorkflowJob{} + + if err := rows.StructScan( + record, + ); err != nil { + return records, err + } + + records = append( + records, + record, + ) + } + + if err := rows.Err(); err != nil { + return records, err + } + + return records, nil +} + +// pruneWorkflowJobs prunes older workflow job records. +func pruneWorkflowJobs(handle *sqlx.DB, timeframe time.Duration) error { + if _, err := handle.NamedExec( + purgeWorkflowJobsQuery, + map[string]interface{}{ + "timeframe": time.Now().Add(-timeframe).Unix(), + }, + ); err != nil { + return fmt.Errorf("failed to prune workflow jobs: %w", err) + } + + return nil +} + +var selectWorkflowJobsQuery = ` +SELECT + owner, + repo, + name, + status, + conclusion, + branch, + sha, + identifier, + run_id, + run_attempt, + created_at, + started_at, + completed_at, + labels, + runner_id, + runner_name, + runner_group_id, + runner_group_name, + workflow_name +FROM + workflow_jobs +ORDER BY + created_at ASC;` + +var findWorkflowJobQuery = ` +SELECT + identifier +FROM + workflow_jobs +WHERE + owner=:owner AND repo=:repo AND identifier=:identifier;` + +var createWorkflowJobQuery = ` +INSERT INTO workflow_jobs ( + owner, + repo, + name, + status, + conclusion, + branch, + sha, + identifier, + run_id, + run_attempt, + created_at, + started_at, + completed_at, + labels, + runner_id, + runner_name, + runner_group_id, + runner_group_name, + workflow_name +) VALUES ( + :owner, + :repo, + :name, + :status, + :conclusion, + :branch, + :sha, + :identifier, + :run_id, + :run_attempt, + :created_at, + :started_at, + :completed_at, + :labels, + :runner_id, + :runner_name, + :runner_group_id, + :runner_group_name, + :workflow_name +);` + +var updateWorkflowJobQuery = ` +UPDATE + workflow_jobs +SET + run_attempt=:run_attempt, + name=:name, + status=:status, + branch=:branch, + sha=:sha, + identifier=:identifier, + created_at=:created_at, + started_at=:started_at +WHERE + owner=:owner AND repo=:repo AND identifier=:identifier;` + +var purgeWorkflowJobsQuery = ` +DELETE FROM + workflow_jobs +WHERE + created_at < :timeframe;` // FIXME: updated_at is gone diff --git a/pkg/store/mysql.go b/pkg/store/mysql.go index aed85783..ca55c316 100644 --- a/pkg/store/mysql.go +++ b/pkg/store/mysql.go @@ -132,6 +132,21 @@ func (s *mysqlStore) PruneWorkflowRuns(timeframe time.Duration) error { return pruneWorkflowRuns(s.handle, timeframe) } +// StoreWorkflowJobEvent implements the Store interface. +func (s *mysqlStore) StoreWorkflowJobEvent(event *github.WorkflowJobEvent) error { + return storeWorkflowJobEvent(s.handle, event) +} + +// GetWorkflowJobs implements the Store interface. +func (s *mysqlStore) GetWorkflowJobs() ([]*WorkflowJob, error) { + return getWorkflowJobs(s.handle) +} + +// PruneWorkflowJobs implements the Store interface. +func (s *mysqlStore) PruneWorkflowJobs(timeframe time.Duration) error { + return pruneWorkflowJobs(s.handle, timeframe) +} + func (s *mysqlStore) dsn() string { if s.password != "" { return fmt.Sprintf( diff --git a/pkg/store/postgres.go b/pkg/store/postgres.go index c830c1c2..4e3827e0 100644 --- a/pkg/store/postgres.go +++ b/pkg/store/postgres.go @@ -132,6 +132,21 @@ func (s *postgresStore) PruneWorkflowRuns(timeframe time.Duration) error { return pruneWorkflowRuns(s.handle, timeframe) } +// StoreWorkflowJobEvent implements the Store interface. +func (s *postgresStore) StoreWorkflowJobEvent(event *github.WorkflowJobEvent) error { + return storeWorkflowJobEvent(s.handle, event) +} + +// GetWorkflowJobs implements the Store interface. +func (s *postgresStore) GetWorkflowJobs() ([]*WorkflowJob, error) { + return getWorkflowJobs(s.handle) +} + +// PruneWorkflowJobs implements the Store interface. +func (s *postgresStore) PruneWorkflowJobs(timeframe time.Duration) error { + return pruneWorkflowJobs(s.handle, timeframe) +} + func (s *postgresStore) dsn() string { dsn := fmt.Sprintf( "host=%s port=%s dbname=%s user=%s", diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index a70c4f0e..6dbfe8f2 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -129,6 +129,21 @@ func (s *sqliteStore) PruneWorkflowRuns(timeframe time.Duration) error { return pruneWorkflowRuns(s.handle, timeframe) } +// StoreWorkflowJobEvent implements the Store interface. +func (s *sqliteStore) StoreWorkflowJobEvent(event *github.WorkflowJobEvent) error { + return storeWorkflowJobEvent(s.handle, event) +} + +// GetWorkflowJobs implements the Store interface. +func (s *sqliteStore) GetWorkflowJobs() ([]*WorkflowJob, error) { + return getWorkflowJobs(s.handle) +} + +// PruneWorkflowJobs implements the Store interface. +func (s *sqliteStore) PruneWorkflowJobs(timeframe time.Duration) error { + return pruneWorkflowJobs(s.handle, timeframe) +} + func (s *sqliteStore) dsn() string { if len(s.meta) > 0 { return fmt.Sprintf( diff --git a/pkg/store/store.go b/pkg/store/store.go index b8c62acd..28e2414f 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -1,30 +1,32 @@ package store import ( - "errors" "fmt" "log/slog" + "maps" "net/url" + "slices" + "strings" "time" "github.com/google/go-github/v66/github" ) -var ( - // ErrUnknownDriver defines a named error for unknown driver. - ErrUnknownDriver = errors.New("unknown database driver") - - // Drivers defines the list of registered database drivers. - Drivers = make(map[string]driver, 0) -) +// Drivers defines the list of registered database drivers. +var Drivers = make(map[string]driver, 0) type driver func(dsn string, logger *slog.Logger) (Store, error) // Store provides the interface for the store implementations. type Store interface { + // WorkflowRunEvent StoreWorkflowRunEvent(*github.WorkflowRunEvent) error GetWorkflowRuns() ([]*WorkflowRun, error) PruneWorkflowRuns(time.Duration) error + // WorkflowJobEvent + StoreWorkflowJobEvent(*github.WorkflowJobEvent) error + GetWorkflowJobs() ([]*WorkflowJob, error) + PruneWorkflowJobs(time.Duration) error Open() error Close() error @@ -35,7 +37,6 @@ type Store interface { // New initializes a new database driver supported by current os. func New(dsn string, logger *slog.Logger) (Store, error) { parsed, err := url.Parse(dsn) - if err != nil { return nil, fmt.Errorf("failed to parse dsn: %w", err) } @@ -44,7 +45,7 @@ func New(dsn string, logger *slog.Logger) (Store, error) { return val(dsn, logger) } - return nil, ErrUnknownDriver + return nil, fmt.Errorf("unknown database driver %s. available drivers are %v", parsed.Scheme, strings.Join(slices.Collect(maps.Keys(Drivers)), ", ")) } func register(name string, f driver) { diff --git a/pkg/store/types.go b/pkg/store/types.go index 41c20f2f..1fad13f1 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -6,8 +6,9 @@ import ( // WorkflowRun defines the type returned by GitHub. type WorkflowRun struct { - Owner string `db:"owner"` - Repo string `db:"repo"` + Owner string `db:"owner"` + Repo string `db:"repo"` + WorkflowID int64 `db:"workflow_id"` Event string `db:"event"` Name string `db:"name"` @@ -57,3 +58,61 @@ func (r *WorkflowRun) ByLabel(label string) string { return "" } + +// WorkflowJob defines the type returned by GitHub. +type WorkflowJob struct { + Owner string `db:"owner"` + Repo string `db:"repo"` + + Name string `db:"name"` + + Status string `db:"status"` + Conclusion string `db:"conclusion"` + Branch string `db:"branch"` + SHA string `db:"sha"` + Identifier int64 `db:"identifier"` + + RunID int64 `db:"run_id"` + RunAttempt int `db:"run_attempt"` + + CreatedAt int64 `db:"created_at"` + StartedAt int64 `db:"started_at"` + CompletedAt int64 `db:"completed_at"` + // Steps []*TaskStep `db:"steps"` // FIXME: Not implemented + Labels string `db:"labels"` // FIXME: comma separated ok? + RunnerID int64 `db:"runner_id"` + RunnerName string `db:"runner_name"` + RunnerGroupID int64 `db:"runner_group_id"` + RunnerGroupName string `db:"runner_group_name"` + WorkflowName string `db:"workflow_name"` +} + +// ByLabel returns values by the defined list of labels. +func (r *WorkflowJob) ByLabel(label string) string { + switch label { + case "owner": + return r.Owner + case "repo": + return r.Repo + case "name": + return r.Name + case "title": + return r.Name + case "status": + return r.Status + case "branch": + return r.Branch + case "sha": + return r.SHA + case "run_id": + return strconv.FormatInt(r.RunID, 10) + case "run_attempt": + return strconv.Itoa(r.RunAttempt) + case "job": + return strconv.FormatInt(r.Identifier, 10) + case "labels": + return r.Labels + } + + return "" +} From aba8b5d958099174c787c6982043c253633a6ea2 Mon Sep 17 00:00:00 2001 From: Marcus Ramberg Date: Thu, 26 Sep 2024 14:21:21 +0200 Subject: [PATCH 2/2] feat: Add workflow job collector --- .gitignore | 1 - pkg/action/server.go | 66 +++++----- pkg/command/command.go | 21 ++++ pkg/config/config.go | 63 ++++++---- pkg/exporter/workflow_job.go | 196 ++++++++++++++++++++++++++++++ pkg/exporter/workflow_job_test.go | 13 +- pkg/store/chai.go | 38 +++--- pkg/store/generic_workflow_job.go | 32 +++-- pkg/store/mysql.go | 28 +++++ pkg/store/postgres.go | 28 +++++ pkg/store/sqlite.go | 28 +++++ 11 files changed, 422 insertions(+), 92 deletions(-) create mode 100644 pkg/exporter/workflow_job.go diff --git a/.gitignore b/.gitignore index 3117e5a4..f09a7957 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ .direnv -.envrc .devenv coverage.out diff --git a/pkg/action/server.go b/pkg/action/server.go index 33e54e3f..fbf09ceb 100644 --- a/pkg/action/server.go +++ b/pkg/action/server.go @@ -35,7 +35,6 @@ func Server(cfg *config.Config, db store.Store, logger *slog.Logger) error { ) client, err := getClient(cfg, logger) - if err != nil { return err } @@ -188,6 +187,19 @@ func handler(cfg *config.Config, db store.Store, logger *slog.Logger, client *gi )) } + if cfg.Collector.WorkflowJobs { + logger.Debug("WorkflowJob collector registered") + + registry.MustRegister(exporter.NewWorkflowJobCollector( + logger, + client, + db, + requestFailures, + requestDuration, + cfg.Target, + )) + } + reg := promhttp.HandlerFor( registry, promhttp.HandlerOpts{ @@ -202,10 +214,9 @@ func handler(cfg *config.Config, db store.Store, logger *slog.Logger, client *gi mux.Route("/", func(root chi.Router) { root.Handle(cfg.Server.Path, reg) - if cfg.Collector.Workflows { + if cfg.Collector.Workflows || cfg.Collector.WorkflowJobs { root.HandleFunc(cfg.Webhook.Path, func(w http.ResponseWriter, r *http.Request) { secret, err := config.Value(cfg.Webhook.Secret) - if err != nil { logger.Error("Failed to read webhook secret", "error", err, @@ -221,7 +232,6 @@ func handler(cfg *config.Config, db store.Store, logger *slog.Logger, client *gi r, []byte(secret), ) - if err != nil { logger.Error("Failed to parse github webhook", "error", err, @@ -237,7 +247,6 @@ func handler(cfg *config.Config, db store.Store, logger *slog.Logger, client *gi github.WebHookType(r), payload, ) - if err != nil { logger.Error("Failed to parse github event", "error", err, @@ -283,31 +292,30 @@ func handler(cfg *config.Config, db store.Store, logger *slog.Logger, client *gi io.WriteString(w, http.StatusText(http.StatusInternalServerError)) } case *github.WorkflowJobEvent: - wf_job := event.WorkflowJob - level.Debug(logger).Log( - "msg", "received webhook request", + wfJob := event.GetWorkflowJob() + logger.Debug("received webhook request", "type", "workflow_job", - "owner", event.Repo.Owner.Login, - "repo", event.Repo.Name, - "id", wf_job.ID, - "name", wf_job.Name, - "attempt", wf_job.RunAttempt, - "status", wf_job.Status, - "conclusion", wf_job.Conclusion, - "created_at", wf_job.CreatedAt.Time.Unix(), - "started_at", wf_job.StartedAt.Time.Unix(), - "completed_at", wf_job.GetCompletedAt().Time.Unix(), - "labels", strings.Join(event.WorkflowJob.Labels, ", "), + "owner", event.GetRepo().GetOwner().GetLogin(), + "repo", event.GetRepo().GetName(), + "id", wfJob.GetID(), + "name", wfJob.GetName(), + "attempt", wfJob.GetRunAttempt(), + "status", wfJob.GetStatus(), + "conclusion", wfJob.GetConclusion(), + "created_at", wfJob.GetCreatedAt().Time.Unix(), + "started_at", wfJob.GetStartedAt().Time.Unix(), + "completed_at", wfJob.GetCompletedAt().Time.Unix(), + "labels", strings.Join(wfJob.Labels, ", "), ) if err := db.StoreWorkflowJobEvent(event); err != nil { - level.Error(logger).Log( - "msg", "failed to store github event", + logger.Error( + "failed to store github event", "type", "workflow_job", - "owner", event.Repo.Owner.Login, - "repo", event.Repo.Name, - "name", wf_job.Name, - "id", wf_job.ID, + "owner", event.GetRepo().GetOwner().GetLogin(), + "repo", event.GetRepo().GetName(), + "name", wfJob.GetName(), + "id", wfJob.GetID(), "error", err, ) @@ -358,7 +366,6 @@ func getClient(cfg *config.Config, logger *slog.Logger) (*github.Client, error) if useApplication(cfg, logger) { privateKey, err := config.Value(cfg.Target.PrivateKey) - if err != nil { logger.Error("Failed to read GitHub key", "err", err, @@ -373,7 +380,6 @@ func getClient(cfg *config.Config, logger *slog.Logger) (*github.Client, error) cfg.Target.InstallID, []byte(privateKey), ) - if err != nil { logger.Error("Failed to create GitHub transport", "err", err, @@ -390,7 +396,6 @@ func getClient(cfg *config.Config, logger *slog.Logger) (*github.Client, error) } accessToken, err := config.Value(cfg.Target.Token) - if err != nil { logger.Error("Failed to read token", "err", err, @@ -414,7 +419,6 @@ func getClient(cfg *config.Config, logger *slog.Logger) (*github.Client, error) func getEnterprise(cfg *config.Config, logger *slog.Logger) (*github.Client, error) { if useApplication(cfg, logger) { privateKey, err := config.Value(cfg.Target.PrivateKey) - if err != nil { logger.Error("Failed to read GitHub key", "err", err, @@ -429,7 +433,6 @@ func getEnterprise(cfg *config.Config, logger *slog.Logger) (*github.Client, err cfg.Target.InstallID, []byte(privateKey), ) - if err != nil { logger.Error("Failed to create GitHub transport", "err", err, @@ -453,7 +456,6 @@ func getEnterprise(cfg *config.Config, logger *slog.Logger) (*github.Client, err cfg.Target.BaseURL, cfg.Target.BaseURL, ) - if err != nil { logger.Error("Failed to parse base URL", "err", err, @@ -466,7 +468,6 @@ func getEnterprise(cfg *config.Config, logger *slog.Logger) (*github.Client, err } accessToken, err := config.Value(cfg.Target.Token) - if err != nil { logger.Error("Failed to read token", "err", err, @@ -498,7 +499,6 @@ func getEnterprise(cfg *config.Config, logger *slog.Logger) (*github.Client, err cfg.Target.BaseURL, cfg.Target.BaseURL, ) - if err != nil { logger.Error("Failed to parse base URL", "err", err, diff --git a/pkg/command/command.go b/pkg/command/command.go index aa7b17fd..f88ceb65 100644 --- a/pkg/command/command.go +++ b/pkg/command/command.go @@ -312,6 +312,27 @@ func RootFlags(cfg *config.Config) []cli.Flag { EnvVars: []string{"GITHUB_EXPORTER_WORKFLOWS_LABELS"}, Destination: &cfg.Target.Workflows.Labels, }, + &cli.BoolFlag{ + Name: "collector.workflow_jobs", + Value: false, + Usage: "Enable collector for workflow jobs", + EnvVars: []string{"GITHUB_EXPORTER_COLLECTOR_WORKFLOW_JOBS"}, + Destination: &cfg.Collector.WorkflowJobs, + }, + &cli.DurationFlag{ + Name: "collector.workflow_jobs.window", + Value: 24 * time.Hour, + Usage: "History window for querying workflow jobs", + EnvVars: []string{"GITHUB_EXPORTER_WORKFLOW_JOBS_WINDOW"}, + Destination: &cfg.Target.WorkflowJobs.Window, + }, + &cli.StringSliceFlag{ + Name: "collector.workflow_jobs.labels", + Value: config.JobLabels(), + Usage: "List of labels used for workflow jobs", + EnvVars: []string{"GITHUB_EXPORTER_WORKFLOWS_LABELS"}, + Destination: &cfg.Target.WorkflowJobs.Labels, + }, &cli.BoolFlag{ Name: "collector.runners", Value: false, diff --git a/pkg/config/config.go b/pkg/config/config.go index fcc9a8ed..580e92d8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -37,6 +37,12 @@ type Workflows struct { Labels cli.StringSlice } +// WorkflowJobs defines the workflow job specific configuration. +type WorkflowJobs struct { + Window time.Duration + Labels cli.StringSlice +} + // Runners defines the runner specific configuration. type Runners struct { Labels cli.StringSlice @@ -44,29 +50,31 @@ type Runners struct { // Target defines the target specific configuration. type Target struct { - Token string - PrivateKey string - AppID int64 - InstallID int64 - BaseURL string - Insecure bool - Enterprises cli.StringSlice - Orgs cli.StringSlice - Repos cli.StringSlice - Timeout time.Duration - PerPage int - Workflows Workflows - Runners Runners + Token string + PrivateKey string + AppID int64 + InstallID int64 + BaseURL string + Insecure bool + Enterprises cli.StringSlice + Orgs cli.StringSlice + Repos cli.StringSlice + Timeout time.Duration + PerPage int + Workflows Workflows + WorkflowJobs WorkflowJobs + Runners Runners } // Collector defines the collector specific configuration. type Collector struct { - Admin bool - Orgs bool - Repos bool - Billing bool - Workflows bool - Runners bool + Admin bool + Orgs bool + Repos bool + Billing bool + Workflows bool + WorkflowJobs bool + Runners bool } // Database defines the database specific configuration. @@ -104,6 +112,21 @@ func Labels() *cli.StringSlice { ) } +// JobLabels defines the default labels used by workflow_job collector. +func JobLabels() *cli.StringSlice { + return cli.NewStringSlice( + "owner", + "repo", + "name", + "title", + "branch", + "sha", + "run_id", + "run_attempt", + "labels", + ) +} + // RunnerLabels defines the default labels used by runner collector. func RunnerLabels() *cli.StringSlice { return cli.NewStringSlice( @@ -117,7 +140,6 @@ func Value(val string) (string, error) { content, err := os.ReadFile( strings.TrimPrefix(val, "file://"), ) - if err != nil { return "", fmt.Errorf("failed to parse secret file: %w", err) } @@ -129,7 +151,6 @@ func Value(val string) (string, error) { content, err := base64.StdEncoding.DecodeString( strings.TrimPrefix(val, "base64://"), ) - if err != nil { return "", fmt.Errorf("failed to parse base64 value: %w", err) } diff --git a/pkg/exporter/workflow_job.go b/pkg/exporter/workflow_job.go new file mode 100644 index 00000000..ff10ffb6 --- /dev/null +++ b/pkg/exporter/workflow_job.go @@ -0,0 +1,196 @@ +package exporter + +import ( + "log/slog" + "time" + + "github.com/google/go-github/v66/github" + "github.com/prometheus/client_golang/prometheus" + "github.com/promhippie/github_exporter/pkg/config" + "github.com/promhippie/github_exporter/pkg/store" +) + +// WorkflowJobCollector collects metrics about the servers. +type WorkflowJobCollector struct { + client *github.Client + logger *slog.Logger + db store.Store + failures *prometheus.CounterVec + duration *prometheus.HistogramVec + config config.Target + + Status *prometheus.Desc + Duration *prometheus.Desc + Creation *prometheus.Desc + Created *prometheus.Desc + Started *prometheus.Desc +} + +// NewWorkflowJobCollector returns a new WorkflowCollector. +func NewWorkflowJobCollector(logger *slog.Logger, client *github.Client, db store.Store, failures *prometheus.CounterVec, duration *prometheus.HistogramVec, cfg config.Target) *WorkflowJobCollector { + if failures != nil { + failures.WithLabelValues("action").Add(0) + } + + labels := cfg.Workflows.Labels.Value() + return &WorkflowJobCollector{ + client: client, + logger: logger.With("collector", "workflow_job"), + db: db, + failures: failures, + duration: duration, + config: cfg, + + Status: prometheus.NewDesc( + "github_workflow_job_status", + "Status of workflow jobs", + labels, + nil, + ), + Duration: prometheus.NewDesc( + "github_workflow_job_duration_ms", + "Duration of workflow runs", + labels, + nil, + ), + Creation: prometheus.NewDesc( + "github_workflow_job_duration_run_created_minutes", + "Duration since the workflow run creation time in minutes", + labels, + nil, + ), + Created: prometheus.NewDesc( + "github_workflow_job_created_timestamp", + "Timestamp when the workflow job have been created", + labels, + nil, + ), + Started: prometheus.NewDesc( + "github_workflow_job_started_timestamp", + "Timestamp when the workflow job have been started", + labels, + nil, + ), + } +} + +// Metrics simply returns the list metric descriptors for generating a documentation. +func (c *WorkflowJobCollector) Metrics() []*prometheus.Desc { + return []*prometheus.Desc{ + c.Status, + c.Duration, + c.Creation, + c.Created, + c.Started, + } +} + +// Describe sends the super-set of all possible descriptors of metrics collected by this Collector. +func (c *WorkflowJobCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.Status + ch <- c.Duration + ch <- c.Creation + ch <- c.Created + ch <- c.Started +} + +// Collect is called by the Prometheus registry when collecting metrics. +func (c *WorkflowJobCollector) Collect(ch chan<- prometheus.Metric) { + if err := c.db.PruneWorkflowJobs( + c.config.WorkflowJobs.Window, + ); err != nil { + c.logger.Error( + "Failed to prune workflow jobs", + "err", err, + ) + } + + now := time.Now() + records, err := c.db.GetWorkflowJobs() + c.duration.WithLabelValues("workflow_job").Observe(time.Since(now).Seconds()) + + if err != nil { + c.logger.Error( + "Failed to fetch workflows", + "err", err, + ) + + c.failures.WithLabelValues("workflow_job").Inc() + return + } + + c.logger.Debug( + "Fetched workflow jobs", + "count", len(records), + "duration", time.Since(now), + ) + + for _, record := range records { + c.logger.Debug( + "Collecting workflow job", + "owner", record.Owner, + "repo", record.Repo, + "id", record.Identifier, + "run_id", record.RunID, + ) + + labels := []string{} + + for _, label := range c.config.WorkflowJobs.Labels.Value() { + labels = append( + labels, + record.ByLabel(label), + ) + } + + ch <- prometheus.MustNewConstMetric( + c.Status, + prometheus.GaugeValue, + jobStatusToGauge(record.Status), + labels..., + ) + + ch <- prometheus.MustNewConstMetric( + c.Duration, + prometheus.GaugeValue, + float64((record.CompletedAt-record.StartedAt)*1000), + labels..., + ) + + ch <- prometheus.MustNewConstMetric( + c.Creation, + prometheus.GaugeValue, + time.Since(time.Unix(record.StartedAt, 0)).Minutes(), + labels..., + ) + + ch <- prometheus.MustNewConstMetric( + c.Created, + prometheus.GaugeValue, + float64(record.CreatedAt), + labels..., + ) + + ch <- prometheus.MustNewConstMetric( + c.Started, + prometheus.GaugeValue, + float64(record.StartedAt), + labels..., + ) + } +} + +func jobStatusToGauge(conclusion string) float64 { + switch conclusion { + case "queued": + return 1.0 + case "waiting": + return 2.0 + case "in_progress": + return 3.0 + case "completed": + return 4.0 + } + + return 0.0 +} diff --git a/pkg/exporter/workflow_job_test.go b/pkg/exporter/workflow_job_test.go index 8bfdfbcb..6220b47c 100644 --- a/pkg/exporter/workflow_job_test.go +++ b/pkg/exporter/workflow_job_test.go @@ -1,12 +1,14 @@ package exporter import ( + "fmt" + "log/slog" + "os" "reflect" "testing" "time" - "github.com/go-kit/log" - "github.com/google/go-github/v63/github" + "github.com/google/go-github/v66/github" "github.com/prometheus/client_golang/prometheus" "github.com/promhippie/github_exporter/pkg/config" "github.com/promhippie/github_exporter/pkg/store" @@ -15,6 +17,7 @@ import ( type StaticStore struct{} func (s StaticStore) GetWorkflowJobRuns(owner, repo, workflow string) ([]*store.WorkflowRun, error) { + fmt.Fprintf(os.Stdout, "GetWorkflowJobRuns for %s/%s %s \n", owner, repo, workflow) return nil, nil } @@ -36,7 +39,11 @@ func (s StaticStore) Migrate() error { return nil } func TestWorkflowJobCollector(t *testing.T) { // Mock dependencies mockClient := &github.Client{} - mockLogger := log.NewNopLogger() + mockLogger := slog.New( + slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }), + ) mockStore := StaticStore{} mockFailures := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "test_failures_total", diff --git a/pkg/store/chai.go b/pkg/store/chai.go index 60ea29ef..eec42ef5 100644 --- a/pkg/store/chai.go +++ b/pkg/store/chai.go @@ -18,11 +18,12 @@ import ( _ "github.com/chaisql/chai/driver" ) -var chaiMigrations = []darwin.Migration{ - { - Version: 1, - Description: "Creating table workflow_runs", - Script: `CREATE TABLE workflow_runs ( +var ( + chaiMigrations = []darwin.Migration{ + { + Version: 1, + Description: "Creating table workflow_runs", + Script: `CREATE TABLE workflow_runs ( owner TEXT NOT NULL, repo TEXT NOT NULL, workflow_id INTEGER NOT NULL, @@ -40,16 +41,16 @@ var chaiMigrations = []darwin.Migration{ started_at INTEGER, PRIMARY KEY(owner, repo, workflow_id, number) );`, - }, - { - Version: 2, - Description: "Adding actor column to workflow_runs table", - Script: `ALTER TABLE workflow_runs ADD COLUMN actor TEXT;`, - }, - { - Version: 3, - Description: "Creating table workflow_jobs", - Script: `CREATE TABLE workflow_jobs ( + }, + { + Version: 2, + Description: "Adding actor column to workflow_runs table", + Script: `ALTER TABLE workflow_runs ADD COLUMN actor TEXT;`, + }, + { + Version: 3, + Description: "Creating table workflow_jobs", + Script: `CREATE TABLE workflow_jobs ( owner TEXT NOT NULL, repo TEXT NOT NULL, name TEXT, @@ -73,8 +74,9 @@ var chaiMigrations = []darwin.Migration{ workflow_name TEXT, PRIMARY KEY(owner, repo, identifier) );`, - }, -} + }, + } +) func init() { register("chai", NewChaiStore) @@ -96,6 +98,7 @@ func (s *chaiStore) Open() (err error) { s.driver, s.dsn(), ) + if err != nil { return err } @@ -176,6 +179,7 @@ func (s *chaiStore) dsn() string { // NewChaiStore initializes a new MySQL store. func NewChaiStore(dsn string, logger *slog.Logger) (Store, error) { parsed, err := url.Parse(dsn) + if err != nil { return nil, fmt.Errorf("failed to parse dsn: %w", err) } diff --git a/pkg/store/generic_workflow_job.go b/pkg/store/generic_workflow_job.go index 55a93780..59fa2a74 100644 --- a/pkg/store/generic_workflow_job.go +++ b/pkg/store/generic_workflow_job.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/google/go-github/v63/github" + "github.com/google/go-github/v66/github" "github.com/jmoiron/sqlx" ) @@ -16,25 +16,25 @@ func storeWorkflowJobEvent(handle *sqlx.DB, event *github.WorkflowJobEvent) erro job := event.WorkflowJob record := &WorkflowJob{ - Owner: *event.Repo.Owner.Login, - Repo: *event.Repo.Name, - Name: *job.Name, - Status: *job.Status, + Owner: event.GetRepo().GetOwner().GetLogin(), + Repo: event.GetRepo().GetName(), + Name: job.GetName(), + Status: job.GetStatus(), Conclusion: job.GetConclusion(), - Branch: *job.HeadBranch, - SHA: *job.HeadSHA, - Identifier: *event.WorkflowJob.ID, - RunID: *job.RunID, - RunAttempt: int(*job.RunAttempt), - CreatedAt: job.CreatedAt.Time.Unix(), - StartedAt: job.StartedAt.Time.Unix(), + Branch: job.GetHeadBranch(), + SHA: job.GetHeadSHA(), + Identifier: event.GetWorkflowJob().GetID(), + RunID: job.GetRunID(), + RunAttempt: int(job.GetRunAttempt()), + CreatedAt: job.GetCreatedAt().Time.Unix(), + StartedAt: job.GetStartedAt().Time.Unix(), CompletedAt: job.GetCompletedAt().Time.Unix(), Labels: strings.Join(job.Labels, ","), - RunnerID: *job.RunID, + RunnerID: job.GetRunID(), RunnerName: job.GetRunnerName(), RunnerGroupID: job.GetRunnerGroupID(), RunnerGroupName: job.GetRunnerGroupName(), - WorkflowName: *job.WorkflowName, + WorkflowName: job.GetWorkflowName(), } return createOrUpdateWorkflowJob(handle, record) @@ -61,7 +61,6 @@ func createOrUpdateWorkflowJob(handle *sqlx.DB, record *WorkflowJob) error { return fmt.Errorf("failed to create record: %w", err) } } else { - // FIXME: UpdatedAt does not exist, does it make sense to use CreatedAt? if existing.CreatedAt > record.CreatedAt { return nil } else if existing.CreatedAt == record.CreatedAt && existing.Status == "completed" { @@ -89,7 +88,6 @@ func getWorkflowJobs(handle *sqlx.DB) ([]*WorkflowJob, error) { rows, err := handle.Queryx( selectWorkflowJobsQuery, ) - if err != nil { return records, err } @@ -228,4 +226,4 @@ var purgeWorkflowJobsQuery = ` DELETE FROM workflow_jobs WHERE - created_at < :timeframe;` // FIXME: updated_at is gone + created_at < :timeframe;` diff --git a/pkg/store/mysql.go b/pkg/store/mysql.go index ca55c316..16c37dbf 100644 --- a/pkg/store/mysql.go +++ b/pkg/store/mysql.go @@ -47,6 +47,34 @@ var ( Description: "Altering table workflow_runs to add actor column", Script: `ALTER TABLE workflow_runs ADD COLUMN actor VARCHAR(255);`, }, + { + Version: 3, + Description: "Creating table workflow_jobs", + Script: `CREATE TABLE workflow_jobs ( + owner VARCHAR(255) NOT NULL, + repo VARCHAR(255) NOT NULL, + name VARCHAR(255), + status VARCHAR(255), + branch VARCHAR(255), + sha VARCHAR(255), + conclusion VARCHAR(255), + labels VARCHAR(255), + identifier INTEGER, + + run_id INTEGER NOT NULL, + run_attempt INTEGER NOT NULL, + + created_at BIGINT, + started_at BIGINT, + completed_at BIGINT, + runner_id INTEGER, + runner_name VARCHAR(255), + runner_group_id INTEGER, + runner_group_name VARCHAR(255), + workflow_name VARCHAR(255), + PRIMARY KEY(owner, repo, identifier) + );`, + }, } ) diff --git a/pkg/store/postgres.go b/pkg/store/postgres.go index 4e3827e0..e05e02c8 100644 --- a/pkg/store/postgres.go +++ b/pkg/store/postgres.go @@ -47,6 +47,34 @@ var ( Description: "Adding actor column to workflow_runs table", Script: `ALTER TABLE workflow_runs ADD COLUMN actor TEXT;`, }, + { + Version: 3, + Description: "Creating table workflow_jobs", + Script: `CREATE TABLE workflow_jobs ( + owner TEXT NOT NULL, + repo TEXT NOT NULL, + name TEXT, + status TEXT, + branch TEXT, + sha TEXT, + conclusion TEXT, + labels TEXT, + identifier INTEGER, + + run_id INTEGER NOT NULL, + run_attempt INTEGER NOT NULL, + + created_at BIGINT, + started_at BIGINT, + completed_at BIGINT, + runner_id INTEGER, + runner_name TEXT, + runner_group_id INTEGER, + runner_group_name TEXT, + workflow_name TEXT, + PRIMARY KEY(owner, repo, identifier) + );`, + }, } ) diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index 6dbfe8f2..38e70818 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -48,6 +48,34 @@ var ( Description: "Adding actor column to workflow_runs table", Script: `ALTER TABLE workflow_runs ADD COLUMN actor TEXT;`, }, + { + Version: 3, + Description: "Creating table workflow_jobs", + Script: `CREATE TABLE workflow_jobs ( + owner TEXT NOT NULL, + repo TEXT NOT NULL, + name TEXT, + status TEXT, + branch TEXT, + sha TEXT, + conclusion TEXT, + labels TEXT, + identifier INTEGER, + + run_id INTEGER NOT NULL, + run_attempt INTEGER NOT NULL, + + created_at BIGINT, + started_at BIGINT, + completed_at BIGINT, + runner_id INTEGER, + runner_name TEXT, + runner_group_id INTEGER, + runner_group_name TEXT, + workflow_name TEXT, + PRIMARY KEY(owner, repo, identifier) + );`, + }, } )