Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce cron job metrics #2256

Merged
merged 12 commits into from
Aug 7, 2024
14 changes: 14 additions & 0 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand Down Expand Up @@ -202,6 +203,7 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {
requestJSON, err := json.Marshal(requestBody)
if err != nil {
logger.Errorf(err, "could not build body for cron job: %v", job.Key)
observability.Cron.JobFailed(ctx, job.Key, job.DeploymentKey)
return
}

Expand All @@ -214,26 +216,34 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()

observability.Cron.JobExecutionStarted(ctx, job.Key, job.DeploymentKey)
_, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource)
observability.Cron.JobExecutionCompleted(ctx, job.Key, job.DeploymentKey, job.NextExecution)

if err != nil {
logger.Errorf(err, "failed to execute cron job %v", job.Key)
observability.Cron.JobFailed(ctx, job.Key, job.DeploymentKey)
// Do not return, continue to end the job and schedule the next execution
}

schedule, err := cron.Parse(job.Schedule)
if err != nil {
logger.Errorf(err, "failed to parse cron schedule %q", job.Schedule)
observability.Cron.JobFailed(ctx, job.Key, job.DeploymentKey)
return
}
next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %v with schedule %q", job.Key, job.Schedule)
observability.Cron.JobFailed(ctx, job.Key, job.DeploymentKey)
return
}

updatedJob, err := s.dal.EndCronJob(ctx, job, next)
if err != nil {
logger.Errorf(err, "failed to end cron job %v", job.Key)
observability.Cron.JobFailed(ctx, job.Key, job.DeploymentKey)
} else {
s.events.Publish(endedJobsEvent{
jobs: []model.CronJob{updatedJob},
Expand All @@ -260,20 +270,24 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
pattern, err := cron.Parse(stale.Schedule)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %q because schedule could not be parsed: %q", stale.Key, stale.Schedule)
observability.Cron.JobFailed(ctx, stale.Key, stale.DeploymentKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of bundling all the failures to kill a job with failures of the actual job itself, could we refactor this to the following?

  • JobRun with attribute .succeeded
  • JobKilled with attribute .succeeded

Copy link
Contributor Author

@jonathanj-square jonathanj-square Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea and would like to take it a little further. Removing the jobFailures counter does result in a loss of visibility that can be avoided if the ftl.status.succeeded attribute can take on a more generalized semantic. Changing from success/fail to "outcome" would help. Outcome is a string representing the canonical set of outcomes for a given domain. So in the metrics where the ftl.status.succeeded attribute is used today, outcome would have the values succeeded or failed.

For cron jobs I would like outcomes covering the following: success, failed to start, execution failure, and killed. This would allow me to remove the jobsKilled metric and segment the latency by outcome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

continue
}
next, err := cron.NextAfter(pattern, start, false)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %q because next date could not be calculated: %q", stale.Key, stale.Schedule)
observability.Cron.JobFailed(ctx, stale.Key, stale.DeploymentKey)
continue
}

updated, err := s.dal.EndCronJob(ctx, stale, next)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Key, err)
observability.Cron.JobFailed(ctx, stale.Key, stale.DeploymentKey)
continue
}
logger.Warnf("Killed stale cron job %s", stale.Key)
observability.Cron.JobKilled(ctx, stale.Key, stale.DeploymentKey)
updatedJobs = append(updatedJobs, updated)
}

Expand Down
106 changes: 106 additions & 0 deletions backend/controller/observability/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package observability

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
)

// Naming constants for the cron job metrics.
const (
// cronMeterName is the prefix used to build the cron metric names
// (for example "<prefix>.job.latency").
cronMeterName = "ftl.cron"
// cronJobRefAttribute is the attribute key under which the string form of
// the cron job key is attached to each measurement.
cronJobRefAttribute = "ftl.cron.job.ref"
)

// CronMetrics groups the OpenTelemetry instruments used to report cron job
// activity.
type CronMetrics struct {
// jobFailures counts failures encountered while performing activities
// associated with starting or ending a cron job.
jobFailures metric.Int64Counter
// jobsKilled counts cron jobs killed by the controller.
jobsKilled metric.Int64Counter
// jobsActive tracks the number of actively executing cron jobs.
jobsActive metric.Int64UpDownCounter
// jobLatency records, in milliseconds, the latency between the scheduled
// execution time and completion of a cron job.
jobLatency metric.Int64Histogram
}

