From e7edfbd6b5ca982b41794303cae470a153140bfd Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Fri, 8 Oct 2021 02:07:04 -0700 Subject: [PATCH] Initialize response codes counter (#268) * Initialize codes counter Signed-off-by: Katrina Rogan * revert go.mod changes Signed-off-by: Katrina Rogan * scrub Signed-off-by: Katrina Rogan --- flyteadmin/cmd/entrypoints/k8s_secret.go | 5 +++- .../cmd/scheduler/entrypoints/scheduler.go | 5 ++-- .../async/schedule/aws/workflow_executor.go | 7 +++-- .../impl/cluster_execution_target_provider.go | 12 ++++---- .../pkg/executioncluster/impl/factory.go | 7 +++-- .../pkg/executioncluster/impl/in_cluster.go | 7 +++-- .../impl/random_cluster_selector.go | 11 +++---- .../impl/random_cluster_selector_test.go | 8 ++--- .../interfaces/execution_target_provider.go | 4 +-- .../mocks/execution_target_provider.go | 4 +-- flyteadmin/pkg/rpc/adminservice/base.go | 5 ++-- .../pkg/rpc/adminservice/util/metrics.go | 29 ++++++++++++++----- flyteadmin/tests/project_test.go | 2 +- 13 files changed, 64 insertions(+), 42 deletions(-) diff --git a/flyteadmin/cmd/entrypoints/k8s_secret.go b/flyteadmin/cmd/entrypoints/k8s_secret.go index 5671e70750..1295c67087 100644 --- a/flyteadmin/cmd/entrypoints/k8s_secret.go +++ b/flyteadmin/cmd/entrypoints/k8s_secret.go @@ -96,7 +96,10 @@ func persistSecrets(ctx context.Context, _ *pflag.FlagSet) error { serverCfg := config.GetConfig() configuration := runtime.NewConfigurationProvider() scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope) - clusterClient, err := impl.NewInCluster(scope.NewSubScope("secrets"), serverCfg.KubeConfig, serverCfg.Master) + initializationErrorCounter := scope.NewSubScope("secrets").MustNewCounter( + "flyteclient_initialization_error", + "count of errors encountered initializing a flyte client from kube config") + clusterClient, err := impl.NewInCluster(initializationErrorCounter, serverCfg.KubeConfig, serverCfg.Master) if err != nil { return err } diff --git a/flyteadmin/cmd/scheduler/entrypoints/scheduler.go b/flyteadmin/cmd/scheduler/entrypoints/scheduler.go index e953ced176..341777e1ab 100644 --- a/flyteadmin/cmd/scheduler/entrypoints/scheduler.go +++ b/flyteadmin/cmd/scheduler/entrypoints/scheduler.go @@ -32,11 +32,12 @@ var schedulerRunCmd = &cobra.Command{ // Define the schedulerScope for prometheus metrics schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("flytescheduler") + schedulerPanics := schedulerScope.MustNewCounter("initialization_panic", + "panics encountered initializing the flyte native scheduler") defer func() { if err := recover(); err != nil { - schedulerScope.MustNewCounter("initialization_panic", - "panics encountered initializing the flyte native scheduler").Inc() + schedulerPanics.Inc() logger.Fatalf(ctx, fmt.Sprintf("caught panic: %v [%+v]", err, string(debug.Stack()))) } }() diff --git a/flyteadmin/pkg/async/schedule/aws/workflow_executor.go b/flyteadmin/pkg/async/schedule/aws/workflow_executor.go index 03064fef6b..b67b53fd0b 100644 --- a/flyteadmin/pkg/async/schedule/aws/workflow_executor.go +++ b/flyteadmin/pkg/async/schedule/aws/workflow_executor.go @@ -46,6 +46,7 @@ type workflowExecutorMetrics struct { ScheduledEventProcessingDelay labeled.StopWatch CreateExecutionDuration labeled.StopWatch ChannelClosedError prometheus.Counter + StopSubscriberFailed prometheus.Counter } type workflowExecutor struct { @@ -267,8 +268,7 @@ func (e *workflowExecutor) Stop() error { err := e.subscriber.Stop() if err != nil { logger.Warningf(context.Background(), "failed to stop workflow executor with err %v", err) - e.metrics.Scope.MustNewCounter("stop_subscriber_failed", - "failures stopping the event subscriber").Inc() + e.metrics.StopSubscriberFailed.Inc() return errors.NewFlyteAdminErrorf(codes.Internal, "failed to stop workflow executor with err %v", err) } return nil @@ -303,7 +303,8 @@ func newWorkflowExecutorMetrics(scope promutils.Scope) workflowExecutorMetrics { CreateExecutionDuration: labeled.NewStopWatch("create_execution_duration", "time spent waiting on the call to CreateExecution to return", time.Second, scope, labeled.EmitUnlabeledMetric), - ChannelClosedError: scope.MustNewCounter("channel_closed_error", "count of channel closing errors"), + ChannelClosedError: scope.MustNewCounter("channel_closed_error", "count of channel closing errors"), + StopSubscriberFailed: scope.MustNewCounter("stop_subscriber_failed", "failures stopping the event subscriber"), } } diff --git a/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go b/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go index 98919f2990..ba54a08463 100644 --- a/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go +++ b/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go @@ -5,7 +5,7 @@ import ( "github.com/flyteorg/flyteadmin/pkg/flytek8s" runtime "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" flyteclient "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned" - "github.com/flyteorg/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" @@ -14,12 +14,12 @@ import ( type clusterExecutionTargetProvider struct{} // Creates a new Execution target for a cluster based on config passed in. -func (c *clusterExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) { +func (c *clusterExecutionTargetProvider) GetExecutionTarget(initializationErrorCounter prometheus.Counter, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) { kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster) if err != nil { return nil, err } - flyteClient, err := getRestClientFromKubeConfig(scope, kubeConf) + flyteClient, err := getRestClientFromKubeConfig(initializationErrorCounter, kubeConf) if err != nil { return nil, err } @@ -41,12 +41,10 @@ func (c *clusterExecutionTargetProvider) GetExecutionTarget(scope promutils.Scop }, nil } -func getRestClientFromKubeConfig(scope promutils.Scope, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) { +func getRestClientFromKubeConfig(initializationErrorCounter prometheus.Counter, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) { fc, err := flyteclient.NewForConfig(kubeConfiguration) if err != nil { - scope.MustNewCounter( - "flyteclient_initialization_error", - "count of errors encountered initializing a flyte client from kube config").Inc() + initializationErrorCounter.Inc() return nil, err } return fc, nil diff --git a/flyteadmin/pkg/executioncluster/impl/factory.go b/flyteadmin/pkg/executioncluster/impl/factory.go index 04f7d3f3d6..7224004369 100644 --- a/flyteadmin/pkg/executioncluster/impl/factory.go +++ b/flyteadmin/pkg/executioncluster/impl/factory.go @@ -8,15 +8,18 @@ import ( ) func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration, db repositories.RepositoryInterface) executioncluster_interface.ClusterInterface { + initializationErrorCounter := scope.MustNewCounter( + "flyteclient_initialization_error", + "count of errors encountered initializing a flyte client from kube config") switch len(config.ClusterConfiguration().GetClusterConfigs()) { case 0: - cluster, err := NewInCluster(scope, kubeConfig, master) + cluster, err := NewInCluster(initializationErrorCounter, kubeConfig, master) if err != nil { panic(err) } return cluster default: - cluster, err := NewRandomClusterSelector(scope, config, &clusterExecutionTargetProvider{}, db) + cluster, err := NewRandomClusterSelector(initializationErrorCounter, config, &clusterExecutionTargetProvider{}, db) if err != nil { panic(err) } diff --git a/flyteadmin/pkg/executioncluster/impl/in_cluster.go b/flyteadmin/pkg/executioncluster/impl/in_cluster.go index eabbbb2ba8..175cf0a41d 100644 --- a/flyteadmin/pkg/executioncluster/impl/in_cluster.go +++ b/flyteadmin/pkg/executioncluster/impl/in_cluster.go @@ -4,10 +4,11 @@ import ( "context" "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/flyteorg/flyteadmin/pkg/executioncluster" "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces" "github.com/flyteorg/flyteadmin/pkg/flytek8s" - "github.com/flyteorg/flytestdlib/promutils" "github.com/pkg/errors" "k8s.io/client-go/dynamic" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,12 +31,12 @@ func (i InCluster) GetAllValidTargets() []executioncluster.ExecutionTarget { } } -func NewInCluster(scope promutils.Scope, kubeConfig, master string) (interfaces.ClusterInterface, error) { +func NewInCluster(initializationErrorCounter prometheus.Counter, kubeConfig, master string) (interfaces.ClusterInterface, error) { clientConfig, err := flytek8s.GetRestClientConfig(kubeConfig, master, nil) if err != nil { return nil, err } - flyteClient, err := getRestClientFromKubeConfig(scope, clientConfig) + flyteClient, err := getRestClientFromKubeConfig(initializationErrorCounter, clientConfig) if err != nil { return nil, err } diff --git a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go index cf210febf0..87d77a34bc 100644 --- a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go +++ b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go @@ -6,6 +6,8 @@ import ( "hash/fnv" "math/rand" + "github.com/prometheus/client_golang/prometheus" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -22,7 +24,6 @@ import ( "github.com/flyteorg/flytestdlib/random" runtime "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" - "github.com/flyteorg/flytestdlib/promutils" ) // Implementation of Random cluster selector @@ -44,7 +45,7 @@ func getRandSource(seed string) (rand.Source, error) { return rand.NewSource(hashedSeed), nil } -func getExecutionTargets(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, +func getExecutionTargets(ctx context.Context, initializationErrorCounter prometheus.Counter, executionTargetProvider interfaces.ExecutionTargetProvider, clusterConfig runtime.ClusterConfiguration) (random.WeightedRandomList, map[string]executioncluster.ExecutionTarget, error) { executionTargetMap := make(map[string]executioncluster.ExecutionTarget) entries := make([]random.Entry, 0) @@ -52,7 +53,7 @@ func getExecutionTargets(ctx context.Context, scope promutils.Scope, executionTa if _, ok := executionTargetMap[cluster.Name]; ok { return nil, nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name) } - executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster) + executionTarget, err := executionTargetProvider.GetExecutionTarget(initializationErrorCounter, cluster) if err != nil { return nil, nil, err } @@ -166,8 +167,8 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu return &execTarget, nil } -func NewRandomClusterSelector(scope promutils.Scope, config runtime.Configuration, executionTargetProvider interfaces.ExecutionTargetProvider, db repositories.RepositoryInterface) (interfaces.ClusterInterface, error) { - equalWeightedAllClusters, executionTargetMap, err := getExecutionTargets(context.Background(), scope, executionTargetProvider, config.ClusterConfiguration()) +func NewRandomClusterSelector(initializationErrorCounter prometheus.Counter, config runtime.Configuration, executionTargetProvider interfaces.ExecutionTargetProvider, db repositories.RepositoryInterface) (interfaces.ClusterInterface, error) { + equalWeightedAllClusters, executionTargetMap, err := getExecutionTargets(context.Background(), initializationErrorCounter, executionTargetProvider, config.ClusterConfiguration()) if err != nil { return nil, err } diff --git a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector_test.go b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector_test.go index 866c06c891..7a1d8ee249 100644 --- a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector_test.go +++ b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector_test.go @@ -6,6 +6,8 @@ import ( "path/filepath" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/flyteorg/flyteadmin/pkg/errors" repo_interface "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" repo_mock "github.com/flyteorg/flyteadmin/pkg/repositories/mocks" @@ -20,8 +22,6 @@ import ( "github.com/flyteorg/flyteadmin/pkg/runtime" "github.com/flyteorg/flytestdlib/config" "github.com/flyteorg/flytestdlib/config/viper" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/stretchr/testify/assert" ) @@ -43,7 +43,6 @@ func initTestConfig(fileName string) error { } func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface { - var clusterScope promutils.Scope err := initTestConfig("clusters_config.yaml") assert.NoError(t, err) @@ -85,7 +84,8 @@ func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface return response, nil } configProvider := runtime.NewConfigurationProvider() - randomCluster, err := NewRandomClusterSelector(clusterScope, configProvider, &mocks.MockExecutionTargetProvider{}, db) + var initializationErrorCounter prometheus.Counter + randomCluster, err := NewRandomClusterSelector(initializationErrorCounter, configProvider, &mocks.MockExecutionTargetProvider{}, db) assert.NoError(t, err) return randomCluster } diff --git a/flyteadmin/pkg/executioncluster/interfaces/execution_target_provider.go b/flyteadmin/pkg/executioncluster/interfaces/execution_target_provider.go index 57376738a1..66ac5b49ac 100644 --- a/flyteadmin/pkg/executioncluster/interfaces/execution_target_provider.go +++ b/flyteadmin/pkg/executioncluster/interfaces/execution_target_provider.go @@ -3,9 +3,9 @@ package interfaces import ( "github.com/flyteorg/flyteadmin/pkg/executioncluster" "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" - "github.com/flyteorg/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus" ) type ExecutionTargetProvider interface { - GetExecutionTarget(scope promutils.Scope, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) + GetExecutionTarget(initializationErrorCounter prometheus.Counter, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) } diff --git a/flyteadmin/pkg/executioncluster/mocks/execution_target_provider.go b/flyteadmin/pkg/executioncluster/mocks/execution_target_provider.go index a849a71268..562e1c27f5 100644 --- a/flyteadmin/pkg/executioncluster/mocks/execution_target_provider.go +++ b/flyteadmin/pkg/executioncluster/mocks/execution_target_provider.go @@ -3,13 +3,13 @@ package mocks import ( "github.com/flyteorg/flyteadmin/pkg/executioncluster" "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" - "github.com/flyteorg/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus" ) type MockExecutionTargetProvider struct{} // Creates a new Execution target for a cluster based on config passed in. -func (c *MockExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) { +func (c *MockExecutionTargetProvider) GetExecutionTarget(_ prometheus.Counter, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) { return &executioncluster.ExecutionTarget{ ID: k8sCluster.Name, Enabled: k8sCluster.Enabled, diff --git a/flyteadmin/pkg/rpc/adminservice/base.go b/flyteadmin/pkg/rpc/adminservice/base.go index c3fa21eae1..7b6ffe2121 100644 --- a/flyteadmin/pkg/rpc/adminservice/base.go +++ b/flyteadmin/pkg/rpc/adminservice/base.go @@ -61,11 +61,12 @@ func NewAdminServer(kubeConfig, master string) *AdminService { applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() adminScope := promutils.NewScope(applicationConfiguration.GetMetricsScope()).NewSubScope("admin") + panicCounter := adminScope.MustNewCounter("initialization_panic", + "panics encountered initializing the admin service") defer func() { if err := recover(); err != nil { - adminScope.MustNewCounter("initialization_panic", - "panics encountered initializing the admin service").Inc() + panicCounter.Inc() logger.Fatalf(context.Background(), fmt.Sprintf("caught panic: %v [%+v]", err, string(debug.Stack()))) } }() diff --git a/flyteadmin/pkg/rpc/adminservice/util/metrics.go b/flyteadmin/pkg/rpc/adminservice/util/metrics.go index 2e58ea2fd6..bf726db85a 100644 --- a/flyteadmin/pkg/rpc/adminservice/util/metrics.go +++ b/flyteadmin/pkg/rpc/adminservice/util/metrics.go @@ -1,14 +1,19 @@ package util import ( + "context" "fmt" "time" + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/codes" ) +const maxGRPCStatusCode = 17 // From _maxCode in "google.golang.org/grpc/codes" + type responseCodeMetrics struct { scope promutils.Scope responseCodeCounters map[codes.Code]prometheus.Counter @@ -36,9 +41,8 @@ func (m *RequestMetrics) Record(code codes.Code) { } counter, ok := m.responseCodes.responseCodeCounters[code] if !ok { - m.responseCodes.responseCodeCounters[code] = m.responseCodes.scope.MustNewCounter(code.String(), - fmt.Sprintf("count of responses returning: %s", code.String())) - counter = m.responseCodes.responseCodeCounters[code] + logger.Warnf(context.TODO(), "encountered unexpected error code [%s]", code.String()) + return } counter.Inc() } @@ -47,13 +51,22 @@ func (m *RequestMetrics) Success() { m.Record(codes.OK) } +func newResponseCodeMetrics(scope promutils.Scope) responseCodeMetrics { + responseCodeCounters := make(map[codes.Code]prometheus.Counter) + for i := 0; i < maxGRPCStatusCode; i++ { + code := codes.Code(i) + responseCodeCounters[code] = scope.MustNewCounter(code.String(), + fmt.Sprintf("count of responses returning: %s", code.String())) + } + return responseCodeMetrics{ + scope: scope, + responseCodeCounters: responseCodeCounters, + } +} + func NewRequestMetrics(scope promutils.Scope, method string) RequestMetrics { methodScope := scope.NewSubScope(method) - - responseCodeMetrics := responseCodeMetrics{ - scope: methodScope.NewSubScope("codes"), - responseCodeCounters: make(map[codes.Code]prometheus.Counter), - } + responseCodeMetrics := newResponseCodeMetrics(methodScope.NewSubScope("codes")) return RequestMetrics{ scope: methodScope, diff --git a/flyteadmin/tests/project_test.go b/flyteadmin/tests/project_test.go index d46849d8be..aa557445f3 100644 --- a/flyteadmin/tests/project_test.go +++ b/flyteadmin/tests/project_test.go @@ -28,7 +28,7 @@ func TestCreateProject(t *testing.T) { ResourceType: core.ResourceType_TASK, }, }) - assert.EqualError(t, err, "rpc error: code = NotFound desc = missing entity of type TASK" + + assert.EqualError(t, err, "rpc error: code = NotFound desc = missing entity of type TASK"+ " with identifier project:\"potato\" domain:\"development\" name:\"task\" version:\"1234\" ") assert.Empty(t, task)