Skip to content

Commit

Permalink
Initialize response codes counter (flyteorg#268)
Browse files Browse the repository at this point in the history
* Initialize codes counter

Signed-off-by: Katrina Rogan <[email protected]>

* revert go.mod changes

Signed-off-by: Katrina Rogan <[email protected]>

* scrub

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
Katrina Rogan authored Oct 8, 2021
1 parent 2e19960 commit e7edfbd
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 42 deletions.
5 changes: 4 additions & 1 deletion flyteadmin/cmd/entrypoints/k8s_secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func persistSecrets(ctx context.Context, _ *pflag.FlagSet) error {
serverCfg := config.GetConfig()
configuration := runtime.NewConfigurationProvider()
scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope)
clusterClient, err := impl.NewInCluster(scope.NewSubScope("secrets"), serverCfg.KubeConfig, serverCfg.Master)
initializationErrorCounter := scope.NewSubScope("secrets").MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
clusterClient, err := impl.NewInCluster(initializationErrorCounter, serverCfg.KubeConfig, serverCfg.Master)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/cmd/scheduler/entrypoints/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ var schedulerRunCmd = &cobra.Command{

// Define the schedulerScope for prometheus metrics
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("flytescheduler")
schedulerPanics := schedulerScope.MustNewCounter("initialization_panic",
"panics encountered initializing the flyte native scheduler")

defer func() {
if err := recover(); err != nil {
schedulerScope.MustNewCounter("initialization_panic",
"panics encountered initializing the flyte native scheduler").Inc()
schedulerPanics.Inc()
logger.Fatalf(ctx, fmt.Sprintf("caught panic: %v [%+v]", err, string(debug.Stack())))
}
}()
Expand Down
7 changes: 4 additions & 3 deletions flyteadmin/pkg/async/schedule/aws/workflow_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type workflowExecutorMetrics struct {
ScheduledEventProcessingDelay labeled.StopWatch
CreateExecutionDuration labeled.StopWatch
ChannelClosedError prometheus.Counter
StopSubscriberFailed prometheus.Counter
}

type workflowExecutor struct {
Expand Down Expand Up @@ -267,8 +268,7 @@ func (e *workflowExecutor) Stop() error {
err := e.subscriber.Stop()
if err != nil {
logger.Warningf(context.Background(), "failed to stop workflow executor with err %v", err)
e.metrics.Scope.MustNewCounter("stop_subscriber_failed",
"failures stopping the event subscriber").Inc()
e.metrics.StopSubscriberFailed.Inc()
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to stop workflow executor with err %v", err)
}
return nil
Expand Down Expand Up @@ -303,7 +303,8 @@ func newWorkflowExecutorMetrics(scope promutils.Scope) workflowExecutorMetrics {
CreateExecutionDuration: labeled.NewStopWatch("create_execution_duration",
"time spent waiting on the call to CreateExecution to return",
time.Second, scope, labeled.EmitUnlabeledMetric),
ChannelClosedError: scope.MustNewCounter("channel_closed_error", "count of channel closing errors"),
ChannelClosedError: scope.MustNewCounter("channel_closed_error", "count of channel closing errors"),
StopSubscriberFailed: scope.MustNewCounter("stop_subscriber_failed", "failures stopping the event subscriber"),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/flyteorg/flyteadmin/pkg/flytek8s"
runtime "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
flyteclient "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -14,12 +14,12 @@ import (
type clusterExecutionTargetProvider struct{}

// Creates a new Execution target for a cluster based on config passed in.
func (c *clusterExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) {
func (c *clusterExecutionTargetProvider) GetExecutionTarget(initializationErrorCounter prometheus.Counter, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) {
kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster)
if err != nil {
return nil, err
}
flyteClient, err := getRestClientFromKubeConfig(scope, kubeConf)
flyteClient, err := getRestClientFromKubeConfig(initializationErrorCounter, kubeConf)
if err != nil {
return nil, err
}
Expand All @@ -41,12 +41,10 @@ func (c *clusterExecutionTargetProvider) GetExecutionTarget(scope promutils.Scop
}, nil
}

func getRestClientFromKubeConfig(scope promutils.Scope, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) {
func getRestClientFromKubeConfig(initializationErrorCounter prometheus.Counter, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) {
fc, err := flyteclient.NewForConfig(kubeConfiguration)
if err != nil {
scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config").Inc()
initializationErrorCounter.Inc()
return nil, err
}
return fc, nil
Expand Down
7 changes: 5 additions & 2 deletions flyteadmin/pkg/executioncluster/impl/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ import (
)

func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration, db repositories.RepositoryInterface) executioncluster_interface.ClusterInterface {
initializationErrorCounter := scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
switch len(config.ClusterConfiguration().GetClusterConfigs()) {
case 0:
cluster, err := NewInCluster(scope, kubeConfig, master)
cluster, err := NewInCluster(initializationErrorCounter, kubeConfig, master)
if err != nil {
panic(err)
}
return cluster
default:
cluster, err := NewRandomClusterSelector(scope, config, &clusterExecutionTargetProvider{}, db)
cluster, err := NewRandomClusterSelector(initializationErrorCounter, config, &clusterExecutionTargetProvider{}, db)
if err != nil {
panic(err)
}
Expand Down
7 changes: 4 additions & 3 deletions flyteadmin/pkg/executioncluster/impl/in_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/flytek8s"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/pkg/errors"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -30,12 +31,12 @@ func (i InCluster) GetAllValidTargets() []executioncluster.ExecutionTarget {
}
}

func NewInCluster(scope promutils.Scope, kubeConfig, master string) (interfaces.ClusterInterface, error) {
func NewInCluster(initializationErrorCounter prometheus.Counter, kubeConfig, master string) (interfaces.ClusterInterface, error) {
clientConfig, err := flytek8s.GetRestClientConfig(kubeConfig, master, nil)
if err != nil {
return nil, err
}
flyteClient, err := getRestClientFromKubeConfig(scope, clientConfig)
flyteClient, err := getRestClientFromKubeConfig(initializationErrorCounter, clientConfig)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"hash/fnv"
"math/rand"

"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
Expand All @@ -22,7 +24,6 @@ import (
"github.com/flyteorg/flytestdlib/random"

runtime "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/promutils"
)

// Implementation of Random cluster selector
Expand All @@ -44,15 +45,15 @@ func getRandSource(seed string) (rand.Source, error) {
return rand.NewSource(hashedSeed), nil
}

func getExecutionTargets(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider,
func getExecutionTargets(ctx context.Context, initializationErrorCounter prometheus.Counter, executionTargetProvider interfaces.ExecutionTargetProvider,
clusterConfig runtime.ClusterConfiguration) (random.WeightedRandomList, map[string]executioncluster.ExecutionTarget, error) {
executionTargetMap := make(map[string]executioncluster.ExecutionTarget)
entries := make([]random.Entry, 0)
for _, cluster := range clusterConfig.GetClusterConfigs() {
if _, ok := executionTargetMap[cluster.Name]; ok {
return nil, nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name)
}
executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster)
executionTarget, err := executionTargetProvider.GetExecutionTarget(initializationErrorCounter, cluster)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -166,8 +167,8 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu
return &execTarget, nil
}

func NewRandomClusterSelector(scope promutils.Scope, config runtime.Configuration, executionTargetProvider interfaces.ExecutionTargetProvider, db repositories.RepositoryInterface) (interfaces.ClusterInterface, error) {
equalWeightedAllClusters, executionTargetMap, err := getExecutionTargets(context.Background(), scope, executionTargetProvider, config.ClusterConfiguration())
func NewRandomClusterSelector(initializationErrorCounter prometheus.Counter, config runtime.Configuration, executionTargetProvider interfaces.ExecutionTargetProvider, db repositories.RepositoryInterface) (interfaces.ClusterInterface, error) {
equalWeightedAllClusters, executionTargetMap, err := getExecutionTargets(context.Background(), initializationErrorCounter, executionTargetProvider, config.ClusterConfiguration())
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"path/filepath"
"testing"

"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flyteadmin/pkg/errors"
repo_interface "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
repo_mock "github.com/flyteorg/flyteadmin/pkg/repositories/mocks"
Expand All @@ -20,8 +22,6 @@ import (
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/config/viper"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
)

Expand All @@ -43,7 +43,6 @@ func initTestConfig(fileName string) error {
}

func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface {
var clusterScope promutils.Scope
err := initTestConfig("clusters_config.yaml")
assert.NoError(t, err)

Expand Down Expand Up @@ -85,7 +84,8 @@ func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface
return response, nil
}
configProvider := runtime.NewConfigurationProvider()
randomCluster, err := NewRandomClusterSelector(clusterScope, configProvider, &mocks.MockExecutionTargetProvider{}, db)
var initializationErrorCounter prometheus.Counter
randomCluster, err := NewRandomClusterSelector(initializationErrorCounter, configProvider, &mocks.MockExecutionTargetProvider{}, db)
assert.NoError(t, err)
return randomCluster
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package interfaces
import (
"github.com/flyteorg/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

type ExecutionTargetProvider interface {
GetExecutionTarget(scope promutils.Scope, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error)
GetExecutionTarget(initializationErrorCounter prometheus.Counter, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package mocks
import (
"github.com/flyteorg/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

type MockExecutionTargetProvider struct{}

// Creates a new Execution target for a cluster based on config passed in.
func (c *MockExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) {
func (c *MockExecutionTargetProvider) GetExecutionTarget(_ prometheus.Counter, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) {
return &executioncluster.ExecutionTarget{
ID: k8sCluster.Name,
Enabled: k8sCluster.Enabled,
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()

adminScope := promutils.NewScope(applicationConfiguration.GetMetricsScope()).NewSubScope("admin")
panicCounter := adminScope.MustNewCounter("initialization_panic",
"panics encountered initializing the admin service")

defer func() {
if err := recover(); err != nil {
adminScope.MustNewCounter("initialization_panic",
"panics encountered initializing the admin service").Inc()
panicCounter.Inc()
logger.Fatalf(context.Background(), fmt.Sprintf("caught panic: %v [%+v]", err, string(debug.Stack())))
}
}()
Expand Down
29 changes: 21 additions & 8 deletions flyteadmin/pkg/rpc/adminservice/util/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package util

import (
"context"
"fmt"
"time"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
)

const maxGRPCStatusCode = 17 // From _maxCode in "google.golang.org/grpc/codes"

type responseCodeMetrics struct {
scope promutils.Scope
responseCodeCounters map[codes.Code]prometheus.Counter
Expand Down Expand Up @@ -36,9 +41,8 @@ func (m *RequestMetrics) Record(code codes.Code) {
}
counter, ok := m.responseCodes.responseCodeCounters[code]
if !ok {
m.responseCodes.responseCodeCounters[code] = m.responseCodes.scope.MustNewCounter(code.String(),
fmt.Sprintf("count of responses returning: %s", code.String()))
counter = m.responseCodes.responseCodeCounters[code]
logger.Warnf(context.TODO(), "encountered unexpected error code [%s]", code.String())
return
}
counter.Inc()
}
Expand All @@ -47,13 +51,22 @@ func (m *RequestMetrics) Success() {
m.Record(codes.OK)
}

func newResponseCodeMetrics(scope promutils.Scope) responseCodeMetrics {
responseCodeCounters := make(map[codes.Code]prometheus.Counter)
for i := 0; i < maxGRPCStatusCode; i++ {
code := codes.Code(i)
responseCodeCounters[code] = scope.MustNewCounter(code.String(),
fmt.Sprintf("count of responses returning: %s", code.String()))
}
return responseCodeMetrics{
scope: scope,
responseCodeCounters: responseCodeCounters,
}
}

func NewRequestMetrics(scope promutils.Scope, method string) RequestMetrics {
methodScope := scope.NewSubScope(method)

responseCodeMetrics := responseCodeMetrics{
scope: methodScope.NewSubScope("codes"),
responseCodeCounters: make(map[codes.Code]prometheus.Counter),
}
responseCodeMetrics := newResponseCodeMetrics(methodScope.NewSubScope("codes"))

return RequestMetrics{
scope: methodScope,
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/tests/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCreateProject(t *testing.T) {
ResourceType: core.ResourceType_TASK,
},
})
assert.EqualError(t, err, "rpc error: code = NotFound desc = missing entity of type TASK" +
assert.EqualError(t, err, "rpc error: code = NotFound desc = missing entity of type TASK"+
" with identifier project:\"potato\" domain:\"development\" name:\"task\" version:\"1234\" ")
assert.Empty(t, task)

Expand Down

0 comments on commit e7edfbd

Please sign in to comment.