func initCronMetrics() (*CronMetrics, error) {
result := &CronMetrics{}

var errs error
var err error

meter := otel.Meter(deploymentMeterName)

counter := fmt.Sprintf("%s.job.failures", cronMeterName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/failures/completed/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😅 thanks for spotting that, updated

if result.jobFailures, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of failures encountered while performing activities associated with starting or ending a cron job")); err != nil {
result.jobFailures, errs = handleInt64CounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.jobs.kills", cronMeterName)
if result.jobsKilled, err = meter.Int64Counter(
counter,
metric.WithDescription("the number cron jobs killed by the controller")); err != nil {
result.jobsKilled, errs = handleInt64CounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.jobs.active", cronMeterName)
if result.jobsActive, err = meter.Int64UpDownCounter(
counter,
metric.WithDescription("the number of actively executing cron jobs")); err != nil {
result.jobsActive, errs = handleInt64UpDownCounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.job.latency", cronMeterName)
if result.jobLatency, err = meter.Int64Histogram(
counter,
metric.WithDescription("the latency between the scheduled execution time and completion of a cron job"),
metric.WithUnit("ms")); err != nil {
result.jobLatency, errs = handleInt64HistogramCounterError(counter, err, errs)
}

return result, errs
}

func (m *CronMetrics) JobExecutionStarted(ctx context.Context, job model.CronJobKey, deployment model.DeploymentKey) {
m.jobsActive.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, job.Payload.Module),
attribute.String(cronJobRefAttribute, job.String()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming - is job.String() the ref to the cron job declaration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the module qualified verb name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh in that case should we rename the attribute key to match?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

attribute.String(observability.RunnerDeploymentKeyAttribute, deployment.String()),
))
}

func (m *CronMetrics) JobExecutionCompleted(ctx context.Context, job model.CronJobKey, deployment model.DeploymentKey, scheduled time.Time) {
elapsed := time.Since(scheduled)

m.jobsActive.Add(ctx, -1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, job.Payload.Module),
attribute.String(cronJobRefAttribute, job.String()),
attribute.String(observability.RunnerDeploymentKeyAttribute, deployment.String()),
))

m.jobLatency.Record(ctx, elapsed.Milliseconds(), metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, job.Payload.Module),
attribute.String(cronJobRefAttribute, job.String()),
attribute.String(observability.RunnerDeploymentKeyAttribute, deployment.String()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these attributes are identical to the ones above for jobsActive, would you mind deduplicating them into an attrs array? In fact, since they're duplicated across all these functions, could we make a helper function to construct attrs given job, deployment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

))
}

// JobKilled records that the controller killed a cron job.
//
// It increments the jobsKilled counter and decrements the jobsActive up/down
// counter. Both measurements carry the job's module name, the string form of
// the job key, and the string form of the deployment key as attributes.
func (m *CronMetrics) JobKilled(ctx context.Context, job model.CronJobKey, deployment model.DeploymentKey) {
	// Build the attribute set once; it is shared by both measurements.
	attrs := metric.WithAttributes(
		attribute.String(observability.ModuleNameAttribute, job.Payload.Module),
		attribute.String(cronJobRefAttribute, job.String()),
		attribute.String(observability.RunnerDeploymentKeyAttribute, deployment.String()),
	)

	// Count the kill itself so the kills counter reflects it.
	m.jobsKilled.Add(ctx, 1, attrs)
	// A killed job is no longer counted as active.
	m.jobsActive.Add(ctx, -1, attrs)
}

// JobFailed adds one to the jobFailures counter, tagging the measurement with
// the job's module name, the string form of the job key, and the string form
// of the deployment key.
func (m *CronMetrics) JobFailed(ctx context.Context, job model.CronJobKey, deployment model.DeploymentKey) {
	moduleAttr := attribute.String(observability.ModuleNameAttribute, job.Payload.Module)
	jobAttr := attribute.String(cronJobRefAttribute, job.String())
	deploymentAttr := attribute.String(observability.RunnerDeploymentKeyAttribute, deployment.String())

	m.jobFailures.Add(ctx, 1, metric.WithAttributes(moduleAttr, jobAttr, deploymentAttr))
}
8 changes: 8 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
Deployment *DeploymentMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
)

func init() {
Expand All @@ -28,6 +29,8 @@ func init() {
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)

if err != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
Expand All @@ -44,6 +47,11 @@ func handleInt64UpDownCounterError(counter string, err error, errs error) (metri
return noop.Int64UpDownCounter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

// handleInt64HistogramCounterError is the fallback used when creating an
// Int64Histogram fails. It returns a no-op histogram together with errs joined
// with a new error that names the instrument and wraps err.
//
//nolint:unparam
func handleInt64HistogramCounterError(counter string, err error, errs error) (metric.Int64Histogram, error) {
	initErr := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	fallback := noop.Int64Histogram{}
	return fallback, errors.Join(errs, initErr)
}

// timeSinceMS returns the time elapsed since start, in whole milliseconds.
func timeSinceMS(start time.Time) int64 {
	elapsed := time.Since(start)
	return elapsed.Milliseconds()
}
Loading